- use window size client<->server, x4 faster
[oweals/gnunet.git] / src / mesh / mesh_api.c
index 8ac1efa615ee7c75392260a7f770821aec600103..46747a0d4c8649c2d39f19675aefbfa900124dd3 100644 (file)
@@ -21,7 +21,6 @@
  * @author Bartlomiej Polot
  *
  * STRUCTURE:
- * - CONSTANTS
  * - DATA STRUCTURES
  * - DECLARATIONS
  * - AUXILIARY FUNCTIONS
 #define LOG(kind,...) GNUNET_log_from (kind, "mesh-api",__VA_ARGS__)
 
 
-/******************************************************************************/
-/************************        CONSTANTS         ****************************/
-/******************************************************************************/
-
-#define HIGH_PID 0xFFFF0000
-#define LOW_PID 0x0000FFFF
-
-#define PID_OVERFLOW(pid, max) (pid > HIGH_PID && max < LOW_PID)
-
 /******************************************************************************/
 /************************      DATA STRUCTURES     ****************************/
 /******************************************************************************/
@@ -328,16 +318,19 @@ struct GNUNET_MESH_Tunnel
   int buffering;
 
     /**
-     * Next packet PID.
+     * Next packet ID to send.
      */
-  uint32_t pid;
+  uint32_t next_send_pid;
 
     /**
-     * Maximum allowed PID.
+     * Maximum allowed PID to send (ACK recevied).
      */
-  uint32_t max_pid;
-
+  uint32_t max_send_pid;
 
+    /**
+     * Last pid received from the service.
+     */
+  uint32_t last_recv_pid;
 };
 
 
@@ -378,6 +371,39 @@ th_is_payload (struct GNUNET_MESH_TransmitHandle *th)
 }
 
 
+/**
+ * Check whether there is any message ready in the queue and find the size.
+ * 
+ * @param h Mesh handle.
+ * 
+ * @return The size of the first ready message in the queue,
+ *         0 if there is none.
+ */
+static size_t
+message_ready_size (struct GNUNET_MESH_Handle *h)
+{
+  struct GNUNET_MESH_TransmitHandle *th;
+  struct GNUNET_MESH_Tunnel *t;
+
+  for (th = h->th_head; NULL != th; th = th->next)
+  {
+    t = th->tunnel;
+    if (GNUNET_NO == th_is_payload (th))
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "  message internal\n");
+      return th->size;
+    }
+    if (GNUNET_NO == GMC_is_pid_bigger(t->next_send_pid, t->max_send_pid))
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "  message payload ok (%u <= %u)\n",
+           t->next_send_pid, t->max_send_pid);
+      return th->size;
+    }
+  }
+  return 0;
+}
+
+
 /**
  * Get the tunnel handler for the tunnel specified by id from the given handle
  * @param h Mesh handle
@@ -428,6 +454,8 @@ create_tunnel (struct GNUNET_MESH_Handle *h, MESH_TunnelNumber tid)
   {
     t->tid = tid;
   }
+  t->max_send_pid = INITIAL_WINDOW_SIZE - 1;
+  t->last_recv_pid = (uint32_t) -1;
   return t;
 }
 
@@ -498,7 +526,7 @@ destroy_tunnel (struct GNUNET_MESH_Tunnel *t, int call_cleaner)
 
   /* if there are no more pending requests with mesh service, cancel active request */
   /* Note: this should be unnecessary... */
