- move connection message accounting
authorBart Polot <bart@net.in.tum.de>
Thu, 10 Oct 2013 17:23:13 +0000 (17:23 +0000)
committerBart Polot <bart@net.in.tum.de>
Thu, 10 Oct 2013 17:23:13 +0000 (17:23 +0000)
src/mesh/gnunet-service-mesh_connection.c
src/mesh/gnunet-service-mesh_peer.c
src/mesh/gnunet-service-mesh_peer.h
src/mesh/gnunet-service-mesh_tunnel.c

index 2feeab7cd130af6eecab4f3c0755c44766c7e4e8..63e4b8bd0db49d44c4e7475527ebc1ca9b0edd2b 100644 (file)
@@ -353,20 +353,25 @@ connection_change_state (struct MeshConnection* c,
  *
  * @param cls Closure.
  * @param c Connection this message was on.
+ * @param type Type of message sent.
+ * @param fwd Was this a FWD going message?
+ * @param size Size of the message.
  * @param wait Time spent waiting for core (only the time for THIS message)
  */
 static void 
 message_sent (void *cls,
-              struct MeshConnection *c,
+              struct MeshConnection *c, uint16_t type,
+              int fwd, size_t size,
               struct GNUNET_TIME_Relative wait)
 {
   struct MeshConnectionPerformance *p;
-  size_t size = (size_t) cls;
+  struct MeshFlowControl *fc;
   double usecsperbyte;
 
   if (NULL == c->perf)
     return; /* Only endpoints are interested in this. */
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
   p = c->perf;
   usecsperbyte = ((double) wait.rel_value_us) / size;
   if (p->size == AVG_MSGS)
@@ -386,6 +391,16 @@ message_sent (void *cls,
     p->avg /= p->size;
   }
   p->idx = (p->idx + 1) % AVG_MSGS;
+
+  fc = fwd ? &c->fwd_fc : &c->bck_fc;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "!  Q_N- %p %u\n", fc, fc->queue_n);
+  fc->queue_n--;
+  c->pending_messages--;
+  if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "!  destroying connection!\n");
+    GMC_destroy (c);
+  }
 }
 
 
@@ -528,8 +543,7 @@ send_connection_ack (struct MeshConnection *connection, int fwd)
                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
                  sizeof (struct GNUNET_MESH_ConnectionACK),
                  connection, NULL, fwd,
-                 &message_sent,
-                 (void *) sizeof (struct GNUNET_MESH_ConnectionACK));
+                 &message_sent, NULL);
   if (MESH_TUNNEL3_NEW == GMT_get_state (t))
     GMT_change_state (t, MESH_TUNNEL3_WAITING);
   if (MESH_CONNECTION_READY != connection->state)
@@ -1933,9 +1947,11 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
                            struct MeshChannel *ch,
                            int fwd)
 {
+  struct MeshFlowControl *fc;
   void *data;
   size_t size;
   uint16_t type;
+  int droppable;
 
   size = ntohs (message->size);
   data = GNUNET_malloc (size);
@@ -1944,6 +1960,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u) on connection %s\n",
               GNUNET_MESH_DEBUG_M2S (type), size, GNUNET_h2s (&c->id));
 
+  droppable = GNUNET_YES;
   switch (type)
   {
     struct GNUNET_MESH_Encrypted *emsg;
@@ -1972,6 +1989,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
       amsg = (struct GNUNET_MESH_ACK *) data;
       amsg->cid = c->id;
       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
+      droppable = GNUNET_NO;
       break;
 
     case GNUNET_MESSAGE_TYPE_MESH_POLL:
@@ -1979,6 +1997,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
       pmsg->cid = c->id;
       pmsg->pid = htonl (fwd ? c->fwd_fc.last_pid_sent : c->bck_fc.last_pid_sent);
       LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid));
+      droppable = GNUNET_NO;
       break;
 
     case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY:
@@ -2001,8 +2020,30 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
       GNUNET_break (0);
   }
 
