/*
This file is part of GNUnet.
- (C) 2012 Christian Grothoff (and other contributing authors)
+ (C) 2008--2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
*/
/**
- * @file testbed/gnunet-service-testbed_cache.h
+ * @file testbed/gnunet-service-testbed_cache.c
* @brief testbed cache implementation
* @author Sree Harsha Totakura
*/
GNUNET_log_from (kind, "testbed-cache", __VA_ARGS__)
+/**
+ * Time to expire a cache entry
+ */
+#define CACHE_EXPIRY \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
+
+
/**
* Type of cache-get requests
*/
* The cache entry object this handle corresponds to
*/
struct CacheEntry *entry;
-
+
/**
* The cache callback to call when a handle is available
*/
GST_cache_handle_ready_cb cb;
-
+
/**
* The closure for the above callback
*/
/**
* The peer connect notify context created for this handle; can be NULL
- */
+ */
struct ConnectNotifyContext *nctxt;
/**
/**
* Cache entry
*/
-struct CacheEntry
+struct CacheEntry
{
/**
* DLL next ptr for least recently used cache entries
/**
* The transport handle to the peer corresponding to this entry; can be NULL
*/
- struct GNUNET_TRANSPORT_Handle *transport_handle_;
+ struct GNUNET_TRANSPORT_Handle *transport_handle;
/**
* The operation handle for transport handle
*/
- struct GNUNET_TESTBED_Operation *transport_op_;
+ struct GNUNET_TESTBED_Operation *transport_op;
/**
* The core handle to the peer corresponding to this entry; can be NULL
*/
GNUNET_SCHEDULER_TaskIdentifier notify_task;
+ /**
+ * The task to expire this cache entry, free any handlers it has opened and
+ * mark their corresponding operations as done.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier expire_task;
+
/**
* Number of operations this cache entry is being used
*/
* The id of the peer this entry corresponds to
*/
unsigned int peer_id;
+
+ /**
+ * Is this entry in LRU cache queue?
+ */
+ unsigned int in_lru;
};
/**
* Looks up in the cache and returns the entry
*
- * @param id the peer identity of the peer whose corresponding entry has to be looked up
+ * @param key the peer identity of the peer whose corresponding entry has to be
+ * looked up
* @return the HELLO message; NULL if not found
*/
static struct CacheEntry *
close_handles (struct CacheEntry *entry)
{
struct ConnectNotifyContext *ctxt;
-
+
GNUNET_assert (0 == entry->demand);
- if ((NULL != entry->next) || (NULL != entry->prev))
+ if (GNUNET_YES == entry->in_lru)
{
GNUNET_assert (0 < lru_cache_size);
+ if (GNUNET_SCHEDULER_NO_TASK != entry->expire_task)
+ {
+ GNUNET_SCHEDULER_cancel (entry->expire_task);
+ entry->expire_task = GNUNET_SCHEDULER_NO_TASK;
+ }
GNUNET_CONTAINER_DLL_remove (lru_cache_head, lru_cache_tail, entry);
lru_cache_size--;
+ entry->in_lru = GNUNET_NO;
}
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == entry->expire_task);
while (NULL != (ctxt = entry->nctxt_qhead))
{
GNUNET_CONTAINER_DLL_remove (entry->nctxt_qhead, entry->nctxt_qtail, ctxt);
GNUNET_free (ctxt);
}
LOG_DEBUG ("Cleaning up handles from an entry in cache\n");
- if (NULL != entry->transport_handle_)
+ if (NULL != entry->transport_handle)
+ GNUNET_assert (NULL != entry->transport_op);
+ if (NULL != entry->transport_op)
{
- GNUNET_assert (NULL != entry->transport_op_);
- GNUNET_TESTBED_operation_done (entry->transport_op_);
- entry->transport_op_ = NULL;
+ GNUNET_TESTBED_operation_done (entry->transport_op);
+ entry->transport_op = NULL;
}
- if (NULL != entry->core_handle)
+ if (NULL != entry->core_op)
{
- GNUNET_assert (NULL != entry->core_op);
GNUNET_TESTBED_operation_done (entry->core_op);
entry->core_op = NULL;
}
+ GNUNET_assert (NULL == entry->core_handle);
if (NULL != entry->cfg)
{
GNUNET_CONFIGURATION_destroy (entry->cfg);
}
+/**
+ * The task to expire this cache entry, free any handlers it has opened and
+ * mark their corresponding operations as done.
+ *
+ * @param cls the CacheEntry
+ * @param tc the scheduler task context
+ */
+static void
+expire_cache_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct CacheEntry *entry = cls;
+
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != entry->expire_task);
+ entry->expire_task = GNUNET_SCHEDULER_NO_TASK;
+ close_handles (entry);
+}
+
+
/**
* Creates a new cache entry and then puts it into the cache's hashtable.
*
entry->peer_id = peer_id;
memcpy (&entry->key, key, sizeof (struct GNUNET_HashCode));
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (cache, &entry->key,
- entry,
+ GNUNET_CONTAINER_multihashmap_put (cache, &entry->key, entry,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
cache_size++;
return entry;
{
const struct GSTCacheGetHandle *cgh;
- for (cgh=head; NULL != cgh; cgh=cgh->next)
+ for (cgh = head; NULL != cgh; cgh = cgh->next)
{
if (GNUNET_YES == cgh->notify_called)
return NULL;
switch (cgh->type)
{
case CGT_TRANSPORT_HANDLE:
- if (NULL == entry->transport_handle_)
+ if (NULL == entry->transport_handle)
continue;
break;
case CGT_CORE_HANDLE:
if (NULL == entry->core_handle)
continue;
+ if (NULL == entry->peer_identity) /* Our CORE connection isn't ready yet */
+ continue;
break;
}
break;
- }
+ }
return (struct GSTCacheGetHandle *) cgh;
}
struct CacheEntry *entry = cls;
struct GSTCacheGetHandle *cgh;
const struct GSTCacheGetHandle *cgh2;
-
+
GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != entry->notify_task);
entry->notify_task = GNUNET_SCHEDULER_NO_TASK;
cgh = search_suitable_cgh (entry, entry->cgh_qhead);
if (NULL != cgh2)
entry->notify_task = GNUNET_SCHEDULER_add_now (&call_cgh_cb, entry);
if (NULL != cgh->nctxt)
- {/* Register the peer connect notify callback */
+ { /* Register the peer connect notify callback */
GNUNET_CONTAINER_DLL_insert_tail (entry->nctxt_qhead, entry->nctxt_qtail,
cgh->nctxt);
}
LOG_DEBUG ("Calling notify for handle type %u\n", cgh->type);
- cgh->cb (cgh->cb_cls, entry->core_handle,
- entry->transport_handle_, entry->peer_identity);
+ cgh->cb (cgh->cb_cls, entry->core_handle, entry->transport_handle,
+ entry->peer_identity);
}
* @param peer the peer that connected
* @param type the type of the handle this notification corresponds to
*/
-static void
-peer_connect_notify_cb (void *cls,
- const struct GNUNET_PeerIdentity *peer,
+static void
+peer_connect_notify_cb (void *cls, const struct GNUNET_PeerIdentity *peer,
const enum CacheGetType type)
{
struct CacheEntry *entry = cls;
GST_cache_peer_connect_notify cb;
void *cb_cls;
-
- for (ctxt=entry->nctxt_qhead; NULL != ctxt;)
+
+ for (ctxt = entry->nctxt_qhead; NULL != ctxt;)
{
GNUNET_assert (NULL != ctxt->cgh);
if (type != ctxt->cgh->type)
GNUNET_CONTAINER_DLL_remove (entry->nctxt_qhead, entry->nctxt_qtail, ctxt);
GNUNET_free (ctxt);
ctxt = ctxt2;
- cb (cb_cls, peer);
+ cb (cb_cls, peer);
}
if (NULL == ctxt)
return;
-
+
}
*
* @param cls closure
* @param peer the peer that connected
- * @param ats performance data
- * @param ats_count number of entries in ats (excluding 0-termination)
*/
-static void
+static void
transport_peer_connect_notify_cb (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_ATS_Information *ats,
- uint32_t ats_count)
+ const struct GNUNET_PeerIdentity *peer)
{
peer_connect_notify_cb (cls, peer, CGT_TRANSPORT_HANDLE);
}
GNUNET_assert (NULL != entry);
LOG_DEBUG ("Opening a transport connection to peer %u\n", entry->peer_id);
- entry->transport_handle_ =
- GNUNET_TRANSPORT_connect (entry->cfg,
- NULL, entry,
- NULL,
+ entry->transport_handle =
+ GNUNET_TRANSPORT_connect (entry->cfg, NULL, entry, NULL,
&transport_peer_connect_notify_cb, NULL);
- if (NULL == entry->transport_handle_)
+ if (NULL == entry->transport_handle)
{
GNUNET_break (0);
return;
oprelease_get_handle_transport (void *cls)
{
struct CacheEntry *entry = cls;
-
- if (NULL == entry->transport_handle_)
+
+ if (NULL == entry->transport_handle)
return;
- GNUNET_TRANSPORT_disconnect (entry->transport_handle_);
- entry->transport_handle_ = NULL;
+ GNUNET_TRANSPORT_disconnect (entry->transport_handle);
+ entry->transport_handle = NULL;
}
* @param server handle to the server, NULL if we failed
* @param my_identity ID of this peer, NULL if we failed
*/
-static void
-core_startup_cb (void *cls,
- struct GNUNET_CORE_Handle * server,
+static void
+core_startup_cb (void *cls, struct GNUNET_CORE_Handle *server,
const struct GNUNET_PeerIdentity *my_identity)
{
struct CacheEntry *entry = cls;
return;
}
GNUNET_assert (NULL == entry->peer_identity);
+ GNUNET_break (NULL != server);
entry->core_handle = server;
entry->peer_identity = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
memcpy (entry->peer_identity, my_identity,
sizeof (struct GNUNET_PeerIdentity));
if (0 == entry->demand)
- return;
+ return;
if (GNUNET_SCHEDULER_NO_TASK != entry->notify_task)
return;
if (NULL != search_suitable_cgh (entry, entry->cgh_qhead))
*
* @param cls closure
* @param peer peer identity this notification is about
- * @param atsi performance data for the connection
- * @param atsi_count number of records in 'atsi'
- */
-static void
-core_peer_connect_cb (void *cls,
- const struct GNUNET_PeerIdentity * peer,
- const struct GNUNET_ATS_Information * atsi,
- unsigned int atsi_count)
-{
+ */
+static void
+core_peer_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
+{
peer_connect_notify_cb (cls, peer, CGT_CORE_HANDLE);
}
opstart_get_handle_core (void *cls)
{
struct CacheEntry *entry = cls;
+
const struct GNUNET_CORE_MessageHandler no_handlers[] = {
{NULL, 0, 0}
};
GNUNET_assert (NULL != entry);
LOG_DEBUG ("Opening a CORE connection to peer %u\n", entry->peer_id);
- /* void?: We also get the handle when the connection to CORE is successful */
- (void) GNUNET_CORE_connect (entry->cfg,
- entry,
- &core_startup_cb,
- &core_peer_connect_cb,
- NULL, /* disconnect cb */
- NULL, /* inbound notify */
- GNUNET_NO,
- NULL, /* outbound notify */
- GNUNET_NO,
- no_handlers);
- //GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == entry->notify_task);
+ entry->core_handle =
+ GNUNET_CORE_connect (entry->cfg, entry, /* closure */
+ &core_startup_cb, /* core startup notify */
+ &core_peer_connect_cb, /* peer connect notify */
+ NULL, /* peer disconnect notify */
+ NULL, /* inbound notify */
+ GNUNET_NO, /* inbound header only? */
+ NULL, /* outbound notify */
+ GNUNET_NO, /* outbound header only? */
+ no_handlers);
}
oprelease_get_handle_core (void *cls)
{
struct CacheEntry *entry = cls;
-
+
if (NULL == entry->core_handle)
return;
GNUNET_CORE_disconnect (entry->core_handle);
* lookup in the cache; if not, a new operation is started to open the transport
* handle and will be given in the callback when it is available.
*
- * @param cls the cache entry
+ * @param peer_id the index of the peer
+ * @param cgh the CacheGetHandle
+ * @param cfg the configuration with which the transport handle has to be
+ * created if it was not present in the cache
+ * @param target the peer identify of the peer whose connection to
+ * TRANSPORT/CORE (depending on the type of 'cgh') subsystem will be
+ * notified through the connect_notify_cb. Can be NULL
+ * @param connect_notify_cb the callback to call when the given target peer is
+ * connected. This callback will only be called once or never again (in
+ * case the target peer cannot be connected). Can be NULL
+ * @param connect_notify_cb_cls the closure for the above callback
+ * @return the handle which can be used to cancel or mark that the handle is no
+ * longer being used
*/
static struct GSTCacheGetHandle *
-cache_get_handle (unsigned int peer_id,
- struct GSTCacheGetHandle *cgh,
+cache_get_handle (unsigned int peer_id, struct GSTCacheGetHandle *cgh,
const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_PeerIdentity *target,
GST_cache_peer_connect_notify connect_notify_cb,
entry = cache_lookup (&key);
if (NULL != entry)
{
- if (0 == entry->demand)
+ if (GNUNET_YES == entry->in_lru)
{
+ GNUNET_assert (0 == entry->demand);
GNUNET_assert (0 < lru_cache_size);
+ if (GNUNET_SCHEDULER_NO_TASK != entry->expire_task)
+ {
+ GNUNET_SCHEDULER_cancel (entry->expire_task);
+ entry->expire_task = GNUNET_SCHEDULER_NO_TASK;
+ }
GNUNET_CONTAINER_DLL_remove (lru_cache_head, lru_cache_tail, entry);
lru_cache_size--;
+ entry->in_lru = GNUNET_NO;
}
switch (cgh->type)
{
case CGT_TRANSPORT_HANDLE:
- handle = entry->transport_handle_;
+ handle = entry->transport_handle;
if (NULL != handle)
- LOG_DEBUG ("Found TRANSPORT handle in cache for peer %u\n", entry->peer_id);
+ LOG_DEBUG ("Found TRANSPORT handle in cache for peer %u\n",
+ entry->peer_id);
break;
case CGT_CORE_HANDLE:
handle = entry->core_handle;
if (NULL != handle)
{
if (GNUNET_SCHEDULER_NO_TASK == entry->notify_task)
- entry->notify_task = GNUNET_SCHEDULER_add_now (&call_cgh_cb, entry);
+ {
+ if (NULL != search_suitable_cgh (entry, entry->cgh_qhead))
+ entry->notify_task = GNUNET_SCHEDULER_add_now (&call_cgh_cb, entry);
+ }
return cgh;
}
+ op = NULL;
switch (cgh->type)
{
case CGT_TRANSPORT_HANDLE:
- if (NULL != entry->transport_op_)
+ if (NULL != entry->transport_op)
return cgh;
op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_transport,
&oprelease_get_handle_transport);
- entry->transport_op_ = op;
+ entry->transport_op = op;
break;
case CGT_CORE_HANDLE:
if (NULL != entry->core_op)
&oprelease_get_handle_core);
entry->core_op = op;
break;
+ default:
+ GNUNET_assert (0);
}
GNUNET_TESTBED_operation_queue_insert_ (GST_opq_openfds, op);
GNUNET_TESTBED_operation_begin_wait_ (op);
* GNUNET_NO if not.
*/
static int
-cache_clear_iterator (void *cls,
- const struct GNUNET_HashCode * key,
- void *value)
+cache_clear_iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
{
struct CacheEntry *entry = value;
static unsigned int ncleared;
GNUNET_break (0 == entry->demand);
LOG_DEBUG ("Clearing entry %u of %u\n", ++ncleared, cache_size);
GNUNET_CONTAINER_multihashmap_remove (cache, key, value);
- if (0 == entry->demand)
- close_handles (entry);
+ close_handles (entry);
GNUNET_free_non_null (entry->hello);
- GNUNET_break (NULL == entry->transport_handle_);
- GNUNET_break (NULL == entry->transport_op_);
- GNUNET_break (NULL == entry->core_handle);
- GNUNET_break (NULL == entry->core_op);
- GNUNET_break (NULL == entry->cfg);
+ GNUNET_break (GNUNET_SCHEDULER_NO_TASK == entry->expire_task);
+ GNUNET_assert (NULL == entry->transport_handle);
+ GNUNET_assert (NULL == entry->transport_op);
+ GNUNET_assert (NULL == entry->core_handle);
+ GNUNET_assert (NULL == entry->core_op);
+ GNUNET_assert (NULL == entry->cfg);
GNUNET_assert (NULL == entry->cgh_qhead);
GNUNET_assert (NULL == entry->cgh_qtail);
GNUNET_assert (NULL == entry->nctxt_qhead);
GNUNET_CONTAINER_multihashmap_iterate (cache, &cache_clear_iterator, NULL);
GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (cache));
GNUNET_CONTAINER_multihashmap_destroy (cache);
+ cache = NULL;
+ lru_cache_size = 0;
+ lru_cache_threshold_size = 0;
+ cache_size = 0;
+ lru_cache_head = NULL;
+ lru_cache_tail = NULL;
}
{
GNUNET_assert (cgh == cgh->nctxt->cgh);
if (GNUNET_YES == cgh->notify_called)
- GNUNET_CONTAINER_DLL_remove (entry->nctxt_qhead, entry->nctxt_qtail, cgh->nctxt);
+ GNUNET_CONTAINER_DLL_remove (entry->nctxt_qhead, entry->nctxt_qtail,
+ cgh->nctxt);
GNUNET_free (cgh->nctxt);
}
- GNUNET_free (cgh);
+ GNUNET_free (cgh);
if (0 == entry->demand)
{
+ entry->expire_task =
+ GNUNET_SCHEDULER_add_delayed (CACHE_EXPIRY, &expire_cache_entry, entry);
GNUNET_CONTAINER_DLL_insert_tail (lru_cache_head, lru_cache_tail, entry);
lru_cache_size++;
+ entry->in_lru = GNUNET_YES;
if (lru_cache_size > lru_cache_threshold_size)
close_handles (lru_cache_head);
}
* connected. This callback will only be called once or never again (in
* case the target peer cannot be connected). Can be NULL
* @param connect_notify_cb_cls the closure for the above callback
- * @return the handle which can be used cancel or mark that the handle is no
+ * @return the handle which can be used to cancel or mark that the handle is no
* longer being used
*/
struct GSTCacheGetHandle *
GST_cache_get_handle_transport (unsigned int peer_id,
const struct GNUNET_CONFIGURATION_Handle *cfg,
- GST_cache_handle_ready_cb cb,
- void *cb_cls,
+ GST_cache_handle_ready_cb cb, void *cb_cls,
const struct GNUNET_PeerIdentity *target,
GST_cache_peer_connect_notify connect_notify_cb,
void *connect_notify_cb_cls)
cgh->cb = cb;
cgh->cb_cls = cb_cls;
cgh->type = CGT_TRANSPORT_HANDLE;
- return cache_get_handle (peer_id, cgh, cfg,
- target, connect_notify_cb, connect_notify_cb_cls);
+ return cache_get_handle (peer_id, cgh, cfg, target, connect_notify_cb,
+ connect_notify_cb_cls);
}
* connected. This callback will only be called once or never again (in
* case the target peer cannot be connected). Can be NULL
* @param connect_notify_cb_cls the closure for the above callback
- * @return the handle which can be used cancel or mark that the handle is no
+ * @return the handle which can be used to cancel or mark that the handle is no
* longer being used
*/
struct GSTCacheGetHandle *
GST_cache_get_handle_core (unsigned int peer_id,
const struct GNUNET_CONFIGURATION_Handle *cfg,
- GST_cache_handle_ready_cb cb,
- void *cb_cls,
+ GST_cache_handle_ready_cb cb, void *cb_cls,
const struct GNUNET_PeerIdentity *target,
GST_cache_peer_connect_notify connect_notify_cb,
void *connect_notify_cb_cls)
cgh->cb = cb;
cgh->cb_cls = cb_cls;
cgh->type = CGT_CORE_HANDLE;
- return cache_get_handle (peer_id, cgh, cfg,
- target, connect_notify_cb, connect_notify_cb_cls);
+ return cache_get_handle (peer_id, cgh, cfg, target, connect_notify_cb,
+ connect_notify_cb_cls);
}
{
struct CacheEntry *entry;
struct GNUNET_HashCode key;
-
+
LOG_DEBUG ("Looking up HELLO for peer %u\n", peer_id);
GNUNET_CRYPTO_hash (&peer_id, sizeof (peer_id), &key);
entry = cache_lookup (&key);
* Caches the HELLO of the given peer. Updates the HELLO if it was already
* cached before
*
- * @param id the peer identity of the peer whose HELLO has to be cached
+ * @param peer_id the peer identity of the peer whose HELLO has to be cached
* @param hello the HELLO message
*/
void