this is a merged version of revision 13866 and my latestest changes without the old...
authorMatthias Wachs <wachs@net.in.tum.de>
Thu, 2 Dec 2010 13:03:09 +0000 (13:03 +0000)
committerMatthias Wachs <wachs@net.in.tum.de>
Thu, 2 Dec 2010 13:03:09 +0000 (13:03 +0000)
all changes from r13826 not made from me are included

src/transport/transport_api.c

index 643f8b0f44f3429d9ab22dd3f15fa3e613df37c5..4f9433c6ccab5cd188fae0cb2892c49830f67c1b 100644 (file)
  */
 #define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
+/**
+ * How large to start with for the hashmap of neighbours.
+ */
+#define STARTING_NEIGHBOURS_SIZE 10
+
 
 /**
  * What stage are we in for transmission processing?
@@ -187,18 +192,33 @@ struct ControlMessage
 
 };
 
-
 /**
- * Entry in linked list of all of our current neighbours.
+ * Context for storing information about attempted next transmission.
  */
-struct NeighbourList
+struct TryTransmitContext
 {
 
   /**
-   * This is a linked list.
+   * Main transport handle.
    */
-  struct NeighbourList *next;
+  struct GNUNET_TRANSPORT_Handle *h;
+
+  /**
+   * Returned transmission handle.
+   */
+  struct GNUNET_TRANSPORT_TransmitHandle *ret;
 
+  /**
+   * Time to retry the send task.
+   */
+  struct GNUNET_TIME_Relative retry_time;
+};
+
+/**
+ * Entry in hash table of all of our current neighbours.
+ */
+struct NeighbourList
+{
   /**
    * Overall transport handle.
    */
@@ -235,6 +255,11 @@ struct NeighbourList
    */
   int is_connected;
 
+  /**
+   * Are we in the middle of disconnecting the peer already?
+   */
+  unsigned int in_disconnect;
+
 };
 
 
@@ -334,7 +359,7 @@ struct GNUNET_TRANSPORT_Handle
   /**
    * Linked list of the current neighbours of this peer.
    */