+  fc = fwd ? &c->fwd_fc : &c->bck_fc;
+  if (fc->queue_n >= fc->queue_max && droppable)
+  {
+    GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
+                              1, GNUNET_NO);
+    GNUNET_break (0);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+                "queue full: %u/%u\n",
+                fc->queue_n, fc->queue_max);
+    return; /* Drop this message */
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack %u\n", fc->last_ack_recv);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
+  if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
+  {
+    GMC_start_poll (c, fwd);
+  }
+  fc->queue_n++;
+  c->pending_messages++;
+
   GMP_queue_add (get_hop (c, fwd), data, type, size, c, ch, fwd,
-                 &message_sent, (void *) size);
+                 &message_sent, NULL);
 }
 
 
@@ -2023,10 +2064,8 @@ enum MeshTunnel3State state;
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
   GMP_queue_add (get_next_hop (connection), NULL,
                  GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE,
-                 size,
-                 connection,
-                 NULL,
-                 GNUNET_YES, &message_sent, (void *) size);
+                 size, connection, NULL,
+                 GNUNET_YES, &message_sent, NULL);
   state = GMT_get_state (connection->t);
   if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state)
     GMT_change_state (connection->t, MESH_TUNNEL3_WAITING);
index 6d87960b5c7d7a3f599e44849d1470089ea0d271..f997cf838c9eb5cb26011d0f1a04394fa4d35521 100644 (file)
@@ -768,7 +768,6 @@ static size_t
 queue_send (void *cls, size_t size, void *buf)
 {
   struct MeshPeer *peer = cls;
-  struct MeshFlowControl *fc;
   struct MeshConnection *c;
   struct GNUNET_MessageHeader *msg;
   struct MeshPeerQueue *queue;
@@ -798,7 +797,6 @@ queue_send (void *cls, size_t size, void *buf)
   }
   c = queue->c;
   fwd = queue->fwd;
-  fc = fwd ? &c->fwd_fc : &c->bck_fc;
 
   dst_id = GNUNET_PEER_resolve2 (peer->id);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "*   towards %s\n", GNUNET_i2s (dst_id));
@@ -825,7 +823,7 @@ queue_send (void *cls, size_t size, void *buf)
   /* Fill buf */
   switch (queue->type)
   {
-    case GNUNET_MESSAGE_TYPE_MESH_TUNNEL3_DESTROY:
+    case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY:
     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY:
     case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
     case GNUNET_MESSAGE_TYPE_MESH_FWD:
@@ -877,14 +875,6 @@ queue_send (void *cls, size_t size, void *buf)
     data_size = 0;
   }
 
-  if (NULL != queue->callback)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "*   Calling callback\n");
-    queue->callback (queue->callback_cls,
-                    queue->c,
-                    GNUNET_TIME_absolute_get_duration (queue->start_waiting));
-  }
-
   /* Free queue, but cls was freed by send_core_* */
   ch = queue->ch;
   GMP_queue_destroy (queue, GNUNET_NO);
@@ -940,22 +930,13 @@ queue_send (void *cls, size_t size, void *buf)
       fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
     }
   }
-  if (NULL != c)
-  {
-    c->pending_messages--;
-    if (GNUNET_YES == c->destroy && 0 == c->pending_messages)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "*  destroying connection!\n");
-      GMC_destroy (c);
-    }
-  }
 
   if (NULL != t)
   {
     t->pending_messages--;
     if (GNUNET_YES == t->destroy && 0 == t->pending_messages)
     {
-//       LOG (GNUNET_ERROR_TYPE_DEBUG, "*  destroying tunnel!\n");
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "*  destroying tunnel!\n");
       GMT_destroy (t);
     }
   }
@@ -1021,24 +1002,19 @@ void
 GMP_queue_destroy (struct MeshPeerQueue *queue, int clear_cls)
 {
   struct MeshPeer *peer;
-  struct MeshFlowControl *fc;
-  int fwd;
 
-  fwd = queue->fwd;
   peer = queue->peer;
   GNUNET_assert (NULL != queue->c);
-  fc = fwd ? &queue->c->fwd_fc : &queue->c->bck_fc;
 
   if (GNUNET_YES == clear_cls)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "   queue destroy type %s\n",
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "#   queue destroy type %s\n",
                 GNUNET_MESH_DEBUG_M2S (queue->type));
     switch (queue->type)
     {
       case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_DESTROY:
-      case GNUNET_MESSAGE_TYPE_MESH_TUNNEL3_DESTROY:
+      case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY:
         LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n");
-        GNUNET_break (GNUNET_YES == queue->c->destroy);
         /* fall through */
       case GNUNET_MESSAGE_TYPE_MESH_FWD:
       case GNUNET_MESSAGE_TYPE_MESH_BCK:
@@ -1047,33 +1023,31 @@ GMP_queue_destroy (struct MeshPeerQueue *queue, int clear_cls)
       case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
       case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
       case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN:
-        LOG (GNUNET_ERROR_TYPE_DEBUG, "   prebuilt message\n");;
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "#   prebuilt message\n");;
         GNUNET_free_non_null (queue->cls);
         break;
 
       default:
         GNUNET_break (0);
