/*
This file is part of GNUnet.
- (C) 2008--2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2008--2015 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
* @file testbed/gnunet-service-testbed_connectionpool.c
* @brief connection pooling for connections to peers' services
- * @author Sree Harsha Totakura <sreeharsha@totakura.in>
+ * @author Sree Harsha Totakura <sreeharsha@totakura.in>
*/
#include "gnunet-service-testbed.h"
*/
struct GNUNET_CORE_Handle *handle_core;
+ /**
+ * The ATS handle to the peer correspondign to this entry; can be NULL.
+ */
+ struct GNUNET_ATS_ConnectivityHandle *handle_ats_connectivity;
+
/**
* The operation handle for transport handle
*/
*/
struct GNUNET_TESTBED_Operation *op_core;
+ /**
+ * The operation handle for ATS handle
+ */
+ struct GNUNET_TESTBED_Operation *op_ats_connectivity;
+
/**
* The peer identity of this peer. Will be set upon opening a connection to
* the peers CORE service. Will be NULL until then and after the CORE
/**
* The task to expire this connection from the connection pool
*/
- GNUNET_SCHEDULER_TaskIdentifier expire_task;
+ struct GNUNET_SCHEDULER_Task * expire_task;
/**
* The task to notify a waiting #GST_ConnectionPool_GetHandle object
*/
- GNUNET_SCHEDULER_TaskIdentifier notify_task;
+ struct GNUNET_SCHEDULER_Task * notify_task;
/**
* Number of active requests using this pooled connection
* Did we call the pool_connection_ready_cb already?
*/
int connection_ready_called;
+
+ /**
+ * Are we waiting for any peer connect notifications?
+ */
+ int notify_waiting;
};
static unsigned int max_size;
+/**
+ * Cancel the expiration task of the give #PooledConnection object
+ *
+ * @param entry the #PooledConnection object
+ */
+static void
+expire_task_cancel (struct PooledConnection *entry);
+
+
/**
* Destroy a #PooledConnection object
*
GNUNET_assert ((NULL == entry->head_waiting) && (NULL ==
entry->tail_waiting));
GNUNET_assert (0 == entry->demand);
- GNUNET_free_non_null (entry->peer_identity);
+ expire_task_cancel (entry);
if (entry->in_lru)
GNUNET_CONTAINER_DLL_remove (head_lru, tail_lru, entry);
if (entry->in_pool)
- GNUNET_assert (GNUNET_OK ==
+ GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap32_remove (map,
entry->index,
entry));
+ if (NULL != entry->notify_task)
+ {
+ GNUNET_SCHEDULER_cancel (entry->notify_task);
+ entry->notify_task = NULL;
+ }
LOG_DEBUG ("Cleaning up handles of a pooled connection\n");
if (NULL != entry->handle_transport)
GNUNET_assert (NULL != entry->op_transport);
GNUNET_TESTBED_operation_done (entry->op_transport);
entry->op_transport = NULL;
}
+ if (NULL != entry->handle_ats_connectivity)
+ GNUNET_assert (NULL != entry->op_ats_connectivity);
+ if (NULL != entry->op_ats_connectivity)
+ {
+ GNUNET_TESTBED_operation_done (entry->op_ats_connectivity);
+ entry->op_ats_connectivity = NULL;
+ }
if (NULL != entry->op_core)
{
GNUNET_TESTBED_operation_done (entry->op_core);
entry->op_core = NULL;
}
GNUNET_assert (NULL == entry->handle_core);
+ GNUNET_assert (NULL == entry->handle_ats_connectivity);
GNUNET_assert (NULL == entry->handle_transport);
GNUNET_CONFIGURATION_destroy (entry->cfg);
GNUNET_free (entry);
* Expire a #PooledConnection object
*
* @param cls the #PooledConnection object
- * @param tc scheduler task context
*/
static void
-expire (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+expire (void *cls)
{
struct PooledConnection *entry = cls;
- entry->expire_task = GNUNET_SCHEDULER_NO_TASK;
+ entry->expire_task = NULL;
destroy_pooled_connection (entry);
}
+/**
+ * Cancel the expiration task of the give #PooledConnection object
+ *
+ * @param entry the #PooledConnection object
+ */
static void
expire_task_cancel (struct PooledConnection *entry)
{
- if (GNUNET_SCHEDULER_NO_TASK != entry->expire_task)
+ if (NULL != entry->expire_task)
{
GNUNET_SCHEDULER_cancel (entry->expire_task);
- entry->expire_task = GNUNET_SCHEDULER_NO_TASK;
+ entry->expire_task = NULL;
}
}
GNUNET_assert (!entry->in_lru);
GNUNET_CONTAINER_DLL_insert_tail (head_lru, tail_lru, entry);
entry->in_lru = GNUNET_YES;
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == entry->expire_task);
+ GNUNET_assert (NULL == entry->expire_task);
entry->expire_task = GNUNET_SCHEDULER_add_delayed (CACHE_EXPIRY,
&expire, entry);
}
if (NULL == entry->handle_transport)
continue;
break;
+ case GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY:
+ if (NULL == entry->handle_ats_connectivity)
+ continue;
+ break;
}
break;
}
* further schedules itself if there are similar waiting objects which can be notified.
*
* @param cls the #PooledConnection object
- * @param tc the task context from scheduler
*/
static void
-connection_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+connection_ready (void *cls)
{
struct PooledConnection *entry = cls;
struct GST_ConnectionPool_GetHandle *gh;
struct GST_ConnectionPool_GetHandle *gh_next;
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != entry->notify_task);
- entry->notify_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (NULL != entry->notify_task);
+ entry->notify_task = NULL;
gh = search_waiting (entry, entry->head_waiting);
GNUNET_assert (NULL != gh);
gh_next = NULL;
if (NULL != gh->next)
gh_next = search_waiting (entry, gh->next);
- GNUNET_CONTAINER_DLL_remove (entry->head_waiting, entry->tail_waiting, gh);
- gh->connection_ready_called = GNUNET_YES;
+ GNUNET_CONTAINER_DLL_remove (entry->head_waiting,
+ entry->tail_waiting,
+ gh);
+ gh->connection_ready_called = 1;
if (NULL != gh_next)
- entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready, entry);
- if ( (NULL != gh->target) && (NULL != gh->connect_notify_cb) )
- GNUNET_CONTAINER_DLL_insert_tail (entry->head_notify, entry->tail_notify, gh);
- LOG_DEBUG ("Calling notify for handle type %u\n", gh->service);
- gh->cb (gh->cb_cls, entry->handle_core, entry->handle_transport,
+ entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready,
+ entry);
+ if ( (NULL != gh->target) &&
+ (NULL != gh->connect_notify_cb) )
+ {
+ GNUNET_CONTAINER_DLL_insert_tail (entry->head_notify,
+ entry->tail_notify,
+ gh);
+ gh->notify_waiting = 1;
+ }
+ LOG_DEBUG ("Connection ready for handle type %u\n",
+ gh->service);
+ gh->cb (gh->cb_cls,
+ entry->handle_core,
+ entry->handle_transport,
+ entry->handle_ats_connectivity,
entry->peer_identity);
}
/**
* Function called from peer connect notify callbacks from CORE and TRANSPORT
- * connections. This function calls the pendning peer connect notify callbacks
+ * connections. This function calls the pending peer connect notify callbacks
* which are queued in an entry.
*
* @param cls the #PooledConnection object
* @param service the service where this notification has originated
*/
static void
-peer_connect_notify_cb (void *cls, const struct GNUNET_PeerIdentity *peer,
+peer_connect_notify_cb (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
const enum GST_ConnectionPool_Service service)
{
struct PooledConnection *entry = cls;
gh = gh->next;
continue;
}
- if (0 != memcmp (gh->target, peer, sizeof (struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (gh->target,
+ peer,
+ sizeof (struct GNUNET_PeerIdentity)))
{
gh = gh->next;
continue;
cb_cls = gh->connect_notify_cb_cls;
gh_next = gh->next;
GNUNET_CONTAINER_DLL_remove (entry->head_notify, entry->tail_notify, gh);
+ gh->notify_waiting = 0;
+ LOG_DEBUG ("Peer connected to peer %u at service %u\n",
+ entry->index,
+ gh->service);
gh = gh_next;
cb (cb_cls, peer);
}
const struct GNUNET_PeerIdentity *peer)
{
struct PooledConnection *entry = cls;
-
- peer_connect_notify_cb (entry, peer, GST_CONNECTIONPOOL_SERVICE_TRANSPORT);
+
+ peer_connect_notify_cb (entry,
+ peer,
+ GST_CONNECTIONPOOL_SERVICE_TRANSPORT);
}
}
if (0 == entry->demand)
return;
- if (GNUNET_SCHEDULER_NO_TASK != entry->notify_task)
+ if (NULL != entry->notify_task)
return;
if (NULL != search_waiting (entry, entry->head_waiting))
{
core_peer_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
{
struct PooledConnection *entry = cls;
-
+
peer_connect_notify_cb (entry, peer, GST_CONNECTIONPOOL_SERVICE_CORE);
}
/**
- * Function called after GNUNET_CORE_connect has succeeded (or failed
+ * Function called after #GNUNET_CORE_connect() has succeeded (or failed
* for good). Note that the private key of the peer is intentionally
* not exposed here; if you need it, your process should try to read
* the private key file directly (which should work if you are
* authorized...). Implementations of this function must not call
- * GNUNET_CORE_disconnect (other than by scheduling a new task to
+ * #GNUNET_CORE_disconnect() (other than by scheduling a new task to
* do this later).
*
* @param cls the #PooledConnection object
* @param my_identity ID of this peer, NULL if we failed
*/
static void
-core_startup_cb (void *cls,
+core_startup_cb (void *cls,
const struct GNUNET_PeerIdentity *my_identity)
{
struct PooledConnection *entry = cls;
sizeof (struct GNUNET_PeerIdentity));
if (0 == entry->demand)
return;
- if (GNUNET_SCHEDULER_NO_TASK != entry->notify_task)
+ if (NULL != entry->notify_task)
return;
if (NULL != search_waiting (entry, entry->head_waiting))
{
/**
- * Function called when the operation responsible for opening a TRANSPORT
+ * Function called when the operation responsible for opening a CORE
* connection is marked as done.
*
* @param cls the #PooledConnection object
}
+/**
+ * Function called when resources for opening a connection to ATS are
+ * available.
+ *
+ * @param cls the #PooledConnection object
+ */
+static void
+opstart_get_handle_ats_connectivity (void *cls)
+{
+ struct PooledConnection *entry = cls;
+
+ entry->handle_ats_connectivity =
+ GNUNET_ATS_connectivity_init (entry->cfg);
+}
+
+
+/**
+ * Function called when the operation responsible for opening a ATS
+ * connection is marked as done.
+ *
+ * @param cls the #PooledConnection object
+ */
+static void
+oprelease_get_handle_ats_connectivity (void *cls)
+{
+ struct PooledConnection *entry = cls;
+
+ if (NULL == entry->handle_ats_connectivity)
+ return;
+ GNUNET_ATS_connectivity_done (entry->handle_ats_connectivity);
+ entry->handle_ats_connectivity = NULL;
+}
+
+
/**
* This function will be called for every #PooledConnection object in @p map
*
void *value)
{
struct PooledConnection *entry = value;
-
+
GNUNET_assert (NULL != entry);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap32_remove (map, key, entry));
- if (entry->in_lru)
- GNUNET_CONTAINER_DLL_remove (head_lru, tail_lru, entry);
destroy_pooled_connection (entry);
return GNUNET_YES;
}
GST_connection_pool_destroy ()
{
struct PooledConnection *entry;
-
+
if (NULL != map)
{
GNUNET_assert (GNUNET_SYSERR !=
GNUNET_CONTAINER_DLL_remove (head_lru, tail_lru, entry);
destroy_pooled_connection (entry);
}
+ GNUNET_assert (NULL == head_not_pooled);
}
*
* @note @a connect_notify_cb will not be called if @a target is
* already connected @a service level. Use
- * GNUNET_TRANSPORT_check_neighbour_connected() or a similar function from the
+ * GNUNET_TRANSPORT_check_peer_connected() or a similar function from the
* respective @a service's API to check if the target peer is already connected or
* not. @a connect_notify_cb will be called only once or never (in case @a target
* cannot be connected or is already connected).
uint32_t peer_id32;
peer_id32 = (uint32_t) peer_id;
+ handle = NULL;
entry = NULL;
if (NULL != map)
entry = GNUNET_CONTAINER_multihashmap32_get (map, peer_id32);
LOG_DEBUG ("Found CORE handle for peer %u\n",
entry->index);
break;
+ case GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY:
+ handle = entry->handle_ats_connectivity;
+ if (NULL != handle)
+ LOG_DEBUG ("Found ATS CONNECTIVITY handle for peer %u\n",
+ entry->index);
+ break;
}
}
else
{
entry = GNUNET_new (struct PooledConnection);
entry->index = peer_id32;
- if ((NULL != map)
+ if ((NULL != map)
&& (GNUNET_CONTAINER_multihashmap32_size (map) < max_size))
{
GNUNET_assert (GNUNET_OK ==
}
else
{
- GNUNET_CONTAINER_DLL_insert_tail (head_not_pooled, tail_not_pooled, entry);
+ GNUNET_CONTAINER_DLL_insert_tail (head_not_pooled,
+ tail_not_pooled,
+ entry);
}
entry->cfg = GNUNET_CONFIGURATION_dup (cfg);
}
gh->connect_notify_cb = connect_notify_cb;
gh->connect_notify_cb_cls = connect_notify_cb_cls;
gh->service = service;
- GNUNET_CONTAINER_DLL_insert (entry->head_waiting, entry->tail_waiting, gh);
+ GNUNET_CONTAINER_DLL_insert (entry->head_waiting,
+ entry->tail_waiting,
+ gh);
if (NULL != handle)
{
- if (GNUNET_SCHEDULER_NO_TASK == entry->notify_task)
+ if (NULL == entry->notify_task)
{
if (NULL != search_waiting (entry, entry->head_waiting))
- entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready, entry);
+ entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready,
+ entry);
}
return gh;
}
case GST_CONNECTIONPOOL_SERVICE_TRANSPORT:
if (NULL != entry->op_transport)
return gh; /* Operation pending */
- op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_transport,
+ op = GNUNET_TESTBED_operation_create_ (entry,
+ &opstart_get_handle_transport,
&oprelease_get_handle_transport);
entry->op_transport = op;
break;
case GST_CONNECTIONPOOL_SERVICE_CORE:
if (NULL != entry->op_core)
return gh; /* Operation pending */
- op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_core,
+ op = GNUNET_TESTBED_operation_create_ (entry,
+ &opstart_get_handle_core,
&oprelease_get_handle_core);
entry->op_core = op;
break;
+ case GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY:
+ if (NULL != entry->op_ats_connectivity)
+ return gh; /* Operation pending */
+ op = GNUNET_TESTBED_operation_create_ (entry,
+ &opstart_get_handle_ats_connectivity,
+ &oprelease_get_handle_ats_connectivity);
+ entry->op_ats_connectivity = op;
+ break;
}
- GNUNET_TESTBED_operation_queue_insert_ (GST_opq_openfds, op);
+ GNUNET_TESTBED_operation_queue_insert_ (GST_opq_openfds,
+ op);
GNUNET_TESTBED_operation_begin_wait_ (op);
return gh;
}
{
struct PooledConnection *entry;
+ if (NULL == gh)
+ return;
entry = gh->entry;
- if (!gh->connection_ready_called)
- GNUNET_CONTAINER_DLL_remove (entry->head_waiting, entry->tail_waiting, gh);
- else if ((NULL != gh->next) || (NULL != gh->prev))
- GNUNET_CONTAINER_DLL_remove (entry->head_notify, entry->head_notify, gh);
+ LOG_DEBUG ("Cleaning up get handle %p for service %u, peer %u\n",
+ gh,
+ gh->service, entry->index);
+ if (! gh->connection_ready_called)
+ {
+ GNUNET_CONTAINER_DLL_remove (entry->head_waiting,
+ entry->tail_waiting,
+ gh);
+ if ( (NULL == search_waiting (entry, entry->head_waiting)) &&
+ (NULL != entry->notify_task) )
+ {
+ GNUNET_SCHEDULER_cancel (entry->notify_task);
+ entry->notify_task = NULL;
+ }
+ }
+ if (gh->notify_waiting)
+ {
+ GNUNET_CONTAINER_DLL_remove (entry->head_notify,
+ entry->tail_notify,
+ gh);
+ gh->notify_waiting = 0;
+ }
GNUNET_free (gh);
gh = NULL;
- GNUNET_assert (!entry->in_lru);
- if ( (!entry->in_pool) && (NULL != map) )
+ GNUNET_assert (! entry->in_lru);
+ if (! entry->in_pool)
+ GNUNET_CONTAINER_DLL_remove (head_not_pooled,
+ tail_not_pooled,
+ entry);
+ if (NULL != map)
{
if (GNUNET_YES == GNUNET_CONTAINER_multihashmap32_contains (map,
entry->index))
goto unallocate;
- if ((GNUNET_CONTAINER_multihashmap32_size (map) == max_size)
- && (NULL == head_lru))
- goto unallocate;
- destroy_pooled_connection (head_lru);
- GNUNET_CONTAINER_DLL_remove (head_not_pooled, tail_not_pooled, entry);
+ if (GNUNET_CONTAINER_multihashmap32_size (map) == max_size)
+ {
+ if (NULL == head_lru)
+ goto unallocate;
+ destroy_pooled_connection (head_lru);
+ }
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap32_put (map,
- entry->index,
+ GNUNET_CONTAINER_multihashmap32_put (map,
+ entry->index,
entry,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
entry->in_pool = GNUNET_YES;
}
+
unallocate:
GNUNET_assert (0 < entry->demand);
entry->demand--;