- use strings
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh.c
index 9f4fdbcf07b76cd24bcf2587dfcd13a5725afbf8..4cd0aff8e1318c0488fd32f89f5bb5b822ecf028 100644 (file)
@@ -60,6 +60,9 @@
 #define MESH_DEBUG_DHT          GNUNET_YES
 #define MESH_DEBUG_CONNECTION   GNUNET_NO
 
+#define INITIAL_WINDOW_SIZE     2
+#define ACK_THRESHOLD           INITIAL_WINDOW_SIZE / 2
+
 #if MESH_DEBUG_CONNECTION
 #define DEBUG_CONN(...) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
 #else
@@ -323,11 +326,6 @@ struct MeshTunnel
      */
   uint32_t last_fwd_ack;
 
-    /**
-     * Last ACK sent towards the next hop (for traffic towards root).
-     */
-  uint32_t last_bck_ack;
-
   /**
    * BCK ACK value received from the hop towards the owner of the tunnel,
    * (previous node / owner): up to what message PID can we sent back to him.
@@ -500,9 +498,14 @@ struct MeshTunnelChildInfo
 struct MeshTunnelClientInfo
 {
   /**
-   * Last sent PID.
+   * PID of the last packet sent to the client (FWD).
    */
-  uint32_t pid;
+  uint32_t fwd_pid;
+
+  /**
+   * PID of the last packet received from the client (BCK).
+   */
+  uint32_t bck_pid;
 
   /**
    * Maximum PID allowed (FWD ACK received).
@@ -528,6 +531,11 @@ struct MeshTunnelChildIteratorContext
      */
   struct MeshTunnel *t;
 
+    /**
+     * Is this context initialized? Is the value in max_child_ack valid?
+     */
+  int init;
+
     /**
      * Maximum child ACK so far.
      */
@@ -1696,55 +1704,6 @@ announce_id (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 /******************      GENERAL HELPER FUNCTIONS      ************************/
 /******************************************************************************/
 
-/**
- * Check if one pid is bigger than other, accounting for overflow.
- *
- * @param bigger Argument that should be bigger.
- * @param smaller Argument that should be smaller.
- *
- * @return True if bigger (arg1) has a higher value than smaller (arg 2).
- */
-static int
-is_pid_bigger (uint32_t bigger, uint32_t smaller)
-{
-    return (GNUNET_YES == PID_OVERFLOW(smaller, bigger) ||
-            (bigger > smaller && GNUNET_NO == PID_OVERFLOW(bigger, smaller)));
-}
-
-/**
- * Get the higher ACK value out of two values, taking in account overflow.
- *
- * @param a First ACK value.
- * @param b Second ACK value.
- *
- * @return Highest ACK value from the two.
- */
-static uint32_t
-max_pid (uint32_t a, uint32_t b)
-{
-  if (is_pid_bigger(a, b))
-    return a;
-  return b;
-}
-
-
-/**
- * Get the lower ACK value out of two values, taking in account overflow.
- *
- * @param a First ACK value.
- * @param b Second ACK value.
- *
- * @return Lowest ACK value from the two.
- */
-static uint32_t
-min_pid (uint32_t a, uint32_t b)
-{
-  if (is_pid_bigger(a, b))
-    return b;
-  return a;
-}
-
-
 /**
  * Decrements the reference counter and frees all resources if needed
  *
@@ -1973,7 +1932,8 @@ send_subscribed_clients (const struct GNUNET_MessageHeader *msg,
 
   type = ntohs (payload->type);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending to clients...\n");
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "message of type %u\n", type);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "message of type %s\n",
+              GNUNET_MESH_DEBUG_M2S (type));
 
   memcpy (cbuf, msg, sizeof (cbuf));
   switch (htons (msg->type))
@@ -2318,6 +2278,7 @@ send_message (const struct GNUNET_MessageHeader *message,
   struct MeshPeerInfo *neighbor;
   struct MeshPeerPath *p;
   size_t size;
+  uint16_t type;
 
 //   GNUNET_TRANSPORT_try_connect(); FIXME use?
 
@@ -2326,7 +2287,8 @@ send_message (const struct GNUNET_MessageHeader *message,
   info->mesh_data = GNUNET_malloc (sizeof (struct MeshData));
   info->mesh_data->data = GNUNET_malloc (size);
   memcpy (info->mesh_data->data, message, size);
-  if (ntohs(message->type) == GNUNET_MESSAGE_TYPE_MESH_UNICAST)
+  type = ntohs(message->type);
+  if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type)
   {
     struct GNUNET_MESH_Unicast *m;
 
@@ -2346,6 +2308,30 @@ send_message (const struct GNUNET_MessageHeader *message,
   }
   if (NULL == p)
   {
+#if MESH_DEBUG
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "  %s IS NOT DIRECTLY CONNECTED\n",
+                GNUNET_i2s(peer));
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "  PATHS TO %s:\n",
+                GNUNET_i2s(peer));
+    for (p = neighbor->path_head; NULL != p; p = p->next)
+    {
+      struct GNUNET_PeerIdentity debug_id;
+      unsigned int i;
+
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "    path with %u hops through:\n",
+                  p->length);
+      for (i = 0; i < p->length; i++)
+      {
+        GNUNET_PEER_resolve(p->peers[i], &debug_id);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "      hop %u: %s\n",
+                    i, GNUNET_i2s(&debug_id));
+      }
+    }
+#endif
     GNUNET_break (0); // FIXME sometimes fails (testing disconnect?)
     GNUNET_free (info->mesh_data->data);
     GNUNET_free (info->mesh_data);
@@ -2353,8 +2339,19 @@ send_message (const struct GNUNET_MessageHeader *message,
     return;
   }
   info->peer = neighbor;
+  if (GNUNET_MESSAGE_TYPE_MESH_PATH_ACK == type)
+  {
+    /*
+     * TODO: in this case we only need the service to retransmit
+     * the message down the path. If we pass the real type to queue_add,
+     * queue_send will try to build the message from scratch. This can
+     * probably be done by some other way instead of deleteing the type
+     * info.
+     */
+    type = 0;
+  }
   queue_add (info,
-             0,
+             type,
              size,
              neighbor,
              t);