-  struct NeighbourList *neighbours;
+  struct GNUNET_CONTAINER_MultiHashMap *neighbours;
 
   /**
    * Peer identity as assumed by this process, or all zeros.
@@ -371,7 +396,6 @@ struct GNUNET_TRANSPORT_Handle
 };
 
 
-// FIXME: replace with hash map!
 /**
  * Get the neighbour list entry for the given peer
  *
@@ -383,13 +407,7 @@ static struct NeighbourList *
 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
                 const struct GNUNET_PeerIdentity *peer)
 {
-  struct NeighbourList *pos;
-
-  pos = h->neighbours;
-  while ((pos != NULL) &&
-         (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
-    pos = pos->next;
-  return pos;
+  return GNUNET_CONTAINER_multihashmap_get(h->neighbours, &peer->hashPubKey);
 }
 
 
@@ -417,6 +435,90 @@ quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 }
 
 
+/**
+ * Iterator over hash map entries, attempt to schedule
+ * a transmission to entries in the neighbour hashmap.
+ *
+ * @param cls closure a TryTransmitContext
+ * @param key current key code
+ * @param value value in the hash map, the neighbour entry to consider
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int
+try_schedule_transmission (void *cls,
+                           const GNUNET_HashCode * key,
+                           void *value)
+{
+  struct NeighbourList *n = value;
+  struct TryTransmitContext *try_transmit_ctx = cls;
+  struct GNUNET_TIME_Relative duration;
+  GNUNET_CONNECTION_TransmitReadyNotify notify;
+  struct GNUNET_TRANSPORT_TransmitHandle *th;
+  struct GNUNET_TIME_Absolute duration_abs;
+
+  if (n->transmit_stage != TS_QUEUED)
+    return GNUNET_YES; /* not eligible, keep iterating */
+  if (n->is_connected != GNUNET_YES)
+    return GNUNET_YES; /* keep iterating */
+
+  th = &n->transmit_handle;
+  GNUNET_break (n == th->neighbour);
+  /* check outgoing quota */
+  duration = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
+                                                 th->notify_size - sizeof (struct OutboundMessage));
+  duration_abs = GNUNET_TIME_relative_to_absolute (duration);
+  if (th->timeout.abs_value < duration_abs.abs_value)
+    {
+      /* signal timeout! */
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long.  Signaling timeout.\n",
+                  duration.rel_value,
+                  GNUNET_i2s (&n->id));
+#endif
+      if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+        {
+          GNUNET_SCHEDULER_cancel (th->notify_delay_task);
+         th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+        }
+      n->transmit_stage = TS_NEW;
+      if (NULL != (notify = th->notify))
+        {
+          th->notify = NULL;
+          GNUNET_assert (0 == notify (th->notify_cls, 0, NULL));
+        }
+      return GNUNET_YES; /* keep iterating */
+    }
+  if (duration.rel_value > 0)
+    {
+#if DEBUG_TRANSPORT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Need more bandwidth (%u b/s allowed, %u b needed), delaying delivery to `%4s' by %llu ms\n",
+                  (unsigned int) n->out_tracker.available_bytes_per_s__,
+                  (unsigned int) th->notify_size - sizeof (struct OutboundMessage),
+                  GNUNET_i2s (&n->id),
+                  (unsigned long long) duration.rel_value);
+#endif
+      try_transmit_ctx->retry_time = GNUNET_TIME_relative_min (try_transmit_ctx->retry_time,
+                                                               duration);
+      return GNUNET_YES; /* keep iterating */
+    }
+#if DEBUG_TRANSPORT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Have %u bytes of bandwidth available for transmission to `%4s' right now\n",
+              th->notify_size - sizeof (struct OutboundMessage),
+              GNUNET_i2s (&n->id));
+#endif
+
+  if ( (try_transmit_ctx->ret == NULL) ||
+       (try_transmit_ctx->ret->priority < th->priority) )
+    try_transmit_ctx->ret = th;
+  return GNUNET_YES;
+}
+
+
 /**
  * Figure out which transmission to a peer can be done right now.
  * If none can, schedule a task to call 'schedule_transmission'
@@ -430,88 +532,24 @@ quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 static struct GNUNET_TRANSPORT_TransmitHandle *
 schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *ret;
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-  struct NeighbourList *n;
-  struct NeighbourList *next;
-  struct GNUNET_TIME_Relative retry_time;
-  struct GNUNET_TIME_Relative duration;
-  GNUNET_CONNECTION_TransmitReadyNotify notify;
+  struct TryTransmitContext try_transmit_ctx;
 
   if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
     {
       GNUNET_SCHEDULER_cancel (h->quota_task);
       h->quota_task = GNUNET_SCHEDULER_NO_TASK;
     }
-  retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
-  ret = NULL;
-  next = h->neighbours;
-  while (NULL != (n = next))
-    {
-      next = n->next;
-      if (n->transmit_stage != TS_QUEUED)
-       continue; /* not eligible */
-      if (n->is_connected != GNUNET_YES)
-        continue;
-
-      th = &n->transmit_handle;
-      GNUNET_break (n == th->neighbour);
-      /* check outgoing quota */
-      duration = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
-                                                    th->notify_size - sizeof (struct OutboundMessage));
-      struct GNUNET_TIME_Absolute duration_abs = GNUNET_TIME_relative_to_absolute (duration);
-      if (th->timeout.abs_value < duration_abs.abs_value)
-       {
-         /* signal timeout! */
-#if DEBUG_TRANSPORT
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long.  Signaling timeout.\n",
-                     duration.rel_value,
-                     GNUNET_i2s (&n->id));
-#endif
-         if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-           {
-             GNUNET_SCHEDULER_cancel (th->notify_delay_task);
-             th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-           }   
-         n->transmit_stage = TS_NEW;
-         if (NULL != (notify = th->notify))
-           {
-             th->notify = NULL;
-             GNUNET_assert (0 == notify (th->notify_cls, 0, NULL));
-           }
-         continue;
-       }
-      if (duration.rel_value > 0)
-       {
-#if DEBUG_TRANSPORT
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Need more bandwidth (%u b/s allowed, %u b needed), delaying delivery to `%4s' by %llu ms\n",
-                     (unsigned int) n->out_tracker.available_bytes_per_s__,
-                     (unsigned int) th->notify_size - sizeof (struct OutboundMessage),
-                     GNUNET_i2s (&n->id),
-                     duration.rel_value);
-#endif
-         retry_time = GNUNET_TIME_relative_min (retry_time,
-                                                duration);
-         continue;
-       }
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Have %u bytes of bandwidth available for transmission to `%4s' right now\n",
-                 th->notify_size - sizeof (struct OutboundMessage),
-                 GNUNET_i2s (&n->id));
-#endif 
-
-      if ( (ret == NULL) ||
-          (ret->priority < th->priority) )
-       ret = th;
-    }
-  if (ret == NULL)
-    h->quota_task = GNUNET_SCHEDULER_add_delayed (retry_time,
+  try_transmit_ctx.h = h;
+  try_transmit_ctx.ret = NULL;
+  try_transmit_ctx.retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
+  GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, 
+                                       &try_schedule_transmission, 
+                                       &try_transmit_ctx);
+  if (try_transmit_ctx.ret == NULL)
+    h->quota_task = GNUNET_SCHEDULER_add_delayed (try_transmit_ctx.retry_time,
                                                  &quota_transmit_ready,
                                                  h);