-        LOG (GNUNET_ERROR_TYPE_ERROR, "   type %s unknown!\n",
+        LOG (GNUNET_ERROR_TYPE_ERROR, "#   type %s unknown!\n",
                     GNUNET_MESH_DEBUG_M2S (queue->type));
     }
-
   }
   GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue);
 
   if (queue->type != GNUNET_MESSAGE_TYPE_MESH_ACK &&
       queue->type != GNUNET_MESSAGE_TYPE_MESH_POLL)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N- %p %u\n", fc, fc->queue_n);
-    fc->queue_n--;
     peer->queue_n--;
   }
-  if (NULL != queue->c)
+
+  if (NULL != queue->callback)
   {
-    queue->c->pending_messages--;
-    if (NULL != queue->c->t)
-    {
-      queue->c->t->pending_messages--;
-    }
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "#   Calling callback\n");
+    queue->callback (queue->callback_cls,
+                     queue->c, queue->type,
+                     queue->fwd, queue->size,
+                     GNUNET_TIME_absolute_get_duration (queue->start_waiting));
   }
 
   GNUNET_free (queue);
@@ -1124,34 +1098,8 @@ GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size,
   }
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "fc %p\n", fc);
-  if (fc->queue_n >= fc->queue_max && 0 == priority)
-  {
-    GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
-                              1, GNUNET_NO);
-    GNUNET_break (0);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-                "queue full: %u/%u\n",
-                fc->queue_n, fc->queue_max);
-    return; /* Drop this message */
-  }
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "     ack %u\n", fc->last_ack_recv);
-  if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv))
-  {
-    call_core = GNUNET_NO;
-    if (GNUNET_SCHEDULER_NO_TASK == fc->poll_task &&
-        GNUNET_MESSAGE_TYPE_MESH_POLL != type)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-                  "no buffer space (%u > %u): starting poll\n",
-                  fc->last_pid_sent + 1, fc->last_ack_recv);
-      GMC_start_poll (c, fwd);
-    }
-  }
-  else
-    call_core = GNUNET_YES;
+  call_core = GMC_is_sendable (c, fwd);
   queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
   queue->cls = cls;
   queue->type = type;
@@ -1181,8 +1129,6 @@ GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size,
   else
   {
     GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, queue);
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
-    fc->queue_n++;
     peer->queue_n++;
   }
 
@@ -1209,9 +1155,6 @@ GMP_queue_add (struct MeshPeer *peer, void *cls, uint16_t type, size_t size,
                 peer2s (peer));
 
   }
-  c->pending_messages++;
-  if (NULL != c->t)
-    c->t->pending_messages++;
 }
 
 
index abc45f0c969e5ca1d7aad33d865bcd78ccf1b4eb..c7265ce2f62327b5974e7fd8ff747d3d34e10d07 100644 (file)
@@ -50,10 +50,14 @@ struct MeshPeer;
  *
  * @param cls Closure.
  * @param c Connection this message was on.
+ * @param type Type of message sent.
+ * @param fwd Was this a FWD going message?
+ * @param size Size of the message.
  * @param wait Time spent waiting for core (only the time for THIS message)
  */
 typedef void (*GMP_sent) (void *cls,
-                          struct MeshConnection *c,
+                          struct MeshConnection *c, uint16_t type,
+                          int fwd, size_t size,
                           struct GNUNET_TIME_Relative wait);
 
 #include "gnunet-service-mesh_connection.h"
index 286eca61847ca404fc8a5a56807cd180f034f5a5..c5ccc9f5c07d915022cd616540467697b38ce510 100644 (file)
@@ -1151,6 +1151,7 @@ GMT_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
   }
   msg->reserved = 0;
 
+  t->pending_messages++;
   GMC_send_prebuilt_message (&msg->header, c, ch, fwd);
 }