-  if ( (NULL == h->th_head) && (NULL != h->th))
+  if ((0 == message_ready_size (h)) && (NULL != h->th))
   {
     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
     h->th = NULL;
@@ -588,6 +616,7 @@ remove_peer_from_tunnel (struct GNUNET_MESH_Peer *p)
 
 /**
  * Notify client that the transmission has timed out
+ * 
  * @param cls closure
  * @param tc task context
  */
@@ -599,12 +628,13 @@ timeout_transmission (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   mesh = th->tunnel->mesh;
   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
+  th->tunnel->packet_size = 0;
   if (GNUNET_YES == th_is_payload (th))
     th->notify (th->notify_cls, 0, NULL);
   GNUNET_free (th);
-  if ((NULL == mesh->th_head) && (NULL != mesh->th))
+  if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
   {
-    /* queue empty, no point in asking for transmission */
+    /* nothing ready to transmit, no point in asking for transmission */
     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);
     mesh->th = NULL;
   }
@@ -655,6 +685,31 @@ send_packet (struct GNUNET_MESH_Handle *h,
              struct GNUNET_MESH_Tunnel *tunnel);
 
 
+/**
+ * Send an ack on the tunnel to confirm the processing of a message.
+ * 
+ * @param h Mesh handle.
+ * @param t Tunnel on which to send the ACK.
+ */
+static void
+send_ack (struct GNUNET_MESH_Handle *h, struct GNUNET_MESH_Tunnel *t)
+{
+  struct GNUNET_MESH_LocalAck msg;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Sending ACK on tunnel %X: %u\n",
+       t->tid, t->last_recv_pid + INITIAL_WINDOW_SIZE);
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
+  msg.header.size = htons (sizeof (msg));
+  msg.tunnel_id = htonl (t->tid);
+  msg.max_pid = htonl (t->last_recv_pid + INITIAL_WINDOW_SIZE);
+
+  send_packet (h, &msg.header, t);
+  return;
+}
+
+
+
 /**
  * Reconnect callback: tries to reconnect again after a failer previous
  * reconnecttion
@@ -696,11 +751,16 @@ send_connect (struct GNUNET_MESH_Handle *h)
     for (napps = 0; napps < h->n_applications; napps++)
     {
       apps[napps] = htonl (h->applications[napps]);
-      LOG (GNUNET_ERROR_TYPE_DEBUG, " app %u\n", h->applications[napps]);
+      LOG (GNUNET_ERROR_TYPE_DEBUG, " app %u\n",
+           h->applications[napps]);
     }
     types = (uint16_t *) & apps[napps];
     for (ntypes = 0; ntypes < h->n_handlers; ntypes++)
+    {
       types[ntypes] = htons (h->message_handlers[ntypes].type);
+      LOG (GNUNET_ERROR_TYPE_DEBUG, " type %u\n",
+           h->message_handlers[ntypes].type);
+    }
     msg->applications = htons (napps);
     msg->types = htons (ntypes);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -728,8 +788,9 @@ do_reconnect (struct GNUNET_MESH_Handle *h)
   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
   LOG (GNUNET_ERROR_TYPE_DEBUG, "*******   RECONNECT   *******\n");
   LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
 
-  h->in_receive = GNUNET_NO;
   /* disconnect */
   if (NULL != h->th)
   {
@@ -775,6 +836,9 @@ do_reconnect (struct GNUNET_MESH_Handle *h)
        */
       continue;
     }
+    t->next_send_pid = 0;
+    t->max_send_pid = INITIAL_WINDOW_SIZE - 1;
+    t->last_recv_pid = (uint32_t) -1;
     tmsg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
     tmsg.header.size = htons (sizeof (struct GNUNET_MESH_TunnelMessage));
     tmsg.tunnel_id = htonl (t->tid);
@@ -842,6 +906,7 @@ static void
 reconnect (struct GNUNET_MESH_Handle *h)
 {
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requested RECONNECT\n");
+  h->in_receive = GNUNET_NO;
   if (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task)
     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
                                                       &reconnect_cbk, h);
