new operation queue for limiting overlay connects
[oweals/gnunet.git] / src / transport / transport_api.c
index 2e50b490175bdc0179d90caf93835dc1d3e66d93..49f82e66b78b59fa80a71275ed5479006136ade9 100644 (file)
@@ -146,6 +146,11 @@ struct Neighbour
    */
   int is_ready;
 
+  /**
+   * Sending consumed more bytes on wire than payload was announced
+   * This overhead is added to the delay of next sending operation
+   */
+  size_t traffic_overhead;
 };
 
 
@@ -348,12 +353,13 @@ neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
   n->id = *pid;
   n->h = h;
   n->is_ready = GNUNET_YES;
+  n->traffic_overhead = 0;
   GNUNET_BANDWIDTH_tracker_init (&n->out_tracker,
                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
                                  MAX_BANDWIDTH_CARRY_S);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (h->neighbours,
-                                                    &pid->hashPubKey, n,
+                                                    &n->id.hashPubKey, n,
                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   return n;
 }
@@ -410,6 +416,8 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
   struct GNUNET_PeerIdentity me;
   uint16_t size;
   uint32_t ats_count;
+  uint32_t bytes_msg;
+  uint32_t bytes_physical;
 
   GNUNET_assert (h->client != NULL);
   if (msg == NULL)
@@ -508,11 +516,21 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
       break;
     }
     okm = (const struct SendOkMessage *) msg;
+    bytes_msg = ntohl (okm->bytes_msg);
+    bytes_physical = ntohl (okm->bytes_physical);
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving `%s' message, transmission %s.\n",
          "SEND_OK", ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+
     n = neighbour_find (h, &okm->peer);
     if (n == NULL)
       break;
+
+    if (bytes_physical >= bytes_msg)
+    {
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "Overhead for %u byte message: %u \n",
+            bytes_msg, bytes_physical - bytes_msg);
+      n->traffic_overhead += bytes_physical - bytes_msg;
+    }
     GNUNET_break (GNUNET_NO == n->is_ready);
     n->is_ready = GNUNET_YES;
     if ((n->th != NULL) && (n->hn == NULL))
@@ -783,9 +801,12 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
   if (NULL != h->control_head)
     delay = GNUNET_TIME_UNIT_ZERO;
   else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
+  {
     delay =
         GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
-                                            n->th->notify_size);
+                                            n->th->notify_size + n->traffic_overhead);
+    n->traffic_overhead = 0;
+  }
   else
     return;                     /* no work to be done */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -988,9 +1009,13 @@ GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
                               const struct GNUNET_PeerIdentity *target)
 {
   struct GNUNET_PeerIdentity *pid;
-
   if (NULL == handle->client)
-    return;
+  {
+      /* FIXME: handle->client can be NULL when transport api is reconnecting */
+      GNUNET_break (0);
+      return;
+  }
+
   pid = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
   *pid = *target;
   schedule_control_transmit (handle,
@@ -1187,10 +1212,17 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
   ret->nd_cb = nd;
   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ret->neighbours =
-      GNUNET_CONTAINER_multihashmap_create (STARTING_NEIGHBOURS_SIZE);
+    GNUNET_CONTAINER_multihashmap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
   ret->ready_heap =
       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
+  ret->client = GNUNET_CLIENT_connect ("transport", cfg);
+  if (ret->client == NULL)
+  {
+    GNUNET_free (ret);
+    return NULL;
+  }
+  schedule_control_transmit (ret, sizeof (struct StartMessage), &send_start, ret);
   return ret;
 }
 
@@ -1285,7 +1317,8 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
   th->priority = priority;
   n->th = th;
   /* calculate when our transmission should be ready */
-  delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size);
+  delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size + n->traffic_overhead);
+  n->traffic_overhead = 0;
   if (delay.rel_value > timeout.rel_value)
     delay.rel_value = 0;        /* notify immediately (with failure) */
   LOG (GNUNET_ERROR_TYPE_DEBUG,