new operation queue for limiting overlay connects
[oweals/gnunet.git] / src / transport / transport_api.c
index 49a885c73bc42710a1de7c97c833e77eabedef0b..49f82e66b78b59fa80a71275ed5479006136ade9 100644 (file)
@@ -146,6 +146,11 @@ struct Neighbour
    */
   int is_ready;
 
+  /**
+   * Sending consumed more bytes on wire than payload was announced
+   * This overhead is added to the delay of next sending operation
+   */
+  size_t traffic_overhead;
 };
 
 
@@ -348,12 +353,13 @@ neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
   n->id = *pid;
   n->h = h;
   n->is_ready = GNUNET_YES;
+  n->traffic_overhead = 0;
   GNUNET_BANDWIDTH_tracker_init (&n->out_tracker,
                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
                                  MAX_BANDWIDTH_CARRY_S);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (h->neighbours,
-                                                    &pid->hashPubKey, n,
+                                                    &n->id.hashPubKey, n,
                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   return n;
 }
@@ -410,6 +416,8 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
   struct GNUNET_PeerIdentity me;
   uint16_t size;
   uint32_t ats_count;
+  uint32_t bytes_msg;
+  uint32_t bytes_physical;
 
   GNUNET_assert (h->client != NULL);
   if (msg == NULL)
@@ -508,11 +516,21 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
       break;
     }
     okm = (const struct SendOkMessage *) msg;
+    bytes_msg = ntohl (okm->bytes_msg);
+    bytes_physical = ntohl (okm->bytes_physical);
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving `%s' message, transmission %s.\n",
          "SEND_OK", ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
+
     n = neighbour_find (h, &okm->peer);
     if (n == NULL)
       break;
+
+    if (bytes_physical >= bytes_msg)
+    {
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "Overhead for %u byte message: %u \n",
+            bytes_msg, bytes_physical - bytes_msg);
+      n->traffic_overhead += bytes_physical - bytes_msg;
+    }
     GNUNET_break (GNUNET_NO == n->is_ready);
     n->is_ready = GNUNET_YES;
     if ((n->th != NULL) && (n->hn == NULL))
@@ -783,9 +801,12 @@ schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
   if (NULL != h->control_head)
     delay = GNUNET_TIME_UNIT_ZERO;
   else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
+  {
     delay =
         GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
-                                            n->th->notify_size);
+                                            n->th->notify_size + n->traffic_overhead);
+    n->traffic_overhead = 0;
+  }
   else
     return;                     /* no work to be done */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -988,9 +1009,13 @@ GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
                               const struct GNUNET_PeerIdentity *target)
 {
   struct GNUNET_PeerIdentity *pid;
-
   if (NULL == handle->client)
-    return;
+  {
+      /* FIXME: handle->client can be NULL when transport api is reconnecting */
+      GNUNET_break (0);
+      return;
+  }
+
   pid = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
   *pid = *target;
   schedule_control_transmit (handle,
@@ -1023,15 +1048,16 @@ send_hello (void *cls, size_t size, void *buf)
   struct GNUNET_MessageHeader *msg = shc->msg;
   uint16_t ssize;
   struct GNUNET_SCHEDULER_TaskContext tc;
+  tc.read_ready = NULL;
+  tc.write_ready = NULL;
+  tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
 
   if (buf == NULL)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Timeout while trying to transmit `%s' request.\n", "HELLO");
-       tc.read_ready = NULL;
-       tc.write_ready = NULL;
-       tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
-       shc->cont (shc->cls, &tc);
+    if (NULL != shc->cont)
+      shc->cont (shc->cls, &tc);
     GNUNET_free (msg);
     GNUNET_free (shc);
     return 0;
@@ -1041,10 +1067,9 @@ send_hello (void *cls, size_t size, void *buf)
   GNUNET_assert (size >= ssize);
   memcpy (buf, msg, ssize);
   GNUNET_free (msg);
-  tc.read_ready = NULL;
-  tc.write_ready = NULL;
   tc.reason = GNUNET_SCHEDULER_REASON_READ_READY;
-  shc->cont (shc->cls, &tc);
+  if (NULL != shc->cont)
+    shc->cont (shc->cls, &tc);
   GNUNET_free (shc);
   return ssize;
 }
@@ -1074,12 +1099,14 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
   struct SendHelloContext * shc;
   struct GNUNET_SCHEDULER_TaskContext tc;
 
+  tc.read_ready = NULL;
+  tc.write_ready = NULL;
+  tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
+
   if (NULL == handle->client)
   {
-       tc.read_ready = NULL;
-       tc.write_ready = NULL;
-       tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
-       cont (cls, &tc);
+    if (NULL != cont)
+      cont (cls, &tc);
     return;
   }
   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
@@ -1089,10 +1116,9 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
       GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello, &peer))
   {
     GNUNET_break (0);
-       tc.read_ready = NULL;
-       tc.write_ready = NULL;
-       tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
-       cont (cls, &tc);
+    if (NULL != cont)
+      if (NULL != cont)
+        cont (cls, &tc);
     return;
   }
   msg = GNUNET_malloc (size);
@@ -1186,10 +1212,17 @@ GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
   ret->nd_cb = nd;
   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ret->neighbours =
-      GNUNET_CONTAINER_multihashmap_create (STARTING_NEIGHBOURS_SIZE);
+    GNUNET_CONTAINER_multihashmap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
   ret->ready_heap =
       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
+  ret->client = GNUNET_CLIENT_connect ("transport", cfg);
+  if (ret->client == NULL)
+  {
+    GNUNET_free (ret);
+    return NULL;
+  }
+  schedule_control_transmit (ret, sizeof (struct StartMessage), &send_start, ret);
   return ret;
 }
 
@@ -1284,7 +1317,8 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
   th->priority = priority;
   n->th = th;
   /* calculate when our transmission should be ready */
-  delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size);
+  delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size + n->traffic_overhead);
+  n->traffic_overhead = 0;
   if (delay.rel_value > timeout.rel_value)
     delay.rel_value = 0;        /* notify immediately (with failure) */
   LOG (GNUNET_ERROR_TYPE_DEBUG,