@@ -866,6 +931,7 @@ process_tunnel_created (struct GNUNET_MESH_Handle *h,
   MESH_TunnelNumber tid;
 
   tid = ntohl (msg->tunnel_id);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming tunnel %X\n", tid);
   if (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
   {
     GNUNET_break (0);
@@ -888,8 +954,9 @@ process_tunnel_created (struct GNUNET_MESH_Handle *h,
     t->tid = tid;
     atsi.type = 0;
     atsi.value = 0;
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  created tunnel %p\n", t);
     t->ctx = h->new_tunnel (h->cls, t, &msg->peer, &atsi);
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel %X\n", t->tid);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n");
   }
   else
   {
@@ -1015,6 +1082,7 @@ process_incoming_data (struct GNUNET_MESH_Handle *h,
   struct GNUNET_MESH_ToOrigin *to_orig;
   struct GNUNET_MESH_Tunnel *t;
   unsigned int i;
+  uint32_t pid;
   uint16_t type;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n");
@@ -1027,6 +1095,7 @@ process_incoming_data (struct GNUNET_MESH_Handle *h,
     t = retrieve_tunnel (h, ntohl (ucast->tid));
     payload = (struct GNUNET_MessageHeader *) &ucast[1];
     peer = &ucast->oid;
+    pid = ntohl (ucast->pid);
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  ucast on tunnel %s [%X]\n",
          GNUNET_i2s (peer), ntohl (ucast->tid));
     break;
@@ -1035,6 +1104,7 @@ process_incoming_data (struct GNUNET_MESH_Handle *h,
     t = retrieve_tunnel (h, ntohl (mcast->tid));
     payload = (struct GNUNET_MessageHeader *) &mcast[1];
     peer = &mcast->oid;
+    pid = ntohl (mcast->pid);
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  mcast on tunnel %s [%X]\n",
          GNUNET_i2s (peer), ntohl (mcast->tid));
     break;
@@ -1043,6 +1113,7 @@ process_incoming_data (struct GNUNET_MESH_Handle *h,
     t = retrieve_tunnel (h, ntohl (to_orig->tid));
     payload = (struct GNUNET_MessageHeader *) &to_orig[1];
     peer = &to_orig->sender;
+    pid = ntohl (to_orig->pid);
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  torig on tunnel %s [%X]\n",
          GNUNET_i2s (peer), ntohl (to_orig->tid));
     break;
@@ -1050,11 +1121,22 @@ process_incoming_data (struct GNUNET_MESH_Handle *h,
     GNUNET_break (0);
     return GNUNET_YES;
   }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  pid %u\n", pid);
   if (NULL == t)
   {
     /* Tunnel was ignored, probably service didn't get it yet */
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  ignored!\n");
+    return GNUNET_YES;
+  }
+    if (GNUNET_YES ==
+        GMC_is_pid_bigger(pid, t->last_recv_pid + INITIAL_WINDOW_SIZE))
+  {
+    GNUNET_break (0);
+    LOG (GNUNET_ERROR_TYPE_WARNING, "  unauthorized message!\n");
+    // FIXME fc what now? accept? reject?
     return GNUNET_YES;
   }
+  t->last_recv_pid = pid;
   type = ntohs (payload->type);
   for (i = 0; i < h->n_handlers; i++)
   {
@@ -1076,6 +1158,7 @@ process_incoming_data (struct GNUNET_MESH_Handle *h,
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "callback completed successfully\n");
+        send_ack (h, t);
       }
     }
   }
@@ -1111,13 +1194,19 @@ process_ack (struct GNUNET_MESH_Handle *h,
     return;
   }
   ack = ntohl (msg->max_pid);
-  if (ack > t->max_pid || PID_OVERFLOW (t->max_pid, ack))
-    t->max_pid = ack;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  on tunnel %X, ack %u!\n", t->tid, ack);
+  if (GNUNET_YES == GMC_is_pid_bigger(ack, t->max_send_pid))
+    t->max_send_pid = ack;
+  else
+    return;
   if (NULL == h->th && 0 < t->packet_size)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  tmt rdy was NULL, requesting!\n", t->tid, ack);
     h->th =
         GNUNET_CLIENT_notify_transmit_ready (h->client, t->packet_size,
                                              GNUNET_TIME_UNIT_FOREVER_REL,
                                              GNUNET_YES, &send_callback, h);
+  }
 }
 
 
@@ -1138,8 +1227,10 @@ msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
     reconnect (h);
     return;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "received a message type %hu from MESH\n",
-       ntohs (msg->type));
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "\n",
+       GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n",
+       GNUNET_MESH_DEBUG_M2S (ntohs (msg->type)));
   switch (ntohs (msg->type))
   {
     /* Notify of a new incoming tunnel */
@@ -1168,12 +1259,20 @@ msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
   default:
     /* We shouldn't get any other packages, log and ignore */
     LOG (GNUNET_ERROR_TYPE_WARNING,
-         "unsolicited message form service (type %d)\n",
+         "unsolicited message form service (type %hu)\n",
          ntohs (msg->type));
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n");
-  GNUNET_CLIENT_receive (h->client, &msg_received, h,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
+  if (GNUNET_YES == h->in_receive)
+  {
+    GNUNET_CLIENT_receive (h->client, &msg_received, h,
+                           GNUNET_TIME_UNIT_FOREVER_REL);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "in receive off, not calling CLIENT_receive\n");
+  }
 }
 
 
@@ -1201,7 +1300,9 @@ send_callback (void *cls, size_t size, void *buf)
   char *cbuf = buf;
   size_t tsize;
   size_t psize;
+  size_t nsize;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Send packet() Buffer %u\n", size);
   if ((0 == size) || (NULL == buf))
   {
@@ -1212,13 +1313,15 @@ send_callback (void *cls, size_t size, void *buf)
   }
   tsize = 0;
   next = h->th_head;