-  return ret;
+  return try_transmit_ctx.ret;
 }
 
 
@@ -777,18 +815,39 @@ schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
 }
 
 
+/**
+ * FIXME: document
+ */
 struct SetQuotaContext
 {
+  /**
+   * FIXME: document
+   */
   struct GNUNET_TRANSPORT_Handle *handle;
 
+  /**
+   * FIXME: document
+   */
   struct GNUNET_PeerIdentity target;
 
+  /**
+   * FIXME: document
+   */
   GNUNET_SCHEDULER_Task cont;
 
+  /**
+   * Closure for 'cont'.
+   */
   void *cont_cls;
 
+  /**
+   * FIXME: document
+   */
   struct GNUNET_TIME_Absolute timeout;
 
+  /**
+   * FIXME: document
+   */
   struct GNUNET_BANDWIDTH_Value32NBO quota_in;
 };
 
@@ -809,9 +868,10 @@ send_set_quota (void *cls, size_t size, void *buf)
 
   if (buf == NULL)
     {
-      GNUNET_SCHEDULER_add_continuation (sqc->cont,
-                                         sqc->cont_cls,
-                                         GNUNET_SCHEDULER_REASON_TIMEOUT);
+      if (sqc->cont != NULL)
+        GNUNET_SCHEDULER_add_continuation (sqc->cont,
+                                           sqc->cont_cls,
+                                           GNUNET_SCHEDULER_REASON_TIMEOUT);
       GNUNET_free (sqc);
       return 0;
     }
