ensure stats queues do not grow too big
[oweals/gnunet.git] src/testbed/gnunet-service-testbed_connectionpool.c
index 158c9ec3a21c8705df954d041d0c29b12f11c37d..0fa2a64567500eea83f96cfcc62f9d4f555bc9dc 100644
@@ -1,6 +1,6 @@
 /*
   This file is part of GNUnet.
-  (C) 2008--2013 Christian Grothoff (and other contributing authors)
+  Copyright (C) 2008--2015 GNUnet e.V.
 
   GNUnet is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
   You should have received a copy of the GNU General Public License
   along with GNUnet; see the file COPYING.  If not, write to the
-  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-  Boston, MA 02111-1307, USA.
+  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+  Boston, MA 02110-1301, USA.
 */
 
 /**
@@ -78,6 +78,11 @@ struct PooledConnection
    */
   struct GNUNET_CORE_Handle *handle_core;
 
+  /**
+   * The ATS handle to the peer corresponding to this entry; can be NULL.
+   */
+  struct GNUNET_ATS_ConnectivityHandle *handle_ats_connectivity;
+
   /**
    * The operation handle for transport handle
    */
@@ -88,6 +93,11 @@ struct PooledConnection
    */
   struct GNUNET_TESTBED_Operation *op_core;
 
+  /**
+   * The operation handle for the ATS connectivity handle
+   */
+  struct GNUNET_TESTBED_Operation *op_ats_connectivity;
+
   /**
    * The peer identity of this peer. Will be set upon opening a connection to
    * the peers CORE service. Will be NULL until then and after the CORE
@@ -126,12 +136,12 @@ struct PooledConnection
   /**
    * The task to expire this connection from the connection pool
    */
-  GNUNET_SCHEDULER_TaskIdentifier expire_task;
+  struct GNUNET_SCHEDULER_Task *expire_task;
 
   /**
    * The task to notify a waiting #GST_ConnectionPool_GetHandle object
    */
-  GNUNET_SCHEDULER_TaskIdentifier notify_task;
+  struct GNUNET_SCHEDULER_Task *notify_task;
 
   /**
    * Number of active requests using this pooled connection
@@ -286,6 +296,11 @@ destroy_pooled_connection (struct PooledConnection *entry)
                    GNUNET_CONTAINER_multihashmap32_remove (map,
                                                            entry->index,
                                                            entry));
+  if (NULL != entry->notify_task)
+  {
+    GNUNET_SCHEDULER_cancel (entry->notify_task);
+    entry->notify_task = NULL;
+  }
   LOG_DEBUG ("Cleaning up handles of a pooled connection\n");
   if (NULL != entry->handle_transport)
     GNUNET_assert (NULL != entry->op_transport);
@@ -294,12 +309,20 @@ destroy_pooled_connection (struct PooledConnection *entry)
     GNUNET_TESTBED_operation_done (entry->op_transport);
     entry->op_transport = NULL;
   }
+  if (NULL != entry->handle_ats_connectivity)
+    GNUNET_assert (NULL != entry->op_ats_connectivity);
+  if (NULL != entry->op_ats_connectivity)
+  {
+    GNUNET_TESTBED_operation_done (entry->op_ats_connectivity);
+    entry->op_ats_connectivity = NULL;
+  }
   if (NULL != entry->op_core)
   {
     GNUNET_TESTBED_operation_done (entry->op_core);
     entry->op_core = NULL;
   }
   GNUNET_assert (NULL == entry->handle_core);
+  GNUNET_assert (NULL == entry->handle_ats_connectivity);
   GNUNET_assert (NULL == entry->handle_transport);
   GNUNET_CONFIGURATION_destroy (entry->cfg);
   GNUNET_free (entry);
@@ -310,14 +333,13 @@ destroy_pooled_connection (struct PooledConnection *entry)
  * Expire a #PooledConnection object
  *
  * @param cls the #PooledConnection object
- * @param tc scheduler task context
  */
 static void
-expire (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+expire (void *cls)
 {
   struct PooledConnection *entry = cls;
 
-  entry->expire_task = GNUNET_SCHEDULER_NO_TASK;
+  entry->expire_task = NULL;
   destroy_pooled_connection (entry);
 }
 
@@ -330,10 +352,10 @@ expire (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 static void
 expire_task_cancel (struct PooledConnection *entry)
 {
-  if (GNUNET_SCHEDULER_NO_TASK != entry->expire_task)
+  if (NULL != entry->expire_task)
   {
     GNUNET_SCHEDULER_cancel (entry->expire_task);
-    entry->expire_task = GNUNET_SCHEDULER_NO_TASK;
+    entry->expire_task = NULL;
   }
 }
 
@@ -350,7 +372,7 @@ add_to_lru (struct PooledConnection *entry)
   GNUNET_assert (!entry->in_lru);
   GNUNET_CONTAINER_DLL_insert_tail (head_lru, tail_lru, entry);
   entry->in_lru = GNUNET_YES;
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == entry->expire_task);
+  GNUNET_assert (NULL == entry->expire_task);
   entry->expire_task = GNUNET_SCHEDULER_add_delayed (CACHE_EXPIRY,
                                                      &expire, entry);
 }
@@ -387,6 +409,10 @@ search_waiting (const struct PooledConnection *entry,
       if (NULL == entry->handle_transport)
         continue;
       break;
+    case GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY:
+      if (NULL == entry->handle_ats_connectivity)
+        continue;
+      break;
     }
     break;
   }
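
The new case relies on a matching enumerator in the pool's service selector, declared in gnunet-service-testbed_connectionpool.h and not shown in this diff. Roughly, based only on the names used in the switch above (ordering, explicit values and comments are assumptions):

/* Sketch only: the real declaration lives in
 * gnunet-service-testbed_connectionpool.h and may differ. */
enum GST_ConnectionPool_Service
{
  GST_CONNECTIONPOOL_SERVICE_TRANSPORT,
  GST_CONNECTIONPOOL_SERVICE_CORE,
  GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY   /* added with this change */
};
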
@@ -401,41 +427,49 @@ search_waiting (const struct PooledConnection *entry,
  * further schedules itself if there are similar waiting objects which can be notified.
  *
  * @param cls the #PooledConnection object
- * @param tc the task context from scheduler
  */
 static void
-connection_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+connection_ready (void *cls)
 {
   struct PooledConnection *entry = cls;
   struct GST_ConnectionPool_GetHandle *gh;
   struct GST_ConnectionPool_GetHandle *gh_next;
 
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != entry->notify_task);
-  entry->notify_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (NULL != entry->notify_task);
+  entry->notify_task = NULL;
   gh = search_waiting (entry, entry->head_waiting);
   GNUNET_assert (NULL != gh);
   gh_next = NULL;
   if (NULL != gh->next)
     gh_next = search_waiting (entry, gh->next);
-  GNUNET_CONTAINER_DLL_remove (entry->head_waiting, entry->tail_waiting, gh);
+  GNUNET_CONTAINER_DLL_remove (entry->head_waiting,
+                               entry->tail_waiting,
+                               gh);
   gh->connection_ready_called = 1;
   if (NULL != gh_next)
-    entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready, entry);
-  if ( (NULL != gh->target) && (NULL != gh->connect_notify_cb) )
+    entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready,
+                                                   entry);
+  if ( (NULL != gh->target) &&
+       (NULL != gh->connect_notify_cb) )
   {
-    GNUNET_CONTAINER_DLL_insert_tail (entry->head_notify, entry->tail_notify,
+    GNUNET_CONTAINER_DLL_insert_tail (entry->head_notify,
+                                      entry->tail_notify,
                                       gh);
     gh->notify_waiting = 1;
   }
-  LOG_DEBUG ("Connection ready for handle type %u\n", gh->service);
-  gh->cb (gh->cb_cls, entry->handle_core, entry->handle_transport,
+  LOG_DEBUG ("Connection ready for handle type %u\n",
+             gh->service);
+  gh->cb (gh->cb_cls,
+          entry->handle_core,
+          entry->handle_transport,
+          entry->handle_ats_connectivity,
           entry->peer_identity);
 }
 
 
 /**
  * Function called from peer connect notify callbacks from CORE and TRANSPORT
- * connections. This function calls the pendning peer connect notify callbacks
+ * connections. This function calls the pending peer connect notify callbacks
  * which are queued in an entry.
  *
  * @param cls the #PooledConnection object
@@ -443,7 +477,8 @@ connection_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
  * @param service the service where this notification has originated
  */
 static void
-peer_connect_notify_cb (void *cls, const struct GNUNET_PeerIdentity *peer,
+peer_connect_notify_cb (void *cls,
+                        const struct GNUNET_PeerIdentity *peer,
                         const enum GST_ConnectionPool_Service service)
 {
   struct PooledConnection *entry = cls;
@@ -462,7 +497,9 @@ peer_connect_notify_cb (void *cls, const struct GNUNET_PeerIdentity *peer,
       gh = gh->next;
       continue;
     }
-    if (0 != memcmp (gh->target, peer, sizeof (struct GNUNET_PeerIdentity)))
+    if (0 != memcmp (gh->target,
+                     peer,
+                     sizeof (struct GNUNET_PeerIdentity)))
     {
       gh = gh->next;
       continue;
@@ -472,7 +509,9 @@ peer_connect_notify_cb (void *cls, const struct GNUNET_PeerIdentity *peer,
     gh_next = gh->next;
     GNUNET_CONTAINER_DLL_remove (entry->head_notify, entry->tail_notify, gh);
     gh->notify_waiting = 0;
-    LOG_DEBUG ("Peer connected to peer %u at service %u\n", entry->index, gh->service);
+    LOG_DEBUG ("Peer connected to peer %u at service %u\n",
+               entry->index,
+               gh->service);
     gh = gh_next;
     cb (cb_cls, peer);
   }
@@ -492,7 +531,9 @@ transport_peer_connect_notify_cb (void *cls,
 {
   struct PooledConnection *entry = cls;
 
-  peer_connect_notify_cb (entry, peer, GST_CONNECTIONPOOL_SERVICE_TRANSPORT);
+  peer_connect_notify_cb (entry,
+                          peer,
+                          GST_CONNECTIONPOOL_SERVICE_TRANSPORT);
 }
 
 
@@ -519,7 +560,7 @@ opstart_get_handle_transport (void *cls)
   }
   if (0 == entry->demand)
     return;
-  if (GNUNET_SCHEDULER_NO_TASK != entry->notify_task)
+  if (NULL != entry->notify_task)
     return;
   if (NULL != search_waiting (entry, entry->head_waiting))
   {
@@ -563,12 +604,12 @@ core_peer_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
 
 
 /**
- * Function called after GNUNET_CORE_connect has succeeded (or failed
+ * Function called after #GNUNET_CORE_connect() has succeeded (or failed
  * for good).  Note that the private key of the peer is intentionally
  * not exposed here; if you need it, your process should try to read
  * the private key file directly (which should work if you are
  * authorized...).  Implementations of this function must not call
- * GNUNET_CORE_disconnect (other than by scheduling a new task to
+ * #GNUNET_CORE_disconnect() (other than by scheduling a new task to
  * do this later).
  *
  * @param cls the #PooledConnection object
@@ -592,7 +633,7 @@ core_startup_cb (void *cls,
           sizeof (struct GNUNET_PeerIdentity));
   if (0 == entry->demand)
     return;
-  if (GNUNET_SCHEDULER_NO_TASK != entry->notify_task)
+  if (NULL != entry->notify_task)
     return;
   if (NULL != search_waiting (entry, entry->head_waiting))
   {
@@ -632,7 +673,7 @@ opstart_get_handle_core (void *cls)
 
 
 /**
- * Function called when the operation responsible for opening a TRANSPORT
+ * Function called when the operation responsible for opening a CORE
  * connection is marked as done.
  *
  * @param cls the #PooledConnection object
@@ -651,6 +692,40 @@ oprelease_get_handle_core (void *cls)
 }
 
 
+/**
+ * Function called when resources for opening a connection to ATS are
+ * available.
+ *
+ * @param cls the #PooledConnection object
+ */
+static void
+opstart_get_handle_ats_connectivity (void *cls)
+{
+  struct PooledConnection *entry = cls;
+
+  entry->handle_ats_connectivity =
+    GNUNET_ATS_connectivity_init (entry->cfg);
+}
+
+
+/**
+ * Function called when the operation responsible for opening an ATS
+ * connection is marked as done.
+ *
+ * @param cls the #PooledConnection object
+ */
+static void
+oprelease_get_handle_ats_connectivity (void *cls)
+{
+  struct PooledConnection *entry = cls;
+
+  if (NULL == entry->handle_ats_connectivity)
+    return;
+  GNUNET_ATS_connectivity_done (entry->handle_ats_connectivity);
+  entry->handle_ats_connectivity = NULL;
+}
+
+
 /**
  * This function will be called for every #PooledConnection object in @p map
  *
@@ -729,7 +804,7 @@ GST_connection_pool_destroy ()
  *
  * @note @a connect_notify_cb will not be called if @a target is
  * already connected @a service level. Use
- * GNUNET_TRANSPORT_check_neighbour_connected() or a similar function from the
+ * GNUNET_TRANSPORT_check_peer_connected() or a similar function from the
  * respective @a service's API to check if the target peer is already connected or
  * not. @a connect_notify_cb will be called only once or never (in case @a target
  * cannot be connected or is already connected).
@@ -793,6 +868,12 @@ GST_connection_pool_get_handle (unsigned int peer_id,
         LOG_DEBUG ("Found CORE handle for peer %u\n",
                    entry->index);
       break;
+    case GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY:
+      handle = entry->handle_ats_connectivity;
+      if (NULL != handle)
+        LOG_DEBUG ("Found ATS CONNECTIVITY handle for peer %u\n",
+                   entry->index);
+      break;
     }
   }
   else
@@ -811,7 +892,9 @@ GST_connection_pool_get_handle (unsigned int peer_id,
     }
     else
     {
-      GNUNET_CONTAINER_DLL_insert_tail (head_not_pooled, tail_not_pooled, entry);
+      GNUNET_CONTAINER_DLL_insert_tail (head_not_pooled,
+                                        tail_not_pooled,
+                                        entry);
     }
     entry->cfg = GNUNET_CONFIGURATION_dup (cfg);
   }
@@ -824,13 +907,16 @@ GST_connection_pool_get_handle (unsigned int peer_id,
   gh->connect_notify_cb = connect_notify_cb;
   gh->connect_notify_cb_cls = connect_notify_cb_cls;
   gh->service = service;
-  GNUNET_CONTAINER_DLL_insert (entry->head_waiting, entry->tail_waiting, gh);
+  GNUNET_CONTAINER_DLL_insert (entry->head_waiting,
+                               entry->tail_waiting,
+                               gh);
   if (NULL != handle)
   {
-    if (GNUNET_SCHEDULER_NO_TASK == entry->notify_task)
+    if (NULL == entry->notify_task)
     {
       if (NULL != search_waiting (entry, entry->head_waiting))
-        entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready, entry);
+        entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready,
+                                                       entry);
     }
     return gh;
   }
@@ -840,19 +926,30 @@ GST_connection_pool_get_handle (unsigned int peer_id,
   case GST_CONNECTIONPOOL_SERVICE_TRANSPORT:
     if (NULL != entry->op_transport)
       return gh;                /* Operation pending */
-    op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_transport,
+    op = GNUNET_TESTBED_operation_create_ (entry,
+                                           &opstart_get_handle_transport,
                                            &oprelease_get_handle_transport);
     entry->op_transport = op;
     break;
   case GST_CONNECTIONPOOL_SERVICE_CORE:
     if (NULL != entry->op_core)
       return gh;                /* Operation pending */
-    op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_core,
+    op = GNUNET_TESTBED_operation_create_ (entry,
+                                           &opstart_get_handle_core,
                                            &oprelease_get_handle_core);
     entry->op_core = op;
     break;
+  case GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY:
+    if (NULL != entry->op_ats_connectivity)
+      return gh;                /* Operation pending */
+    op = GNUNET_TESTBED_operation_create_ (entry,
+                                           &opstart_get_handle_ats_connectivity,
+                                           &oprelease_get_handle_ats_connectivity);
+    entry->op_ats_connectivity = op;
+    break;
   }
-  GNUNET_TESTBED_operation_queue_insert_ (GST_opq_openfds, op);
+  GNUNET_TESTBED_operation_queue_insert_ (GST_opq_openfds,
+                                          op);
   GNUNET_TESTBED_operation_begin_wait_ (op);
   return gh;
 }
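
For callers, the new service type is requested like the existing ones, and the pooled ATS connectivity handle arrives through the connection-ready callback in the argument order used by connection_ready() above (closure, CORE handle, TRANSPORT handle, ATS connectivity handle, peer identity). A hedged sketch; the exact prototype of GST_connection_pool_get_handle() and the callback typedef live in gnunet-service-testbed_connectionpool.h and are reconstructed here from the fields this diff sets on the get handle:

/* Sketch: request the pooled ATS connectivity handle for peer 0; parameter
 * order and callback types beyond what this diff shows are assumptions. */
static struct GST_ConnectionPool_GetHandle *my_gh;

static void
my_ready_cb (void *cls,
             struct GNUNET_CORE_Handle *ch,
             struct GNUNET_TRANSPORT_Handle *th,
             struct GNUNET_ATS_ConnectivityHandle *ac,
             const struct GNUNET_PeerIdentity *peer_id)
{
  /* 'ac' is the shared ATS connectivity handle for this peer */
}

static void
request_ats_handle (const struct GNUNET_CONFIGURATION_Handle *cfg)
{
  my_gh = GST_connection_pool_get_handle (0 /* peer_id */,
                                          cfg,
                                          GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY,
                                          &my_ready_cb, NULL,
                                          NULL,        /* no target peer */
                                          NULL, NULL);
}

static void
release_ats_handle (void)
{
  GST_connection_pool_get_handle_done (my_gh);  /* tolerates NULL per this diff */
  my_gh = NULL;
}
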
@@ -875,21 +972,39 @@ GST_connection_pool_get_handle_done (struct GST_ConnectionPool_GetHandle *gh)
 {
   struct PooledConnection *entry;
 
+  if (NULL == gh)
+    return;
   entry = gh->entry;
   LOG_DEBUG ("Cleaning up get handle %p for service %u, peer %u\n",
              gh,
              gh->service, entry->index);
-  if (!gh->connection_ready_called)
-    GNUNET_CONTAINER_DLL_remove (entry->head_waiting, entry->tail_waiting, gh);
+  if (! gh->connection_ready_called)
+  {
+    GNUNET_CONTAINER_DLL_remove (entry->head_waiting,
+                                 entry->tail_waiting,
+                                 gh);
+    if ( (NULL == search_waiting (entry, entry->head_waiting)) &&
+         (NULL != entry->notify_task) )
+    {
+      GNUNET_SCHEDULER_cancel (entry->notify_task);
+      entry->notify_task = NULL;
+    }
+  }
   if (gh->notify_waiting)
-  { 
-    GNUNET_CONTAINER_DLL_remove (entry->head_notify, entry->tail_notify, gh);
+  {
+    GNUNET_CONTAINER_DLL_remove (entry->head_notify,
+                                 entry->tail_notify,
+                                 gh);
     gh->notify_waiting = 0;
   }
   GNUNET_free (gh);
   gh = NULL;
-  GNUNET_assert (!entry->in_lru);
-  if ( (!entry->in_pool) && (NULL != map) )
+  GNUNET_assert (! entry->in_lru);
+  if (! entry->in_pool)
+    GNUNET_CONTAINER_DLL_remove (head_not_pooled,
+                                 tail_not_pooled,
+                                 entry);
+  if (NULL != map)
   {
     if (GNUNET_YES == GNUNET_CONTAINER_multihashmap32_contains (map,
                                                                 entry->index))
@@ -900,7 +1015,6 @@ GST_connection_pool_get_handle_done (struct GST_ConnectionPool_GetHandle *gh)
         goto unallocate;
       destroy_pooled_connection (head_lru);
     }
-    GNUNET_CONTAINER_DLL_remove (head_not_pooled, tail_not_pooled, entry);
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONTAINER_multihashmap32_put (map,
                                                         entry->index,
@@ -908,6 +1022,7 @@ GST_connection_pool_get_handle_done (struct GST_ConnectionPool_GetHandle *gh)
                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
     entry->in_pool = GNUNET_YES;
   }
+
  unallocate:
   GNUNET_assert (0 < entry->demand);
   entry->demand--;