-  while ((NULL != (th = next)) && (size >= th->size))
+  nsize = message_ready_size (h);
+  while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
   {
     t = th->tunnel;
     if (GNUNET_YES == th_is_payload (th))
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG, " payload\n");
-      if (t->max_pid < t->pid && ! PID_OVERFLOW (t->pid, t->max_pid)) {
+      if (GNUNET_YES == GMC_is_pid_bigger(t->next_send_pid, t->max_send_pid))
+      {
         /* This tunnel is not ready to transmit yet, try next message */
         next = th->next;
         continue;
@@ -1233,8 +1336,8 @@ send_callback (void *cls, size_t size, void *buf)
         GNUNET_assert (size >= th->size);
         mh = (struct GNUNET_MessageHeader *) &cbuf[sizeof (to)];
         psize = th->notify (th->notify_cls, size - sizeof (to), mh);
-        LOG (GNUNET_ERROR_TYPE_DEBUG, "  to origin, type %u\n",
-             ntohs (mh->type));
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "  to origin, type %s\n",
+             GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
         if (psize > 0)
         {
           psize += sizeof (to);
@@ -1242,7 +1345,7 @@ send_callback (void *cls, size_t size, void *buf)
           to.header.size = htons (psize);
           to.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
           to.tid = htonl (t->tid);
-          // FIXME pid?
+          to.pid = htonl (t->next_send_pid);
           memset (&to.oid, 0, sizeof (struct GNUNET_PeerIdentity));
           memset (&to.sender, 0, sizeof (struct GNUNET_PeerIdentity));
           memcpy (cbuf, &to, sizeof (to));
@@ -1257,8 +1360,8 @@ send_callback (void *cls, size_t size, void *buf)
         GNUNET_assert (size >= th->size);
         mh = (struct GNUNET_MessageHeader *) &cbuf[sizeof (mc)];
         psize = th->notify (th->notify_cls, size - sizeof (mc), mh);
-        LOG (GNUNET_ERROR_TYPE_DEBUG, "  multicast, type %u\n",
-             ntohs (mh->type));
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "  multicast, type %s\n",
+             GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
         if (psize > 0)
         {
           psize += sizeof (mc);
@@ -1266,7 +1369,7 @@ send_callback (void *cls, size_t size, void *buf)
           mc.header.size = htons (psize);
           mc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
           mc.tid = htonl (t->tid);
-          mc.pid = htonl (t->pid);
+          mc.pid = htonl (t->next_send_pid);
           mc.ttl = 0;
           memset (&mc.oid, 0, sizeof (struct GNUNET_PeerIdentity));
           memcpy (cbuf, &mc, sizeof (mc));
@@ -1281,8 +1384,8 @@ send_callback (void *cls, size_t size, void *buf)
         GNUNET_assert (size >= th->size);
         mh = (struct GNUNET_MessageHeader *) &cbuf[sizeof (uc)];
         psize = th->notify (th->notify_cls, size - sizeof (uc), mh);
-        LOG (GNUNET_ERROR_TYPE_DEBUG, "  unicast, type %u\n",
-             ntohs (mh->type));
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "  unicast, type %s\n",
+             GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
         if (psize > 0)
         {
           psize += sizeof (uc);
@@ -1290,20 +1393,20 @@ send_callback (void *cls, size_t size, void *buf)
           uc.header.size = htons (psize);
           uc.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_UNICAST);
           uc.tid = htonl (t->tid);
-          uc.pid = htonl (t->pid);
+          uc.pid = htonl (t->next_send_pid);
           memset (&uc.oid, 0, sizeof (struct GNUNET_PeerIdentity));
           GNUNET_PEER_resolve (th->target, &uc.destination);
           memcpy (cbuf, &uc, sizeof (uc));
         }
       }
-      t->pid++;
+      t->next_send_pid++;
     }
     else
     {
       struct GNUNET_MessageHeader *mh = (struct GNUNET_MessageHeader *) &th[1];
 
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  mesh traffic, type %u\n",
-             ntohs (mh->type));
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "  mesh traffic, type %s\n",
+           GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
       memcpy (cbuf, &th[1], th->size);
       psize = th->size;
     }
@@ -1312,46 +1415,28 @@ send_callback (void *cls, size_t size, void *buf)
     GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
     GNUNET_free (th);
     next = h->th_head;
+    nsize = message_ready_size (h);
     cbuf += psize;
     size -= psize;
     tsize += psize;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  total size: %u\n", tsize);
   h->th = NULL;