@@ -1090,9 +1150,21 @@ static void
 neighbour_free (struct NeighbourList *n)
 {
   struct GNUNET_TRANSPORT_Handle *h;
-  struct NeighbourList *prev;
-  struct NeighbourList *pos;
 
+  /* Added so task gets canceled when a disconnect is received! */
+  /* Method 1
+  if (n->transmit_handle.notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel(n->transmit_handle.notify_delay_task);
+      n->transmit_handle.notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+      n->transmit_handle.notify = NULL;
+    }
+  */
+  /* NATE: if the above is not needed, then clearly this assertion
+     should hold (I've checked the code and I'm pretty sure this is
+     true. -CG 
+     FIXME: remove above comments once we've seen tests pass with the assert... */
+  GNUNET_assert (n->transmit_handle.notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
   GNUNET_assert (n->transmit_handle.notify == NULL);
   h = n->h;
 #if DEBUG_TRANSPORT
@@ -1103,17 +1175,10 @@ neighbour_free (struct NeighbourList *n)
   GNUNET_break (n->is_connected == GNUNET_NO);
   GNUNET_break (n->transmit_stage == TS_NEW);
 
-  prev = NULL;
-  pos = h->neighbours;
-  while (pos != n)
-    {
-      prev = pos;
-      pos = pos->next;
-    }
-  if (prev == NULL)
-    h->neighbours = n->next;
-  else
-    prev->next = n->next;
+  GNUNET_assert(GNUNET_YES == 
+               GNUNET_CONTAINER_multihashmap_remove(h->neighbours, 
+                                                    &n->id.hashPubKey, 
+                                                    n));
   GNUNET_free (n);
 }
 
@@ -1134,6 +1199,14 @@ neighbour_disconnect (struct NeighbourList *n)
 #endif
   GNUNET_break (n->is_connected == GNUNET_YES);
   n->is_connected = GNUNET_NO;
+  /* FIXME: this 'in_disconnect' flag is dubious; we should define 
+     clearly what disconnect means for pending 'notify_transmit_ready'
+     requests; maybe a good approach is to REQUIRE clients to 
+     call 'notify_transmit_ready_cancel' on pending requests on disconnect
+     and otherwise FAIL HARD with an assertion failure before 
+     'neighbour_free' right here (transmit_stage would be forced
+     to 'TS_NEW') */
+  n->in_disconnect = GNUNET_YES;
   if (h->nd_cb != NULL)
     h->nd_cb (h->cls, &n->id);
   if (n->transmit_stage == TS_NEW)    
@@ -1152,6 +1225,35 @@ static void demultiplexer (void *cls,
                           const struct GNUNET_MessageHeader *msg);
 
 
+/**
+ * Iterator over hash map entries, for getting rid of a neighbor
+ * upon a reconnect call.
+ *
+ * @param cls closure (NULL)
+ * @param key current key code
+ * @param value value in the hash map, the neighbour entry to forget
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int
+forget_neighbours (void *cls,
+                   const GNUNET_HashCode * key,
+                   void *value)
+{
+  struct NeighbourList *n = value;
+
+#if DEBUG_TRANSPORT_DISCONNECT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Disconnecting due to reconnect being called\n");
+#endif
+  if (n->is_connected)
+    neighbour_disconnect (n);
+
+  return GNUNET_YES;
+}
+
+
 /**
  * Try again to connect to transport service.
  *
@@ -1164,8 +1266,6 @@ reconnect (void *cls,
 {
   struct GNUNET_TRANSPORT_Handle *h = cls;
   struct ControlMessage *pos;
-  struct NeighbourList *n;
-  struct NeighbourList *next;
 
   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
   if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
@@ -1174,18 +1274,10 @@ reconnect (void *cls,
       return;
     }
   /* Forget about all neighbours that we used to be connected to */
-  n = h->neighbours;
-  while (NULL != n)
-    {
-#if DEBUG_TRANSPORT_DISCONNECT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Disconnecting due to reconnect being called\n");
-#endif
-      next = n->next;
-      if (n->is_connected)
-       neighbour_disconnect (n);
-      n = next;
-    }
+  GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, 
+                                       &forget_neighbours, 
+                                       NULL);
+
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Connecting to transport service.\n");
@@ -1310,6 +1402,7 @@ send_request_connect_message(struct GNUNET_TRANSPORT_Handle *h, struct Neighbour
                              GNUNET_TIME_UNIT_FOREVER_REL, &send_transport_request_connect, trcm);
 }
 
