- add framework for mesh2 mutipeer tests
[oweals/gnunet.git] / src / mesh / gnunet-service-mesh-new.c
index 7a22c331d3315896f18db44fee5395f30b15e575..3e64003f9a2453510550480423ea0bbb707a9e05 100644 (file)
@@ -937,6 +937,7 @@ send_client_tunnel_create (struct MeshTunnel *t)
   msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE);
   msg.tunnel_id = htonl (t->local_tid_dest);
   msg.port = htonl (t->port);
+  GNUNET_PEER_resolve (t->id.oid, &msg.peer);
   GNUNET_SERVER_notification_context_unicast (nc, t->client->handle,
                                               &msg.header, GNUNET_NO);
 }
@@ -1500,6 +1501,30 @@ peer_info_add_path_to_origin (struct MeshPeerInfo *peer_info,
 }
 
 
+
+/**
+ * Remove a tunnel from the list of tunnels a peer participates in.
+ * 
+ * @param p Peer to clean.
+ * @param t Tunnel to remove.
+ */
+static void
+peer_info_remove_tunnel (struct MeshPeerInfo *p, struct MeshTunnel *t)
+{
+  unsigned int i;
+
+  for (i = 0; i < p->ntunnels; i++)
+  {
+    if (p->tunnels[i] == t)
+    {
+      p->tunnels[i] = p->tunnels[p->ntunnels - 1];
+      GNUNET_array_grow (p->tunnels, p->ntunnels, p->ntunnels - 1);
+      return;
+    }
+  }
+}
+
+
 /**
  * Function called if the connection to the peer has been stalled for a while,
  * possibly due to a missed ACK. Poll the peer about its ACK status.
@@ -1752,22 +1777,32 @@ tunnel_get (const struct GNUNET_PeerIdentity *oid, MESH_TunnelNumber tid)
 /**
  * Add a client to a tunnel, initializing all needed data structures.
  * 
- * FIXME: make static after implementing port numbers
- * 
  * @param t Tunnel to which add the client.
  * @param c Client which to add to the tunnel.
  */
-void
+static void
 tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c)
 {
+  struct GNUNET_HashCode hash;
+
   if (NULL != t->client)
   {
     GNUNET_break(0);
     return;
   }
-  if (0 != t->next_hop)
+  GMC_hash32 (t->local_tid_dest, &hash);
+  if (GNUNET_OK !=
+      GNUNET_CONTAINER_multihashmap_put (c->incoming_tunnels, &hash, t,
+                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST))
   {
-    GNUNET_break(0);
+    GNUNET_break (0);
+    return;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CONTAINER_multihashmap_put (incoming_tunnels, &hash, t,
+                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST))
+  {
+    GNUNET_break (0);
     return;
   }
   t->client = c;
@@ -1930,6 +1965,7 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type)
     case GNUNET_MESSAGE_TYPE_MESH_ACK:
     case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK:
       break;
+    case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK:
     case GNUNET_MESSAGE_TYPE_MESH_POLL:
       t->force_ack = GNUNET_YES;
       break;
@@ -2037,6 +2073,72 @@ tunnel_send_bck_ack (struct MeshTunnel *t, uint16_t type)
 }
 
 