-  if (NULL != h->th_head)
+  size = message_ready_size (h);
+  if (0 != size)
   {
-    int request = GNUNET_NO;
-
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  head not empty\n");
-    for (th = h->th_head; NULL != th; th = th->next)
-    {
-      struct GNUNET_MESH_Tunnel *t = th->tunnel;
-
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  [%p] notify: %p, size %u\n",
-           th, th->notify, th->size);
-      GNUNET_assert (NULL != t);
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  pid %u, max %u\n", t->pid, t->max_pid);
-
-      if (GNUNET_NO == th_is_payload (th) ||
-          (t->max_pid >= t->pid || PID_OVERFLOW (t->pid, t->max_pid)))
-      {
-        request = GNUNET_YES;
-        break;
-      }
-    }
-
-    if (GNUNET_YES == request)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  next size: %u\n", th->size);
-      h->th =
-          GNUNET_CLIENT_notify_transmit_ready (h->client, th->size,
-                                               GNUNET_TIME_UNIT_FOREVER_REL,
-                                               GNUNET_YES, &send_callback, h);
-    }
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  next size: %u\n", size);
+    h->th =
+        GNUNET_CLIENT_notify_transmit_ready (h->client, size,
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             GNUNET_YES, &send_callback, h);
+  }
+  else
+  {
+    if (NULL != h->th_head)
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "  can't transmit any more\n");
     else
-    {
       LOG (GNUNET_ERROR_TYPE_DEBUG, "  nothing left to transmit\n");
-    }
   }
   if (GNUNET_NO == h->in_receive)
   {
@@ -1382,6 +1467,8 @@ send_packet (struct GNUNET_MESH_Handle *h,
   struct GNUNET_MESH_TransmitHandle *th;
   size_t msize;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n",
+       GNUNET_MESH_DEBUG_M2S(ntohs(msg->type)));
   msize = ntohs (msg->size);
   th = GNUNET_malloc (sizeof (struct GNUNET_MESH_TransmitHandle) + msize);
   th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
@@ -1389,8 +1476,10 @@ send_packet (struct GNUNET_MESH_Handle *h,
   th->tunnel = tunnel;
   memcpy (&th[1], msg, msize);
   add_to_queue (h, th);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  queued\n");
   if (NULL != h->th)
     return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  calling ntfy tmt rdy for %u bytes\n", msize);
   h->th =
       GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
                                            GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1587,6 +1676,8 @@ GNUNET_MESH_tunnel_create (struct GNUNET_MESH_Handle *h, void *tunnel_ctx,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating new tunnel\n");
   t = create_tunnel (h, 0);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  at %p\n", t);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  number %X\n", t->tid);
   t->connect_handler = connect_handler;
   t->disconnect_handler = disconnect_handler;
   t->cls = handler_cls;
@@ -1937,8 +2028,11 @@ GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork,
   size_t overhead;
 
   GNUNET_assert (NULL != tunnel);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "mesh notify transmit ready called\n");
-  if (NULL != target)
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "    on tunnel %X\n", tunnel->tid);
+  if (tunnel->tid >= GNUNET_MESH_LOCAL_TUNNEL_ID_SERV)
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "    to origin\n");
+  else if (NULL != target)
     LOG (GNUNET_ERROR_TYPE_DEBUG, "    target %s\n", GNUNET_i2s (target));
   else
     LOG (GNUNET_ERROR_TYPE_DEBUG, "    target multicast\n");
@@ -1962,12 +2056,15 @@ GNUNET_MESH_notify_transmit_ready (struct GNUNET_MESH_Tunnel *tunnel, int cork,
   add_to_queue (tunnel->mesh, th);
   if (NULL != tunnel->mesh->th)
     return th;
+  if (GMC_is_pid_bigger(tunnel->next_send_pid, tunnel->max_send_pid))
+    return th;
   LOG (GNUNET_ERROR_TYPE_DEBUG, "    call notify tmt rdy\n");
   tunnel->mesh->th =
       GNUNET_CLIENT_notify_transmit_ready (tunnel->mesh->client, th->size,
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            GNUNET_YES, &send_callback,
                                            tunnel->mesh);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "MESH NOTIFY TRANSMIT READY END\n");
   return th;
 }
 
@@ -1982,12 +2079,13 @@ GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th)
 {
   struct GNUNET_MESH_Handle *mesh;
 
+  th->tunnel->packet_size = 0;
   mesh = th->tunnel->mesh;
   if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK)
     GNUNET_SCHEDULER_cancel (th->timeout_task);
   GNUNET_CONTAINER_DLL_remove (mesh->th_head, mesh->th_tail, th);
   GNUNET_free (th);
-  if ((NULL == mesh->th_head) && (NULL != mesh->th))
+  if ((0 == message_ready_size (mesh)) && (NULL != mesh->th))
   {
     /* queue empty, no point in asking for transmission */
     GNUNET_CLIENT_notify_transmit_ready_cancel (mesh->th);