@@ -3198,6 +3195,26 @@ tunnel_add_path (struct MeshTunnel *t, struct MeshPeerPath *p,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tunnel_add_path END\n");
 }
 
+/**
+ * Add a client to a tunnel, initializing all needed data structures.
+ * 
+ * @param t Tunnel to which add the client.
+ * @param c Client which to add to the tunnel.
+ */
+static void
+tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c)
+{
+  struct MeshTunnelClientInfo clinfo;
+
+  GNUNET_array_append (t->clients, t->nclients, c);
+  clinfo.fwd_ack = t->fwd_pid + 1;
+  clinfo.bck_ack = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE;
+  clinfo.fwd_pid = t->fwd_pid;
+  clinfo.bck_pid = (uint32_t) -1; // Expected next: 0
+  t->nclients--;
+  GNUNET_array_append (t->clients_fc, t->nclients, clinfo);
+}
+
 
 /**
  * Notifies a tunnel that a connection has broken that affects at least
@@ -3397,10 +3414,15 @@ tunnel_get_neighbor_fc (const struct MeshTunnel *t,
                                              &peer->hashPubKey);
   if (NULL == cinfo)
   {
+    uint32_t delta;
+
     cinfo = GNUNET_malloc (sizeof (struct MeshTunnelChildInfo));
     cinfo->id = GNUNET_PEER_intern (peer);
     cinfo->skip = t->fwd_pid;
-    cinfo->fwd_ack = t->fwd_pid + t->fwd_queue_max - t->fwd_queue_n; // FIXME review
+
+    delta = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE;
+    cinfo->fwd_ack = t->fwd_pid + delta;
+    cinfo->bck_ack = delta;
 
     GNUNET_assert (GNUNET_OK ==
       GNUNET_CONTAINER_multihashmap_put (t->children_fc,
@@ -3412,6 +3434,31 @@ tunnel_get_neighbor_fc (const struct MeshTunnel *t,
 }
 
 
+/**
+ * Get the Flow Control info of a client.
+ * 
+ * @param t Tunnel on which to look.
+ * @param c Client whose ACK to get.
+ * 
+ * @return ACK value.
+ */
+static struct MeshTunnelClientInfo *
+tunnel_get_client_fc (struct MeshTunnel *t,
+                      struct MeshClient *c)
+{
+  unsigned int i;
+
+  for (i = 0; i < t->nclients; i++)
+  {
+    if (t->clients[i] != c)
+      continue;
+    return &t->clients_fc[i];
+  }
+  GNUNET_assert (0);
+  return NULL; // avoid compiler / coverity complaints
+}
+
+
 /**
  * Iterator to get the appropiate ACK value from all children nodes.
  *
@@ -3419,8 +3466,8 @@ tunnel_get_neighbor_fc (const struct MeshTunnel *t,
  * @param id Id of the child node.
  */
 static void
-tunnel_get_child_ack (void *cls,
-                      GNUNET_PEER_Id id)
+tunnel_get_child_fwd_ack (void *cls,
+                          GNUNET_PEER_Id id)
 {
   struct GNUNET_PeerIdentity peer_id;
   struct MeshTunnelChildInfo *cinfo;
@@ -3432,8 +3479,11 @@ tunnel_get_child_ack (void *cls,
   cinfo = tunnel_get_neighbor_fc (t, &peer_id);
   ack = cinfo->fwd_ack;
 
-  if (0 == ctx->max_child_ack)
+  if (GNUNET_NO == ctx->init)
+  {
     ctx->max_child_ack = ack;
+    ctx->init = GNUNET_YES;
+  }
 
   if (GNUNET_YES == t->speed_min)
   {
@@ -3457,43 +3507,26 @@ tunnel_get_child_ack (void *cls,
  * @return Maximum PID allowed (uint32 MAX), -1 if node has no children.
  */
 static int64_t
-tunnel_get_children_ack (struct MeshTunnel *t)
+tunnel_get_children_fwd_ack (struct MeshTunnel *t)
 {
   struct MeshTunnelChildIteratorContext ctx;
   ctx.t = t;
   ctx.max_child_ack = 0;
   ctx.nchildren = 0;
-  tree_iterate_children (t->tree, tunnel_get_child_ack, &ctx);
+  tree_iterate_children (t->tree, tunnel_get_child_fwd_ack, &ctx);
 
   if (0 == ctx.nchildren)
     return -1LL;
 
-  return (int64_t) ctx.max_child_ack;
-}
-
-
-/**
- * Add a client to a tunnel, initializing all needed data structures.
- * 
- * @param t Tunnel to which add the client.
- * @param c Client which to add to the tunnel.
- */
-static void
-tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c)
-{
-  struct MeshTunnelClientInfo clinfo;
+  if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ctx.max_child_ack, t->fwd_pid))
+    ctx.max_child_ack = t->fwd_pid + 1; // Might overflow, it's ok.
 
-  GNUNET_array_append (t->clients, t->nclients, c);
-  t->nclients--;
-  clinfo.fwd_ack = t->fwd_pid + 1;
-  clinfo.bck_ack = t->bck_ack + 1; // FIXME fc review
-  clinfo.pid = t->fwd_pid;
-  GNUNET_array_append (t->clients_fc, t->nclients, clinfo);
+  return (int64_t) ctx.max_child_ack;
 }
 
 
 /**
- * Set the ACK value of a client in a particular tunnel.
+ * Set the FWD ACK value of a client in a particular tunnel.
  * 
  * @param t Tunnel affected.
  * @param c Client whose ACK to set.
@@ -3517,31 +3550,6 @@ tunnel_set_client_fwd_ack (struct MeshTunnel *t,
 }
 
 
-/**
- * Get the ACK value of a client in a particular tunnel.
- * 
- * @param t Tunnel on which to look.
- * @param c Client whose ACK to get.
- * 
- * @return ACK value.
- */
-uint32_t // FIXME static when used!!
-tunnel_get_client_ack (struct MeshTunnel *t,
-                       struct MeshClient *c)
-{
-  unsigned int i;
-
-  for (i = 0; i < t->nclients; i++)
-  {
-    if (t->clients[i] != c)
-      continue;
-    return t->clients_fc[i].fwd_ack;
-  }
-  GNUNET_break (0);
-  return UINT32_MAX;
-}
-
-
 /**
  * Get the highest ACK value of all clients in a particular tunnel,
  * according to the buffering/speed settings.
@@ -3552,7 +3560,7 @@ tunnel_get_client_ack (struct MeshTunnel *t,
  *         If no clients are suscribed, -1.
  */
 static int64_t
-tunnel_get_clients_ack (struct MeshTunnel *t)
+tunnel_get_clients_fwd_ack (struct MeshTunnel *t)
 {
   unsigned int i;
   int64_t ack;
@@ -3564,15 +3572,15 @@ tunnel_get_clients_ack (struct MeshTunnel *t)
   {
     if (-1 == ack ||
         (GNUNET_YES == t->speed_min &&
-         GNUNET_YES == is_pid_bigger (ack, t->clients_fc[i].fwd_ack)) ||
+         GNUNET_YES == GMC_is_pid_bigger (ack, t->clients_fc[i].fwd_ack)) ||
         (GNUNET_NO == t->speed_min &&
-         GNUNET_YES == is_pid_bigger (t->clients_fc[i].fwd_ack, ack)))
+         GNUNET_YES == GMC_is_pid_bigger (t->clients_fc[i].fwd_ack, ack)))
     {
       ack = t->clients_fc[i].fwd_ack;
     }
   }
 
-  if (GNUNET_YES == t->nobuffer && is_pid_bigger(ack, t->fwd_pid))
+  if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ack, t->fwd_pid))
     ack = (uint32_t) t->fwd_pid + 1; // Might overflow, it's ok.
 
   return (uint32_t) ack;
@@ -3599,8 +3607,8 @@ tunnel_get_fwd_ack (struct MeshTunnel *t)
   count = t->fwd_pid - t->skip;
   buffer_free = t->fwd_queue_max - t->fwd_queue_n;
   ack = count + buffer_free; // Might overflow 32 bits, it's ok!
-  child_ack = tunnel_get_children_ack (t);
-  client_ack = tunnel_get_clients_ack (t);
+  child_ack = tunnel_get_children_fwd_ack (t);
+  client_ack = tunnel_get_clients_fwd_ack (t);
   if (-1 == child_ack)
   {
     // Node has no children, child_ack AND core buffer are irrelevant.
@@ -3610,15 +3618,15 @@ tunnel_get_fwd_ack (struct MeshTunnel *t)
 
   if (GNUNET_YES == t->speed_min)
   {
-    ack = min_pid ((uint32_t) child_ack, ack);
-    ack = min_pid ((uint32_t) client_ack, ack);
+    ack = GMC_min_pid ((uint32_t) child_ack, ack);
+    ack = GMC_min_pid ((uint32_t) client_ack, ack);
   }
   else
   {
-    ack = max_pid ((uint32_t) child_ack, ack);
-    ack = max_pid ((uint32_t) client_ack, ack);
+    ack = GMC_max_pid ((uint32_t) child_ack, ack);
+    ack = GMC_max_pid ((uint32_t) client_ack, ack);
   }
-  if (GNUNET_YES == t->nobuffer && is_pid_bigger(ack, t->fwd_pid))
+  if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ack, t->fwd_pid))
     ack = t->fwd_pid + 1; // Might overflow 32 bits, it's ok!
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "c %u, bf %u, ch %u, cl %u, ACK: %u\n",
               count, buffer_free, child_ack, client_ack, ack);
@@ -3655,6 +3663,38 @@ tunnel_get_bck_ack (struct MeshTunnel *t)
   return ack;
 }
 
+static void
+send_local_ack (struct MeshClient *c, struct MeshTunnel *t, uint32_t ack)
+{
+  struct GNUNET_MESH_LocalAck msg;
+
+  msg.header.size = htons (sizeof (msg));
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
+  msg.tunnel_id = htonl (t->local_tid_dest);
+  msg.max_pid = htonl (ack); 
+  GNUNET_SERVER_notification_context_unicast(nc,
+                                              c->handle,
+                                              &msg.header,
+                                              GNUNET_NO);
+}
+
+/**
+ * Build an ACK message and send it to the given peer.
+ */
+static void
+send_ack (struct MeshTunnel *t, struct GNUNET_PeerIdentity *peer,  uint32_t ack)
+{
+  struct GNUNET_MESH_ACK msg;
+
+  GNUNET_PEER_resolve (t->id.oid, &msg.oid);
+  msg.header.size = htons (sizeof (msg));
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
+  msg.pid = htonl (ack);
+  msg.tid = htonl (t->id.tid);
+
+  send_message (&msg.header, peer, t);
+}
+
 
 /**
  * Send an ACK informing the predecessor about the available buffer space.
@@ -3670,7 +3710,6 @@ tunnel_get_bck_ack (struct MeshTunnel *t)
 static void
 tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type)
 {
-  struct GNUNET_MESH_ACK msg;
   struct GNUNET_PeerIdentity id;
   uint32_t ack;
 
@@ -3715,21 +3754,76 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type)
   }
 
   t->last_fwd_ack = ack;
-  msg.pid = htonl (ack);
-
   GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id);
+  send_ack (t, &id, ack);
+}
 
-  msg.header.size = htons (sizeof (msg));
-  msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
-  msg.tid = htonl (t->id.tid);
-  GNUNET_PEER_resolve(t->id.oid, &msg.oid);
-  send_message (&msg.header, &id, t);
+
+/**
+ * Iterator to send a child node a BCK ACK to allow him to send more
+ * to_origin data.
+ *
+ * @param cls Closure (tunnel).
+ * @param id Id of the child node.
+ */
+static void
+tunnel_send_child_bck_ack (void *cls,
+                           GNUNET_PEER_Id id)
+{
+  struct MeshTunnel *t = cls;
+  struct MeshTunnelChildInfo *cinfo;
+  struct GNUNET_PeerIdentity peer;
+
+  GNUNET_PEER_resolve (id, &peer);
+  cinfo = tunnel_get_neighbor_fc (t, &peer);
+
+  if (cinfo->bck_ack != cinfo->pid &&
+      GNUNET_NO == GMC_is_pid_bigger (cinfo->bck_ack, cinfo->pid))
+    return;
+
+  cinfo->bck_ack++;
+  send_ack (t, &peer, cinfo->bck_ack);
 }
 
 
 /**
- * Send an ACK informing the children nodes about the available buffer space.
- * In case there is no child node, inform the destination clients.
+ * @brief Send BCK ACKs to clients to allow them more to_origin traffic
+ * 
+ * Iterates over all clients and sends BCK ACKs to the ones that need it.
+ * 
+ * @param t Tunnel on which to send the BCK ACKs.
+ */
+static void
+tunnel_send_clients_bck_ack (struct MeshTunnel *t)
+{
+  unsigned int i;
+
+  /* Find client whom to allow to send to origin (with lowest buffer space) */
+  for (i = 0; i < t->nclients; i++)
+  {
+    struct MeshTunnelClientInfo *clinfo;
+    unsigned int delta;
+
+    clinfo = &t->clients_fc[i];
+    delta = clinfo->bck_ack - clinfo->bck_pid;
+
+    if ((GNUNET_NO == t->nobuffer && ACK_THRESHOLD > delta) ||
+        (GNUNET_YES == t->nobuffer && 0 == delta))
+    {
+      uint32_t ack;
+
+      ack = clinfo->bck_pid;
+      ack += t->nobuffer ? 1 : INITIAL_WINDOW_SIZE;
+      send_local_ack(t->clients[i],  t, ack);
+      clinfo->bck_ack = ack;
+    }
+  }
+}
+
+
+/**
+ * Send an ACK informing the children nodes and destination clients about
+ * the available buffer space.
  * If buffering is off, send only on behalf of root (can be self).
  * If buffering is on, send when sent to predecessor and buffer space is free.
  * Note that although the name is bck_ack, the BCK mean backwards *traffic*,
@@ -3741,13 +3835,6 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type)
 static void
 tunnel_send_bck_ack (struct MeshTunnel *t, uint16_t type)
 {
-  struct GNUNET_MESH_ACK msg;
-  struct GNUNET_PeerIdentity id;
-  uint32_t ack;
-  unsigned int i;
-  unsigned int min_d;
-  unsigned int min_i;
-
   if (NULL != t->owner)
   {
     send_client_tunnel_ack (t->owner, t);
@@ -3777,51 +3864,8 @@ tunnel_send_bck_ack (struct MeshTunnel *t, uint16_t type)
       GNUNET_break (0);
   }
 
-  /* Ok, ACK might be necessary, what PID to ACK? */
-  ack = tunnel_get_bck_ack (t);
-
-  /* If speed_min and not all children have ack'd, dont send yet */
-  if (ack == t->last_bck_ack)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending BCK ACK, not ready\n");
-    return;
-  }
-
-  /* Unlock local clients. */
-  if (0 < t->nclients)
-  {
-    struct GNUNET_MESH_LocalAck msg;
-
-    /* Find client who to allow to send to origin (with lowest buffer space) */
-    /* FIXME fc Round robin? Priority? FIFO? */
-    for (i = 0; i < t->nclients; i++)
-    {
-      unsigned int d;
-
-      d = t->clients_fc[i].bck_ack - t->clients_fc[i].pid;
-      if (0 == i || d < min_d)
-      {
-        min_d = d;
-        min_i = i;
-      }
-    }
-    msg.header.size = htons (sizeof (msg));
-    msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
-    msg.tunnel_id = htonl (t->local_tid_dest);
-    msg.max_pid = t->bck_pid + 1; // FIXME fc
-    GNUNET_SERVER_notification_context_unicast(nc,
-                                               t->clients[min_i]->handle,
-                                               &msg.header,
-                                               GNUNET_NO);
-  }
-
-  t->last_bck_ack = ack;
-  msg.pid = htonl (ack);
-  msg.header.size = htons (sizeof (msg));
-  msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
-  msg.tid = htonl (t->id.tid);
-  GNUNET_PEER_resolve(t->id.oid, &msg.oid);
-  send_message (&msg.header, &id, t);
+  tunnel_send_clients_bck_ack (t);
+  tree_iterate_children (t->tree, &tunnel_send_child_bck_ack, NULL);
 }
 
 
@@ -3998,7 +4042,7 @@ tunnel_new (GNUNET_PEER_Id owner,
 {
   struct MeshTunnel *t;
   struct GNUNET_HashCode hash;
-  
+
   if (n_tunnels >= max_tunnels && NULL == client)
     return NULL;
 
@@ -4009,9 +4053,10 @@ tunnel_new (GNUNET_PEER_Id owner,
   t->bck_queue_max = t->fwd_queue_max;
   t->tree = tree_new (owner);
   t->owner = client;
-  t->bck_ack = 1;
-  t->last_bck_ack = 1;
-  t->last_fwd_ack = 1;
+  t->fwd_pid = (uint32_t) -1; // Next (expected) = 0
+  t->bck_pid = (uint32_t) -1; // Next (expected) = 0
+  t->bck_ack = INITIAL_WINDOW_SIZE - 1;
+  t->last_fwd_ack = INITIAL_WINDOW_SIZE - 1;
   t->local_tid = local;
   t->children_fc = GNUNET_CONTAINER_multihashmap_create (8);
   n_tunnels++;
@@ -4234,14 +4279,15 @@ send_core_data_multicast (void *cls, size_t size, void *buf)
       mc = (struct GNUNET_MESH_Multicast *) mh;
       mh = (struct GNUNET_MessageHeader *) &mc[1];
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  " multicast, payload type %u\n", ntohs (mh->type));
+                  " multicast, payload type %s\n",
+                  GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   " multicast, payload size %u\n", ntohs (mh->size));
     }
     else
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type %u\n",
-                  ntohs (mh->type));
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type %s\n",
+                  GNUNET_MESH_DEBUG_M2S (ntohs (mh->type)));
     }
   }
 #endif
@@ -4380,44 +4426,63 @@ queue_send (void *cls, size_t size, void *buf)
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   size ok\n");
 
     t = queue->tunnel;
-    t->fwd_queue_n--;
+    if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == queue->type)
+    {
+      t->fwd_queue_n--;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   unicast: %u\n");
+    }
+    else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == queue->type)
+    {
+      t->bck_queue_n--;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   to origin\n");
+    }
 
     /* Fill buf */
     switch (queue->type)
     {
-        case 0: // RAW data (preconstructed message, retransmission, etc.)
-            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   raw\n");
-            data_size = send_core_data_raw (queue->cls, size, buf);
-            msg = (struct GNUNET_MessageHeader *) buf;
-            switch (ntohs (msg->type)) // Type of preconstructed message
-            {
-              case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
-                tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
-                break;
-              case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
-                tunnel_send_bck_ack(t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
-                break;
-              default:
-                  break;
-            }
-            break;
-        case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
-            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   multicast\n");
-            data_size = send_core_data_multicast(queue->cls, size, buf);
-            tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
-            break;
-        case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE:
-            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   path create\n");
-            data_size = send_core_path_create(queue->cls, size, buf);
+      case 0:
+      case GNUNET_MESSAGE_TYPE_MESH_ACK:
+      case GNUNET_MESSAGE_TYPE_MESH_PATH_BROKEN:
+      case GNUNET_MESSAGE_TYPE_MESH_PATH_DESTROY:
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "*********   raw: %u\n",
+                    queue->type);
+        /* Fall through */
+      case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+      case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+        data_size = send_core_data_raw (queue->cls, size, buf);
+        msg = (struct GNUNET_MessageHeader *) buf;
+        switch (ntohs (msg->type)) // Type of preconstructed message
+        {
+          case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
+            tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
             break;
-        case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK:
-            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   path ack\n");
-            data_size = send_core_path_ack(queue->cls, size, buf);
+          case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN:
+            tunnel_send_bck_ack(t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
             break;
-        default:
-            GNUNET_break (0);
-            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   type unknown\n");
-            data_size = 0;
+          default:
+              break;
+        }
+        break;
+      case GNUNET_MESSAGE_TYPE_MESH_MULTICAST:
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   multicast\n");
+        data_size = send_core_data_multicast(queue->cls, size, buf);
+        tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
+        break;
+      case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE:
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   path create\n");
+        data_size = send_core_path_create(queue->cls, size, buf);
+        break;
+      case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK:
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   path ack\n");
+        data_size = send_core_path_ack(queue->cls, size, buf);
+        break;
+      default:
+        GNUNET_break (0);
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "*********   type unknown: %u\n",
+                    queue->type);
+        data_size = 0;
     }
 
     /* Free queue, but cls was freed by send_core_* */
@@ -4468,26 +4533,29 @@ queue_add (void *cls, uint16_t type, size_t size,
   unsigned int *max;
   unsigned int *n;
 
+  n = NULL;
   if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type ||
       GNUNET_MESSAGE_TYPE_MESH_MULTICAST == type)
   {
     n = &t->fwd_queue_n;
     max = &t->fwd_queue_max;
   }
-  else
+  else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
   {
     n = &t->bck_queue_n;
     max = &t->bck_queue_max;
   }
-  if (*n >= *max)
-  {
-    if (NULL == t->owner)
-      GNUNET_break_op(0);       // TODO: kill connection?
-    else
-      GNUNET_break(0);
-    return;                       // Drop message
+  if (NULL != n) {
+    if (*n >= *max)
+    {
+      if (NULL == t->owner)
+        GNUNET_break_op(0);       // TODO: kill connection?
+      else
+        GNUNET_break(0);
+      return;                       // Drop message
+    }
+    (*n)++;
   }
-  (*n)++;
   queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
   queue->cls = cls;
   queue->type = type;
@@ -4926,8 +4994,8 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer,
     return GNUNET_OK;
   }
   msg = (struct GNUNET_MESH_Unicast *) message;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %u\n",
-              ntohs (msg[1].header.type));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n",
+              GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type)));
   /* Check tunnel */
   t = tunnel_get (&msg->oid, ntohl (msg->tid));
   if (NULL == t)
@@ -4952,9 +5020,9 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer,
   }
   t->skip += (pid - t->fwd_pid) - 1;
   t->fwd_pid = pid;
-  if (is_pid_bigger (pid, t->last_fwd_ack))
+  if (GMC_is_pid_bigger (pid, t->last_fwd_ack))
   {
-    GNUNET_STATISTICS_update (stats, "# not allowed unicast", 1, GNUNET_NO);
+    GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO);
     GNUNET_break_op (0);
     return GNUNET_OK;
   }
@@ -4966,7 +5034,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer,
                 "  it's for us! sending to clients...\n");
     GNUNET_STATISTICS_update (stats, "# unicast received", 1, GNUNET_NO);
     send_subscribed_clients (message, (struct GNUNET_MessageHeader *) &msg[1]);
-    // ACK is generated by client (api part), service only retransmits.
+    tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
     return GNUNET_OK;
   }
   ttl = ntohl (msg->ttl);
@@ -4975,11 +5043,11 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer,
   {
     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
+    tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
     return GNUNET_OK;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "  not for us, retransmitting...\n");
-  GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO);
 
   neighbor = tree_get_first_hop (t->tree, dest_id);
   cinfo = tunnel_get_neighbor_fc (t, neighbor);
@@ -4987,12 +5055,15 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer,
   GNUNET_CONTAINER_multihashmap_iterate (t->children_fc,
                                          &tunnel_add_skip,
                                          &neighbor);
-  if (is_pid_bigger(pid, cinfo->fwd_ack))
+  if (GMC_is_pid_bigger (pid, cinfo->fwd_ack))
   {
+    GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO);
     GNUNET_break_op (0);
     return GNUNET_OK;
   }
   send_message (message, neighbor, t);
+  GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO);
+
   return GNUNET_OK;
 }
 
@@ -5047,6 +5118,7 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer,
     GNUNET_STATISTICS_update (stats, "# duplicate PID drops", 1, GNUNET_NO);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 " Already seen pid %u, DROPPING!\n", pid);
+    tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
     return GNUNET_OK;
   }
   else
@@ -5064,12 +5136,14 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer,
   {
     GNUNET_STATISTICS_update (stats, "# multicast received", 1, GNUNET_NO);
     send_subscribed_clients (message, &msg[1].header);
+    tunnel_send_fwd_ack(t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST);
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "   ttl: %u\n", ntohl (msg->ttl));
   if (ntohl (msg->ttl) == 0)
   {
     GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO);
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n");
+    tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
     return GNUNET_OK;
   }
   GNUNET_STATISTICS_update (stats, "# multicast forwarded", 1, GNUNET_NO);
@@ -5112,8 +5186,8 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer,
     return GNUNET_OK;
   }
   msg = (struct GNUNET_MESH_ToOrigin *) message;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %u\n",
-              ntohs (msg[1].header.type));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n",
+              GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type)));
   t = tunnel_get (&msg->oid, ntohl (msg->tid));
 
   if (NULL == t)
@@ -5124,22 +5198,13 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer,
     return GNUNET_OK;
   }
 
-  if (t->id.oid == myid)
+  if (NULL != t->owner)
   {
     char cbuf[size];
     struct GNUNET_MESH_ToOrigin *copy;
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "  it's for us! sending to clients...\n");
-    if (NULL == t->owner)
-    {
-      /* got data packet for ownerless tunnel */
-      GNUNET_STATISTICS_update (stats, "# data on ownerless tunnel",
-                                1, GNUNET_NO);
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  no clients!\n");
-      GNUNET_break_op (0);
-      return GNUNET_OK;
-    }
     /* TODO signature verification */
     memcpy (cbuf, message, size);
     copy = (struct GNUNET_MESH_ToOrigin *) cbuf;
@@ -5147,6 +5212,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer,
     GNUNET_STATISTICS_update (stats, "# to origin received", 1, GNUNET_NO);
     GNUNET_SERVER_notification_context_unicast (nc, t->owner->handle,
                                                 &copy->header, GNUNET_NO);
+    tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
     return GNUNET_OK;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -5186,15 +5252,14 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
                  unsigned int atsi_count)
 {
   struct GNUNET_MESH_ACK *msg;
-  struct MeshTunnelChildInfo *cinfo;
   struct MeshTunnel *t;
   uint32_t ack;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got an ACK packet from %s\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n",
               GNUNET_i2s (peer));
   msg = (struct GNUNET_MESH_ACK *) message;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %u\n",
-              ntohs (msg[1].header.type));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n",
+              GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type)));
   t = tunnel_get (&msg->oid, ntohl (msg->tid));
 
   if (NULL == t)
@@ -5205,9 +5270,24 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
     return GNUNET_OK;
   }
   ack = ntohl (msg->pid);
-  cinfo = tunnel_get_neighbor_fc (t, peer);
-  cinfo->fwd_ack = ack;
-  tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
+
+  /* Is this a forward or backward ACK? */
+  if (tree_get_predecessor(t->tree) == GNUNET_PEER_search(peer))
+  {
+    struct MeshTunnelChildInfo *cinfo;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  FWD ACK\n");
+    cinfo = tunnel_get_neighbor_fc (t, peer);
+    cinfo->fwd_ack = ack;
+    tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  BCK ACK\n");
+    t->bck_ack = ack;
+    tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK);
+  }
+  // FIXME fc Unlock queues?
   return GNUNET_OK;
 }
 
@@ -6732,20 +6812,30 @@ handle_local_unicast (void *cls, struct GNUNET_SERVER_Client *client,
     return;
   }
 
+  /* PID should be as expected */
+  if (ntohl (data_msg->pid) != t->fwd_pid + 1)
+  {
+    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Unicast PID, expected %u, got %u\n",
+              t->fwd_pid + 1, ntohl (data_msg->pid));
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
   /* Ok, everything is correct, send the message
    * (pretend we got it from a mesh peer)
    */
   {
+    /* Work around const limitation */
     char buf[ntohs (message->size)] GNUNET_ALIGN;
     struct GNUNET_MESH_Unicast *copy;
 
-    /* Work around const limitation */
     copy = (struct GNUNET_MESH_Unicast *) buf;
     memcpy (buf, data_msg, size);
     copy->oid = my_full_id;
     copy->tid = htonl (t->id.tid);
     copy->ttl = htonl (default_ttl);
-    copy->pid = htonl (t->fwd_pid + 1);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "  calling generic handler...\n");
     handle_mesh_data_unicast (NULL, &my_full_id, &copy->header, NULL, 0);
@@ -6768,7 +6858,7 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client,
                         const struct GNUNET_MessageHeader *message)
 {
   struct GNUNET_MESH_ToOrigin *data_msg;
-  struct GNUNET_PeerIdentity id;
+  struct MeshTunnelClientInfo *clinfo;
   struct MeshClient *c;
   struct MeshTunnel *t;
   MESH_TunnelNumber tid;
@@ -6811,13 +6901,24 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client,
   }
 
   /*  It should be sent by someone who has this as incoming tunnel. */
-  if (-1 == client_knows_tunnel (c, t))
+  if (GNUNET_NO == client_knows_tunnel (c, t))
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  /* PID should be as expected */
+  clinfo = tunnel_get_client_fc (t, c);
+  if (ntohl (data_msg->pid) != clinfo->bck_pid + 1)
   {
     GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "To Origin PID, expected %u, got %u\n",
+                clinfo->bck_pid + 1, ntohl (data_msg->pid));
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
-  GNUNET_PEER_resolve (t->id.oid, &id);
 
   /* Ok, everything is correct, send the message
    * (pretend we got it from a mesh peer)
@@ -6829,8 +6930,10 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client,
     /* Work around const limitation */
     copy = (struct GNUNET_MESH_ToOrigin *) buf;
     memcpy (buf, data_msg, size);
-    copy->oid = id;
+    GNUNET_PEER_resolve (t->id.oid, &copy->oid);
     copy->tid = htonl (t->id.tid);
+    copy->ttl = htonl (default_ttl);
+    GNUNET_assert (ntohl (copy->pid) == (t->bck_pid + 1));
     copy->sender = my_full_id;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "  calling generic handler...\n");
@@ -6895,6 +6998,17 @@ handle_local_multicast (void *cls, struct GNUNET_SERVER_Client *client,
     return;
   }
 
+  /* PID should be as expected */
+  if (ntohl (data_msg->pid) != t->fwd_pid + 1)
+  {
+    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Multicast PID, expected %u, got %u\n",
+              t->fwd_pid + 1, ntohl (data_msg->pid));
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
   {
     char buf[ntohs (message->size)] GNUNET_ALIGN;
     struct GNUNET_MESH_Multicast *copy;
@@ -6904,7 +7018,7 @@ handle_local_multicast (void *cls, struct GNUNET_SERVER_Client *client,
     copy->oid = my_full_id;
     copy->tid = htonl (t->id.tid);
     copy->ttl = htonl (default_ttl);
-    copy->pid = htonl (t->fwd_pid + 1);
+    GNUNET_assert (ntohl (copy->pid) == (t->fwd_pid + 1));
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "  calling generic handler...\n");
     handle_mesh_data_multicast (client, &my_full_id, &copy->header, NULL, 0);