+/**
+ * Modify the unicast message TID from global to local and send to client.
+ * 
+ * @param t Tunnel on which to send the message.
+ * @param msg Message to modify and send.
+ */
+static void
+tunnel_send_client_ucast (struct MeshTunnel *t,
+                          const struct GNUNET_MESH_Unicast *msg)
+{
+  struct GNUNET_MESH_Unicast *copy;
+  uint16_t size = ntohs (msg->header.size);
+  char cbuf[size];
+
+  if (size < sizeof (struct GNUNET_MESH_Unicast) +
+             sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  if (NULL == t->client)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  copy = (struct GNUNET_MESH_Unicast *) cbuf;
+  memcpy (copy, msg, size);
+  copy->tid = htonl (t->local_tid_dest);
+  GNUNET_SERVER_notification_context_unicast (nc, t->client->handle,
+                                              &copy->header, GNUNET_NO);
+}
+
+
+/**
+ * Modify the to_origin  message TID from global to local and send to client.
+ * 
+ * @param t Tunnel on which to send the message.
+ * @param msg Message to modify and send.
+ */
+static void
+tunnel_send_client_to_orig (struct MeshTunnel *t,
+                            const struct GNUNET_MESH_ToOrigin *msg)
+{
+  struct GNUNET_MESH_ToOrigin *copy;
+  uint16_t size = ntohs (msg->header.size);
+  char cbuf[size];
+
+  if (size < sizeof (struct GNUNET_MESH_ToOrigin) +
+             sizeof (struct GNUNET_MessageHeader))
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+  if (NULL == t->owner)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  copy = (struct GNUNET_MESH_ToOrigin *) cbuf;
+  memcpy (cbuf, msg, size);
+  copy->tid = htonl (t->local_tid);
+  GNUNET_SERVER_notification_context_unicast (nc, t->owner->handle,
+                                              &copy->header, GNUNET_NO);
+}
+
+
 /**
  * @brief Re-initiate traffic to this peer if necessary.
  *
@@ -2103,7 +2205,7 @@ tunnel_send_destroy (struct MeshTunnel *t)
               "  sending tunnel destroy for tunnel: %s [%X]\n",
               GNUNET_i2s (&msg.oid), t->id.tid);
 
-  if (NULL == t->client)
+  if (NULL == t->client && 0 != t->next_hop)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  child: %u\n", t->next_hop);
     GNUNET_PEER_resolve (t->next_hop, &id);
@@ -2112,7 +2214,7 @@ tunnel_send_destroy (struct MeshTunnel *t)
                 GNUNET_i2s (&id));
     send_prebuilt_message (&msg.header, t->next_hop, t);
   }
-  if (NULL == t->owner)
+  if (NULL == t->owner && 0 != t->prev_hop)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  parent: %u\n", t->prev_hop);
     GNUNET_PEER_resolve (t->prev_hop, &id);
@@ -2137,6 +2239,8 @@ peer_cancel_queues (GNUNET_PEER_Id neighbor, struct MeshTunnel *t)
   struct MeshPeerQueue *pq;
   struct MeshPeerQueue *next;
 
+  if (0 == neighbor)
+    return; /* Was local peer, 0'ed in tunnel_destroy_iterator */
   peer_info = peer_get_short (neighbor);
   for (pq = peer_info->queue_head; NULL != pq; pq = next)
   {
@@ -2211,6 +2315,7 @@ tunnel_destroy (struct MeshTunnel *t)
 
   if (NULL != t->client)
   {
+    c = t->client;
     GMC_hash32 (t->local_tid_dest, &hash);
     if (GNUNET_YES !=
           GNUNET_CONTAINER_multihashmap_remove (c->incoming_tunnels, &hash, t))
@@ -2218,12 +2323,12 @@ tunnel_destroy (struct MeshTunnel *t)
       GNUNET_break (0);
       r = GNUNET_SYSERR;
     }
-  }
-  if (GNUNET_YES != 
+    if (GNUNET_YES != 
       GNUNET_CONTAINER_multihashmap_remove (incoming_tunnels, &hash, t))
-  {
-    GNUNET_break (0);
-    r = GNUNET_SYSERR;
+    {
+      GNUNET_break (0);
+      r = GNUNET_SYSERR;
+    }
   }
 
   peer_cancel_queues (t->next_hop, t);
@@ -2236,6 +2341,7 @@ tunnel_destroy (struct MeshTunnel *t)
 
   n_tunnels--;
   GNUNET_STATISTICS_update (stats, "# tunnels", -1, GNUNET_NO);
+  path_destroy (t->path);
   GNUNET_free (t);
   return r;
 }
@@ -2259,7 +2365,8 @@ tunnel_destroy_empty (struct MeshTunnel *t)
   }
   #endif
 
-  tunnel_send_destroy (t);
+  if (GNUNET_NO == t->destroy)
+    tunnel_send_destroy (t);
   if (0 == t->pending_messages)
     tunnel_destroy (t);
   else
@@ -2370,17 +2477,32 @@ tunnel_destroy_iterator (void *cls,
   struct MeshTunnel *t = value;
   struct MeshClient *c = cls;
 
-  send_client_tunnel_destroy (t);
-  if (c != t->owner)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              " Tunnel %X / %X destroy, due to client %u shutdown.\n",
+              t->local_tid, t->local_tid_dest, c->id);
+  if (GNUNET_NO == c->shutting_down)
+    send_client_tunnel_destroy (t);
+  client_delete_tunnel (c, t);
+  if (c == t->client)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %u is destination.\n", c->id);
-    client_delete_tunnel (c, t);
-    tunnel_destroy_empty (t);
-    return GNUNET_OK;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Client %u is destination.\n", c->id);
+    t->client = NULL;
+    t->next_hop = 0;
   }
-  tunnel_send_destroy (t);
-  t->owner = NULL;
-  t->destroy = GNUNET_YES;
+  else if (c == t->owner)
+  {
+    struct MeshPeerInfo *p;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Client %u is owner.\n", c->id);
+    t->owner = NULL;
+    t->prev_hop = 0;
+    p = peer_get_short(t->dest);
+    peer_info_remove_tunnel (p, t);
+  }
+  else
+  {
+    GNUNET_break (0);
+  }
+  tunnel_destroy_empty (t);
 
   return GNUNET_OK;
 }
@@ -2506,7 +2628,7 @@ send_core_path_ack (void *cls, size_t size, void *buf)
     GNUNET_break (0);
     return 0;
   }
-  t->prev_fc.last_ack_sent = t->nobuffer ? 0 : INITIAL_WINDOW_SIZE - 1;
+  t->prev_fc.last_ack_sent = t->nobuffer ? 0 : t->queue_max - 1;
   msg->header.size = htons (sizeof (struct GNUNET_MESH_PathACK));
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_ACK);
   GNUNET_PEER_resolve (t->id.oid, &msg->oid);
@@ -2604,12 +2726,12 @@ queue_get_next (const struct MeshPeerInfo *peer)
   uint32_t pid;
   uint32_t ack;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   selecting message\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   selecting message\n");
   for (q = peer->queue_head; NULL != q; q = q->next)
   {
     t = q->tunnel;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "*********     %s\n",
+                "*     %s\n",
                 GNUNET_MESH_DEBUG_M2S(q->type));
     switch (q->type)
     {
@@ -2625,26 +2747,26 @@ queue_get_next (const struct MeshPeerInfo *peer)
         break;
       default:
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "*********   OK!\n");
+                    "*   OK!\n");
         return q;
     }
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "*********     ACK: %u, PID: %u\n",
+                "*     ACK: %u, PID: %u\n",
                 ack, pid);
     if (GNUNET_NO == GMC_is_pid_bigger (pid, ack))
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "*********   OK!\n");
+                  "*   OK!\n");
       return q;
     }
     else
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "*********     NEXT!\n");
+                  "*     NEXT!\n");
     }
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "*********   nothing found\n");
+                "*   nothing found\n");
   return NULL;
 }
 
@@ -2663,28 +2785,28 @@ queue_send (void *cls, size_t size, void *buf)
 
   peer->core_transmit = NULL;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* Queue send\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Queue send\n");
   queue = queue_get_next (peer);
 
   /* Queue has no internal mesh traffic nor sendable payload */
   if (NULL == queue)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   not ready, return\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   not ready, return\n");
     if (NULL == peer->queue_head)
       GNUNET_break (0); /* Core tmt_rdy should've been canceled */
     return 0;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   not empty\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   not empty\n");
 
   GNUNET_PEER_resolve (peer->id, &dst_id);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "*********   towards %s\n",
+              "*   towards %s\n",
               GNUNET_i2s (&dst_id));
   /* Check if buffer size is enough for the message */
   if (queue->size > size)
   {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "*********   not enough room, reissue\n");
+                  "*   not enough room, reissue\n");
       peer->core_transmit =
           GNUNET_CORE_notify_transmit_ready (core_handle,
                                              GNUNET_NO,
@@ -2696,7 +2818,7 @@ queue_send (void *cls, size_t size, void *buf)
                                              peer);
       return 0;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   size ok\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   size ok\n");
 
   t = queue->tunnel;
   GNUNET_assert (0 < t->pending_messages);
@@ -2714,7 +2836,7 @@ queue_send (void *cls, size_t size, void *buf)
     case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY:
     case GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE:
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "*********   raw: %s\n",
+                  "*   raw: %s\n",
                   GNUNET_MESH_DEBUG_M2S (queue->type));
       /* Fall through */
     case GNUNET_MESSAGE_TYPE_MESH_UNICAST:
@@ -2724,17 +2846,17 @@ queue_send (void *cls, size_t size, void *buf)
       type = ntohs (msg->type);
       break;
     case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   path create\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   path create\n");
       data_size = send_core_path_create (queue->cls, size, buf);
       break;
     case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   path ack\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   path ack\n");
       data_size = send_core_path_ack (queue->cls, size, buf);
       break;
     default:
       GNUNET_break (0);
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "*********   type unknown: %u\n",
+                  "*   type unknown: %u\n",
                   queue->type);
       data_size = 0;
   }
@@ -2759,7 +2881,7 @@ queue_send (void *cls, size_t size, void *buf)
 
   if (GNUNET_YES == t->destroy && 0 == t->pending_messages)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********  destroying tunnel!\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*  destroying tunnel!\n");
     tunnel_destroy (t);
   }
 
@@ -2769,7 +2891,7 @@ queue_send (void *cls, size_t size, void *buf)
   {
       struct GNUNET_PeerIdentity id;
 
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********   more data!\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   more data!\n");
       GNUNET_PEER_resolve (peer->id, &id);
       peer->core_transmit =
           GNUNET_CORE_notify_transmit_ready(core_handle,
@@ -2784,7 +2906,7 @@ queue_send (void *cls, size_t size, void *buf)
   else if (NULL != peer->queue_head)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                "*********   %s stalled\n",
+                "*   %s stalled\n",
                 GNUNET_i2s (&my_full_id));
     if (peer->id == t->next_hop)
       fc = &t->next_fc;
@@ -2802,7 +2924,7 @@ queue_send (void *cls, size_t size, void *buf)
                                                     &tunnel_poll, fc);
     }
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*********  Return %d\n", data_size);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*  Return %d\n", data_size);
   return data_size;
 }
 
@@ -2902,7 +3024,6 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer,
   MESH_TunnelNumber tid;
   struct GNUNET_MESH_CreateTunnel *msg;
   struct GNUNET_PeerIdentity *pi;
-  struct GNUNET_HashCode hash;
   struct MeshPeerPath *path;
   struct MeshPeerInfo *dest_peer_info;
   struct MeshPeerInfo *orig_peer_info;
@@ -2961,15 +3082,6 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer,
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  nobuffer:%d\n", t->nobuffer);
 
     tunnel_reset_timeout (t);
-    GMC_hash32 (t->local_tid_dest, &hash);
-    if (GNUNET_OK !=
-        GNUNET_CONTAINER_multihashmap_put (incoming_tunnels, &hash, t,
-                                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST))
-    {
-      tunnel_destroy (t);
-      GNUNET_break (0);
-      return GNUNET_OK;
-    }
   }
   t->state = MESH_TUNNEL_WAITING;
   dest_peer_info =
@@ -3030,7 +3142,6 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer,
       /* TODO send reject */
       return GNUNET_OK;
     }
-    t->client = c;
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
     peer_info_add_path_to_origin (orig_peer_info, path, GNUNET_YES);
@@ -3042,6 +3153,7 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer,
     t->local_tid_dest = next_local_tid++;
     next_local_tid = next_local_tid | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV;
 
+    tunnel_add_client (t, c);
     send_client_tunnel_create (t);
     send_path_ack (t);
   }
@@ -3063,6 +3175,88 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer,
 }
 
 
+
+/**
+ * Core handler for path ACKs
+ *
+ * @param cls closure
+ * @param message message
+ * @param peer peer identity this notification is about
+ *
+ * @return GNUNET_OK to keep the connection open,
+ *         GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_mesh_path_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
+                      const struct GNUNET_MessageHeader *message)
+{
+  struct GNUNET_MESH_PathACK *msg;
+  struct MeshPeerInfo *peer_info;
+  struct MeshPeerPath *p;
+  struct MeshTunnel *t;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a path ACK msg [%s]\n",
+              GNUNET_i2s (&my_full_id));
+  msg = (struct GNUNET_MESH_PathACK *) message;
+  t = tunnel_get (&msg->oid, ntohl(msg->tid));
+  if (NULL == t)
+  {
+    /* TODO notify that we don't know the tunnel */
+    GNUNET_STATISTICS_update (stats, "# control on unknown tunnel", 1, GNUNET_NO);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  don't know the tunnel %s [%X]!\n",
+                GNUNET_i2s (&msg->oid), ntohl(msg->tid));
+    return GNUNET_OK;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  on tunnel %s [%X]\n",
+              GNUNET_i2s (&msg->oid), ntohl(msg->tid));
+
+  peer_info = peer_get (&msg->peer_id);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  by peer %s\n",
+              GNUNET_i2s (&msg->peer_id));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n",
+              GNUNET_i2s (peer));
+
+  /* Add path to peers? */
+  p = t->path;
+  if (NULL != p)
+  {
+    path_add_to_peers (p, GNUNET_YES);
+  }
+  else
+  {
+    GNUNET_break (0);
+  }
+  t->state = MESH_TUNNEL_READY;
+  t->next_fc.last_ack_recv = ntohl (msg->ack);
+  t->prev_fc.last_ack_sent = ntohl (msg->ack);
+
+  /* Message for us? */
+  if (0 == memcmp (&msg->oid, &my_full_id, sizeof (struct GNUNET_PeerIdentity)))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
+    if (NULL == t->owner)
+    {
+      GNUNET_break_op (0);
+      return GNUNET_OK;
+    }
+    if (NULL != peer_info->dhtget)
+    {
+      GNUNET_DHT_get_stop (peer_info->dhtget);
+      peer_info->dhtget = NULL;
+    }
+    tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_PATH_ACK);
+    tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_PATH_ACK);
+    return GNUNET_OK;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "  not for us, retransmitting...\n");
+  peer_info = peer_get (&msg->oid);
+  send_prebuilt_message (message, t->prev_hop, t);
+  return GNUNET_OK;
+}
+
+
 /**
  * Core handler for notifications of broken paths
  *
@@ -3221,16 +3415,11 @@ handle_mesh_unicast (void *cls, const struct GNUNET_PeerIdentity *peer,
   tunnel_reset_timeout (t);
   if (t->dest == myid)
   {
-    if (NULL == t->client)
-    {
-      GNUNET_break (0);
-      return GNUNET_OK;
-    }
+    /* TODO signature verification */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "  it's for us! sending to clients...\n");
     GNUNET_STATISTICS_update (stats, "# unicast received", 1, GNUNET_NO);
-    GNUNET_SERVER_notification_context_unicast (nc, t->client->handle,
-                                                message, GNUNET_NO);
+    tunnel_send_client_ucast (t, msg);
     tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST);
     return GNUNET_OK;
   }
@@ -3325,20 +3514,13 @@ handle_mesh_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               " pid %u not seen yet, forwarding\n", pid);
 
-  if (NULL != t->owner)
+  if (myid == t->id.oid)
   {
-    char cbuf[size];
-    struct GNUNET_MESH_ToOrigin *copy;
-
+    /* TODO signature verification */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "  it's for us! sending to clients...\n");
-    /* TODO signature verification */
-    memcpy (cbuf, message, size);
-    copy = (struct GNUNET_MESH_ToOrigin *) cbuf;
-    copy->tid = htonl (t->local_tid);
     GNUNET_STATISTICS_update (stats, "# to origin received", 1, GNUNET_NO);
-    GNUNET_SERVER_notification_context_unicast (nc, t->owner->handle,
-                                                &copy->header, GNUNET_NO);
+    tunnel_send_client_to_orig (t, msg);
     tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN);
     return GNUNET_OK;
   }
@@ -3495,85 +3677,6 @@ handle_mesh_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
   return GNUNET_OK;
 }
 
-/**
- * Core handler for path ACKs
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- *
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-static int
-handle_mesh_path_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
-                      const struct GNUNET_MessageHeader *message)
-{
-  struct GNUNET_MESH_PathACK *msg;
-  struct MeshPeerInfo *peer_info;
-  struct MeshPeerPath *p;
-  struct MeshTunnel *t;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a path ACK msg [%s]\n",
-              GNUNET_i2s (&my_full_id));
-  msg = (struct GNUNET_MESH_PathACK *) message;
-  t = tunnel_get (&msg->oid, ntohl(msg->tid));
-  if (NULL == t)
-  {
-    /* TODO notify that we don't know the tunnel */
-    GNUNET_STATISTICS_update (stats, "# control on unknown tunnel", 1, GNUNET_NO);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  don't know the tunnel %s [%X]!\n",
-                GNUNET_i2s (&msg->oid), ntohl(msg->tid));
-    return GNUNET_OK;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  on tunnel %s [%X]\n",
-              GNUNET_i2s (&msg->oid), ntohl(msg->tid));
-
-  peer_info = peer_get (&msg->peer_id);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  by peer %s\n",
-              GNUNET_i2s (&msg->peer_id));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n",
-              GNUNET_i2s (peer));
-
-  /* Add path to peers? */
-  p = t->path;
-  if (NULL != p)
-  {
-    path_add_to_peers (p, GNUNET_YES);
-  }
-  else
-  {
-    GNUNET_break (0);
-  }
-  t->state = MESH_TUNNEL_READY;
-  t->next_fc.last_ack_recv = ntohl (msg->ack);
-  t->prev_fc.last_ack_sent = ntohl (msg->ack);
-
-  /* Message for us? */
-  if (0 == memcmp (&msg->oid, &my_full_id, sizeof (struct GNUNET_PeerIdentity)))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  It's for us!\n");
-    if (NULL == t->owner)
-    {
-      GNUNET_break_op (0);
-      return GNUNET_OK;
-    }
-    if (NULL != peer_info->dhtget)
-    {
-      GNUNET_DHT_get_stop (peer_info->dhtget);
-      peer_info->dhtget = NULL;
-    }
-    tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_PATH_ACK);
-    return GNUNET_OK;
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "  not for us, retransmitting...\n");
-  peer_info = peer_get (&msg->oid);
-  send_prebuilt_message (message, t->prev_hop, t);
-  return GNUNET_OK;
-}
-
 
 /**
  * Core handler for mesh keepalives.