+
 /**
  * Add neighbour to our list
  *
@@ -1336,18 +1429,61 @@ neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
 #endif
   n = GNUNET_malloc (sizeof (struct NeighbourList));
   n->id = *pid;
+  n->h = h;
   GNUNET_BANDWIDTH_tracker_init (&n->out_tracker,
                                 GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
                                 MAX_BANDWIDTH_CARRY_S);
-  n->next = h->neighbours;
-  n->h = h;
-  h->neighbours = n;
-
+  GNUNET_CONTAINER_multihashmap_put (h->neighbours,
+                                     &pid->hashPubKey,
+                                     n,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
 
   return n;
 }
 
 
+/**
+ * Iterator over hash map entries, for deleting state of a neighbor.
+ *
+ * @param cls closure (NULL)
+ * @param key current key code
+ * @param value value in the hash map, the neighbour entry to delete
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int
+delete_neighbours (void *cls,
+                   const GNUNET_HashCode * key,
+                   void *value)
+{
+  struct NeighbourList *n = value;
+  struct GNUNET_TRANSPORT_TransmitHandle *th;
+
+  switch (n->transmit_stage)
+    {
+    case TS_NEW:
+    case TS_TRANSMITTED:
+      /* nothing to do */
+      break;
+    case TS_QUEUED:
+    case TS_TRANSMITTED_QUEUED:
+      th = &n->transmit_handle;
+      if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+        {
+          GNUNET_SCHEDULER_cancel (th->notify_delay_task);
+          th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+        }
+      GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
+      break;
+    default:
+      GNUNET_break (0);
+    }
+  GNUNET_free (n);
+  return GNUNET_YES;
+}
+
+
 /**
  * Connect to the transport service.  Note that the connection may
  * complete (or fail) asynchronously.
@@ -1382,6 +1518,7 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
   ret->nc_cb = nc;
   ret->nd_cb = nd;
   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
+  ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE);
   schedule_reconnect (ret);
   return ret;
 }
@@ -1393,8 +1530,6 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
 void
 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
 {
-  struct GNUNET_TRANSPORT_TransmitHandle *th;
-  struct NeighbourList *n;
   struct HelloWaitList *hwl;
   struct GNUNET_CLIENT_Connection *client;
   struct ControlMessage *cm;
@@ -1402,31 +1537,20 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
 #endif
+  /* FIXME: this flag is dubious, we should be able to do this
+     more cleanly; also, we should probably do 'disconnect'
+     callbacks for every connected peer here, i.e. by calling
+     the iterator with 'forget_neighbours' instead of 'delete_neighbours'.
+  */
+  
   handle->in_disconnect = GNUNET_YES;
-  while (NULL != (n = handle->neighbours))
-    {
-      handle->neighbours = n->next;
-      switch (n->transmit_stage)
-       {
-       case TS_NEW:
-       case TS_TRANSMITTED:
-         /* nothing to do */
-         break;
-       case TS_QUEUED:
-       case TS_TRANSMITTED_QUEUED:
-         th = &n->transmit_handle;
-         if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
-           {
-             GNUNET_SCHEDULER_cancel (th->notify_delay_task);
-             th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
-           }
-         GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
-         break;
-       default:
-         GNUNET_break (0);
-       }
-      GNUNET_free (n);
-    }
+
+  GNUNET_assert (GNUNET_SYSERR !=
+                GNUNET_CONTAINER_multihashmap_iterate(handle->neighbours,
+                                                      &delete_neighbours,
+                                                      handle));
+  GNUNET_CONTAINER_multihashmap_destroy (handle->neighbours);
+
   while (NULL != (hwl = handle->hwl_head))
     {
       handle->hwl_head = hwl->next;
@@ -1886,7 +2010,7 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
   n = th->neighbour;
 #if DEBUG_TRANSPORT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmission request of %u bytes to `%4s' was cancelled.\n",
+              "Transmission request of %u bytes to `%4s' was canceled.\n",
               th->notify_size - sizeof (struct OutboundMessage),
               GNUNET_i2s (&n->id));
 #endif
@@ -1902,7 +2026,8 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
       break;
     case TS_QUEUED:
       n->transmit_stage = TS_NEW;
-      if (n->is_connected == GNUNET_NO)
+      if ( (n->in_disconnect == GNUNET_NO) &&
+          (n->is_connected == GNUNET_NO) )
        neighbour_free (n);
       break;
     case TS_TRANSMITTED: