Port CADET to CORE MQ API
authorBart Polot <bart@net.in.tum.de>
Tue, 20 Sep 2016 01:21:59 +0000 (01:21 +0000)
committerBart Polot <bart@net.in.tum.de>
Tue, 20 Sep 2016 01:21:59 +0000 (01:21 +0000)
src/cadet/gnunet-service-cadet_connection.c
src/cadet/gnunet-service-cadet_connection.h
src/cadet/gnunet-service-cadet_local.c
src/cadet/gnunet-service-cadet_peer.c
src/cadet/gnunet-service-cadet_peer.h
src/cadet/gnunet-service-cadet_tunnel.c
src/cadet/gnunet-service-cadet_tunnel.h

index 0c11c24df9f832b8ae2507202486b1960d9e7706..29695243f92ffbe4ee7660b05d933928b23bb699 100644 (file)
@@ -268,7 +268,7 @@ struct CadetConnectionQueue
   /**
    * Peer queue handle, to cancel if necessary.
    */
-  struct CadetPeerQueue *q;
+  struct CadetPeerQueue *peer_q;
 
   /**
    * Continuation to call once sent.
@@ -312,7 +312,8 @@ static struct GNUNET_CONTAINER_MultiHashMap *connections;
 
 /**
  * How many connections are we willing to maintain.
- * Local connections are always allowed, even if there are more connections than max.
+ *  Local connections are always allowed,
+ * even if there are more connections than max.
  */
 static unsigned long long max_connections;
 
@@ -620,41 +621,95 @@ send_ack (struct CadetConnection *c, unsigned int buffer, int fwd, int force)
 }
 
 
+/**
+ * Update performance information if we are a connection's endpoint.
+ *
+ * @param c Connection to update.
+ * @param wait How much time did we wait to send the last message.
+ * @param size Size of the last message.
+ */
+static void
+update_perf (struct CadetConnection *c,
+             struct GNUNET_TIME_Relative wait,
+             uint16_t size)
+{
+  struct CadetConnectionPerformance *p;
+  double usecsperbyte;
+
+  if (NULL == c->perf)
+    return; /* Only endpoints are interested in timing. */
+
+  p = c->perf;
+  usecsperbyte = ((double) wait.rel_value_us) / size;
+  if (p->size == AVG_MSGS)
+  {
+    /* Array is full. Substract oldest value, add new one and store. */
+    p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
+    p->usecsperbyte[p->idx] = usecsperbyte;
+    p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
+  }
+  else
+  {
+    /* Array not yet full. Add current value to avg and store. */
+    p->usecsperbyte[p->idx] = usecsperbyte;
+    p->avg *= p->size;
+    p->avg += p->usecsperbyte[p->idx];
+    p->size++;
+    p->avg /= p->size;
+  }
+  p->idx = (p->idx + 1) % AVG_MSGS;
+}
+
+
 /**
  * Callback called when a connection queued message is sent.
  *
  * Calculates the average time and connection packet tracking.
  *
- * @param cls Closure (ConnectionQueue Handle).
+ * @param cls Closure (ConnectionQueue Handle), can be NULL.
  * @param c Connection this message was on.
+ * @param fwd Was this a FWD going message?
  * @param sent Was it really sent? (Could have been canceled)
  * @param type Type of message sent.
- * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
- * @param fwd Was this a FWD going message?
+ * @param payload_type Type of payload, if applicable.
+ * @param pid Message ID, or 0 if not applicable (create, destroy, etc).
  * @param size Size of the message.
  * @param wait Time spent waiting for core (only the time for THIS message)
- * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise.
  */
-static int
+static void
 conn_message_sent (void *cls,
-                   struct CadetConnection *c, int sent,
-                   uint16_t type, uint32_t pid, int fwd, size_t size,
+                   struct CadetConnection *c, int fwd, int sent,
+                   uint16_t type, uint16_t payload_type, uint32_t pid,
+                   size_t size,
                    struct GNUNET_TIME_Relative wait)
 {
-  struct CadetConnectionPerformance *p;
-  struct CadetFlowControl *fc;
   struct CadetConnectionQueue *q = cls;
-  double usecsperbyte;
+  struct CadetFlowControl *fc;
   int forced;
 
   GCC_check_connections ();
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
 
+  /* If c is NULL, nothing to update. */
+  if (NULL == c)
+  {
+    if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
+        && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
+    {
+      LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
+           GC_m2s (type));
+    }
+    GCC_check_connections ();
+    return;
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "connection message_sent\n");
   GCC_debug (c, GNUNET_ERROR_TYPE_DEBUG);
 
+  /* Update flow control info. */
   fc = fwd ? &c->fwd_fc : &c->bck_fc;
   LOG (GNUNET_ERROR_TYPE_DEBUG, " %ssent %s %s pid %u\n",
-       sent ? "" : "not ", GC_f2s (fwd), GC_m2s (type), pid);
+       sent ? "" : "not ", GC_f2s (fwd),
+       GC_m2s (type), GC_m2s (payload_type), pid);
   if (NULL != q)
   {
     forced = q->forced;
@@ -674,17 +729,7 @@ conn_message_sent (void *cls,
   {
     forced = GNUNET_NO;
   }
-  if (NULL == c)
-  {
-    if (type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
-        && type != GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY)
-    {
-      LOG (GNUNET_ERROR_TYPE_ERROR, "Message %s sent on NULL connection!\n",
-           GC_m2s (type));
-    }
-    GCC_check_connections ();
-    return GNUNET_NO;
-  }
+
   LOG (GNUNET_ERROR_TYPE_DEBUG, " C_P- %p %u\n", c, c->pending_messages);
   c->pending_messages--;
   if ( (GNUNET_YES == c->destroy) &&
@@ -694,8 +739,9 @@ conn_message_sent (void *cls,
          "!  destroying connection!\n");
     GCC_destroy (c);
     GCC_check_connections ();
-    return GNUNET_YES;
+    return;
   }
+
   /* Send ACK if needed, after accounting for sent ID in fc->queue_n */
   switch (type)
   {
@@ -758,30 +804,8 @@ conn_message_sent (void *cls,
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  message sent!\n");
 
-  if (NULL == c->perf)
-    return GNUNET_NO; /* Only endpoints are interested in timing. */
-
-  p = c->perf;
-  usecsperbyte = ((double) wait.rel_value_us) / size;
-  if (p->size == AVG_MSGS)
-  {
-    /* Array is full. Substract oldest value, add new one and store. */
-    p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS);
-    p->usecsperbyte[p->idx] = usecsperbyte;
-    p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS);
-  }
-  else
-  {
-    /* Array not yet full. Add current value to avg and store. */
-    p->usecsperbyte[p->idx] = usecsperbyte;
-    p->avg *= p->size;
-    p->avg += p->usecsperbyte[p->idx];
-    p->size++;
-    p->avg /= p->size;
-  }
-  p->idx = (p->idx + 1) % AVG_MSGS;
+  update_perf (c, wait, size);
   GCC_check_connections ();
-  return GNUNET_NO;
 }
 
 
@@ -950,27 +974,26 @@ is_ooo_ok (uint32_t last_pid_recv, uint32_t ooo_pid, uint32_t ooo_bitmap)
  * Is traffic coming from this sender 'FWD' traffic?
  *
  * @param c Connection to check.
- * @param sender Peer identity of neighbor.
+ * @param sender Short peer identity of neighbor.
  *
  * @return #GNUNET_YES in case the sender is the 'prev' hop and therefore
  *         the traffic is 'FWD'.
  *         #GNUNET_NO for BCK.
- *         #GNUNET_SYSERR for errors.
+ *         #GNUNET_SYSERR for errors (sender isn't a hop in the connection).
  */
 static int
 is_fwd (const struct CadetConnection *c,
-        const struct GNUNET_PeerIdentity *sender)
+        const struct CadetPeer *sender)
 {
   GNUNET_PEER_Id id;
 
-  id = GNUNET_PEER_search (sender);
+  id = GCP_get_short_id (sender);
   if (GCP_get_short_id (get_prev_hop (c)) == id)
     return GNUNET_YES;
 
   if (GCP_get_short_id (get_next_hop (c)) == id)
     return GNUNET_NO;
 
-  GNUNET_break (0);
   return GNUNET_SYSERR;
 }
 
@@ -979,29 +1002,40 @@ is_fwd (const struct CadetConnection *c,
  * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
  * or a first CONNECTION_ACK directed to us.
  *
- * @param connection Connection to confirm.
+ * @param c Connection to confirm.
  * @param fwd Should we send it FWD? (root->dest)
  *            (First (~SYNACK) goes BCK, second (~ACK) goes FWD)
  */
 static void
-send_connection_ack (struct CadetConnection *connection, int fwd)
+send_connection_ack (struct CadetConnection *c, int fwd)
 {
+  struct GNUNET_CADET_ConnectionACK msg;
   struct CadetTunnel *t;
   size_t size = sizeof (struct GNUNET_CADET_ConnectionACK);
 
   GCC_check_connections ();
-  t = connection->t;
+  t = c->t;
   LOG (GNUNET_ERROR_TYPE_INFO,
        "==> { C %s ACK} %19s on conn %s (%p) %s [%5u]\n",
-       GC_f2s (!fwd), "", GCC_2s (connection), connection, GC_f2s (fwd), size);
-  GCP_queue_add (get_hop (connection, fwd), NULL,
-                 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, UINT16_MAX, 0,
-                 size, connection, fwd, &conn_message_sent, NULL);
-  connection->pending_messages++;
+       GC_f2s (!fwd), "", GCC_2s (c), c, GC_f2s (fwd), size);
+
+  msg.header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK));
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK);
+  msg.cid = c->id;
+
+  GNUNET_assert (NULL == c->maintenance_q);
+  c->maintenance_q = GCP_send (get_hop (c, fwd), &msg.header,
+                               GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK, 0,
+                               c, fwd,
+                               &conn_message_sent, NULL);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (conn`ACK)\n",
+       c, c->pending_messages);
+  c->pending_messages++;
+
   if (CADET_TUNNEL_NEW == GCT_get_cstate (t))
     GCT_change_cstate (t, CADET_TUNNEL_WAITING);
-  if (CADET_CONNECTION_READY != connection->state)
-    connection_change_state (connection, CADET_CONNECTION_SENT);
+  if (CADET_CONNECTION_READY != c->state)
+    connection_change_state (c, CADET_CONNECTION_SENT);
   GCC_check_connections ();
 }
 
@@ -1042,17 +1076,15 @@ send_broken (struct CadetConnection *c,
  * @param connection_id Connection ID.
  * @param id1 Peer that has disconnected, probably local peer.
  * @param id2 Peer that has disconnected can be NULL if unknown.
- * @param peer Peer to notify (neighbor who sent the connection).
+ * @param neighbor Peer to notify (neighbor who sent the connection).
  */
 static void
 send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
                      const struct GNUNET_PeerIdentity *id1,
                      const struct GNUNET_PeerIdentity *id2,
-                     const struct GNUNET_PeerIdentity *peer_id)
+                     struct CadetPeer *neighbor)
 {
   struct GNUNET_CADET_ConnectionBroken *msg;
-  struct CadetPeerQueue *q;
-  struct CadetPeer *neighbor;
 
   GCC_check_connections ();
   LOG (GNUNET_ERROR_TYPE_INFO, "--> BROKEN on unknown connection %s\n",
@@ -1067,14 +1099,10 @@ send_broken_unknown (const struct GNUNET_CADET_Hash *connection_id,
     msg->peer2 = *id2;
   else
     memset (&msg->peer2, 0, sizeof (msg->peer2));
-  neighbor = GCP_get (peer_id, GNUNET_NO); /* We MUST know neighbor. */
-  GNUNET_assert (NULL != neighbor);
-  q = GCP_queue_add (neighbor, msg, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
-                     UINT16_MAX, 2,
-                     sizeof (struct GNUNET_CADET_ConnectionBroken),
-                     NULL, GNUNET_SYSERR, /* connection, fwd */
-                     NULL, NULL); /* continuation */
-  GNUNET_assert (NULL != q);
+  GNUNET_assert (NULL != GCP_send (neighbor, &msg->header,
+                                   UINT16_MAX, 2,
+                                   NULL, GNUNET_SYSERR, /* connection, fwd */
+                                   NULL, NULL)); /* continuation */
   GCC_check_connections ();
 }
 
@@ -1310,38 +1338,6 @@ schedule_next_keepalive (struct CadetConnection *c, int fwd)
 }
 
 
-/**
- * @brief Re-initiate traffic on this connection if necessary.
- *
- * Check if there is traffic queued towards this peer
- * and the core transmit handle is NULL (traffic was stalled).
- * If so, call core tmt rdy.
- *
- * @param c Connection on which initiate traffic.
- * @param fwd Is this about fwd traffic?
- */
-static void
-connection_unlock_queue (struct CadetConnection *c, int fwd)
-{
-  struct CadetPeer *peer;
-
-  GCC_check_connections ();
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "connection_unlock_queue %s on %s\n",
-       GC_f2s (fwd), GCC_2s (c));
-
-  if (GCC_is_terminal (c, fwd))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, " is terminal, can unlock!\n");
-    return;
-  }
-
-  peer = get_hop (c, fwd);
-  GCP_queue_unlock (peer, c);
-  GCC_check_connections ();
-}
-
-
 /**
  * Cancel all transmissions that belong to a certain connection.
  *
@@ -1356,7 +1352,6 @@ connection_cancel_queues (struct CadetConnection *c,
                           int fwd)
 {
   struct CadetFlowControl *fc;
-  struct CadetPeer *peer;
 
   GCC_check_connections ();
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1380,8 +1375,6 @@ connection_cancel_queues (struct CadetConnection *c,
     GCC_cancel (fc->poll_msg);
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  cancelled POLL msg for fc %p\n", fc);
   }
-  peer = get_hop (c, fwd);
-  GCP_queue_cancel (peer, c);
   GCC_check_connections ();
 }
 
@@ -1470,53 +1463,6 @@ connection_poll (void *cls)
 }
 
 
-/**
- * Resend all queued messages for a connection on other connections of the
- * same tunnel, if possible. The connection WILL BE DESTROYED by this function.
- *
- * @param c Connection whose messages to resend.
- * @param fwd Resend fwd messages?
- */
-static void
-resend_messages_and_destroy (struct CadetConnection *c, int fwd)
-{
-  struct GNUNET_MessageHeader *out_msg;
-  struct CadetTunnel *t = c->t;
-  struct CadetPeer *neighbor;
-  unsigned int pending;
-  int destroyed;
-
-  GCC_check_connections ();
-  mark_destroyed (c);
-
-  destroyed = GNUNET_NO;
-  neighbor = get_hop (c, fwd);
-  pending = c->pending_messages;
-
-  while (NULL != (out_msg = GCP_connection_pop (neighbor, c, &destroyed)))
-  {
-    if (NULL != t)
-      GCT_resend_message (out_msg, t);
-    GNUNET_free (out_msg);
-  }
-
-  /* All pending messages should have been popped,
-   * and the connection destroyed by the continuation.
-   */
-  if (GNUNET_YES != destroyed)
-  {
-    if (0 != pending)
-    {
-      GNUNET_break (0);
-      GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
-      if (NULL != t) GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
-    }
-    GCC_destroy (c);
-  }
-  GCC_check_connections ();
-}
-
-
 /**
  * Generic connection timeout implementation.
  *
@@ -1529,10 +1475,7 @@ resend_messages_and_destroy (struct CadetConnection *c, int fwd)
 static void
 connection_timeout (struct CadetConnection *c, int fwd)
 {
-  struct CadetFlowControl *reverse_fc;
-
   GCC_check_connections ();
-  reverse_fc = fwd ? &c->bck_fc : &c->fwd_fc;
 
   LOG (GNUNET_ERROR_TYPE_INFO,
        "Connection %s %s timed out. Destroying.\n",
@@ -1546,17 +1489,13 @@ connection_timeout (struct CadetConnection *c, int fwd)
     return;
   }
 
-  /* If dest, salvage queued traffic. */
+  /* If dest, send "broken" notification. */
   if (GCC_is_terminal (c, fwd))
   {
-    const struct GNUNET_PeerIdentity *next_hop;
+    struct CadetPeer *next_hop;
 
-    next_hop = GCP_get_id (fwd ? get_prev_hop (c) : get_next_hop (c));
+    next_hop = fwd ? get_prev_hop (c) : get_next_hop (c);
     send_broken_unknown (&c->id, &my_full_id, NULL, next_hop);
-    if (0 < reverse_fc->queue_n)
-      resend_messages_and_destroy (c, !fwd);
-    GCC_check_connections ();
-    return;
   }
 
   GCC_destroy (c);
@@ -1907,13 +1846,13 @@ add_to_peer (struct CadetConnection *c,
  * Log receipt of message on stderr (INFO level).
  *
  * @param message Message received.
- * @param peer Peer who sent the message.
- * @param hash Connection ID.
+ * @param peer    Peer who sent the message.
+ * @param conn_id Connection ID of the message.
  */
 static void
 log_message (const struct GNUNET_MessageHeader *message,
-             const struct GNUNET_PeerIdentity *peer,
-             const struct GNUNET_CADET_Hash *hash)
+             const struct CadetPeer *peer,
+             const struct GNUNET_CADET_Hash *conn_id)
 {
   uint16_t size;
   uint16_t type;
@@ -1933,8 +1872,8 @@ log_message (const struct GNUNET_MessageHeader *message,
       arrow = "--";
   }
   LOG (GNUNET_ERROR_TYPE_INFO, "<%s %s on conn %s from %s, %6u bytes\n",
-       arrow, GC_m2s (type), GNUNET_h2s (GC_h2hc (hash)),
-       GNUNET_i2s (peer), (unsigned int) size);
+       arrow, GC_m2s (type), GNUNET_h2s (GC_h2hc (conn_id)),
+       GCP_2s(peer), (unsigned int) size);
 }
 
 /******************************************************************************/
@@ -1942,22 +1881,17 @@ log_message (const struct GNUNET_MessageHeader *message,
 /******************************************************************************/
 
 /**
- * Core handler for connection creation.
+ * Handler for connection creation.
  *
- * @param cls Closure (unused).
- * @param peer Sender (neighbor).
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_create (void *cls,
-                   const struct GNUNET_PeerIdentity *peer,
-                   const struct GNUNET_MessageHeader *message)
+void
+GCC_handle_create (struct CadetPeer *peer,
+                   const struct GNUNET_CADET_ConnectionCreate *msg)
 {
-  struct GNUNET_CADET_ConnectionCreate *msg;
+  const struct GNUNET_CADET_Hash *cid;
   struct GNUNET_PeerIdentity *id;
-  struct GNUNET_CADET_Hash *cid;
   struct CadetPeerPath *path;
   struct CadetPeer *dest_peer;
   struct CadetPeer *orig_peer;
@@ -1966,38 +1900,26 @@ GCC_handle_create (void *cls,
   uint16_t size;
 
   GCC_check_connections ();
-  /* Check size */
-  size = ntohs (message->size);
-  if (size < sizeof (struct GNUNET_CADET_ConnectionCreate))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_OK;
-  }
+  size = ntohs (msg->header.size);
 
   /* Calculate hops */
   size -= sizeof (struct GNUNET_CADET_ConnectionCreate);
-  if (size % sizeof (struct GNUNET_PeerIdentity))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_OK;
-  }
   if (0 != size % sizeof (struct GNUNET_PeerIdentity))
   {
     GNUNET_break_op (0);
-    return GNUNET_OK;
+    return;
   }
   size /= sizeof (struct GNUNET_PeerIdentity);
   if (1 > size)
   {
     GNUNET_break_op (0);
-    return GNUNET_OK;
+    return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "    path has %u hops.\n", size);
 
   /* Get parameters */
-  msg = (struct GNUNET_CADET_ConnectionCreate *) message;
   cid = &msg->cid;
-  log_message (message, peer, cid);
+  log_message (&msg->header, peer, cid);
   id = (struct GNUNET_PeerIdentity *) &msg[1];
   LOG (GNUNET_ERROR_TYPE_DEBUG, "    origin: %s\n", GNUNET_i2s (id));
 
@@ -2012,16 +1934,15 @@ GCC_handle_create (void *cls,
       /* Path was malformed, probably our own ID was not in it. */
       GNUNET_STATISTICS_update (stats, "# malformed paths", 1, GNUNET_NO);
       GNUNET_break_op (0);
-      return GNUNET_OK;
+      return;
     }
-
     if (0 == own_pos)
     {
       /* We received this request from a neighbor, we cannot be origin */
       GNUNET_STATISTICS_update (stats, "# fake paths", 1, GNUNET_NO);
       GNUNET_break_op (0);
       path_destroy (path);
-      return GNUNET_OK;
+      return;
     }
 
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Own position: %u\n", own_pos);
@@ -2035,14 +1956,14 @@ GCC_handle_create (void *cls,
         GNUNET_break (0);
         path_destroy (path);
         GCC_check_connections ();
-        return GNUNET_OK;
+        return;
       }
       send_broken_unknown (cid, &my_full_id,
                            GNUNET_PEER_resolve2 (path->peers[own_pos + 1]),
                            peer);
       path_destroy (path);
       GCC_check_connections ();
-      return GNUNET_OK;
+      return;
     }
     GCP_add_path_to_all (path, GNUNET_NO);
     connection_reset_timeout (c, GNUNET_YES);
@@ -2092,40 +2013,32 @@ GCC_handle_create (void *cls,
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Retransmitting.\n");
     GCP_add_path (dest_peer, path_duplicate (path), GNUNET_NO);
     GCP_add_path_to_origin (orig_peer, path_duplicate (path), GNUNET_NO);
-    GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c,
-                                                      GNUNET_YES, GNUNET_YES,
-                                                      NULL, NULL));
+    GNUNET_assert (NULL ==
+                   GCC_send_prebuilt_message (&msg->header, 0, 0, c,
+                                              GNUNET_YES, GNUNET_YES,
+                                              NULL, NULL));
   }
   path_destroy (path);
   GCC_check_connections ();
-  return GNUNET_OK;
 }
 
 
 /**
- * Core handler for path confirmations.
+ * Handler for connection confirmations.
  *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_confirm (void *cls,
-                    const struct GNUNET_PeerIdentity *peer,
-                    const struct GNUNET_MessageHeader *message)
+void
+GCC_handle_confirm (struct CadetPeer *peer,
+                    const struct GNUNET_CADET_ConnectionACK *msg)
 {
-  struct GNUNET_CADET_ConnectionACK *msg;
   struct CadetConnection *c;
-  struct CadetPeerPath *p;
-  struct CadetPeer *pi;
   enum CadetConnectionState oldstate;
   int fwd;
 
   GCC_check_connections ();
-  msg = (struct GNUNET_CADET_ConnectionACK *) message;
-  log_message (message, peer, &msg->cid);
+  log_message (&msg->header, peer, &msg->cid);
   c = connection_get (&msg->cid);
   if (NULL == c)
   {
@@ -2135,30 +2048,30 @@ GCC_handle_confirm (void *cls,
          "  don't know the connection!\n");
     send_broken_unknown (&msg->cid, &my_full_id, NULL, peer);
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
-
   if (GNUNET_NO != c->destroy)
   {
     GNUNET_assert (CADET_CONNECTION_DESTROYED == c->state);
+    GNUNET_STATISTICS_update (stats, "# control on dying connection",
+                              1, GNUNET_NO);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "connection %s being destroyed, ignoring confirm\n",
          GCC_2s (c));
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
 
   oldstate = c->state;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GNUNET_i2s (peer));
-  pi = GCP_get (peer, GNUNET_YES);
-  if (get_next_hop (c) == pi)
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  via peer %s\n", GCP_2s (peer));
+  if (get_next_hop (c) == peer)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  SYNACK\n");
     fwd = GNUNET_NO;
     if (CADET_CONNECTION_SENT == oldstate)
       connection_change_state (c, CADET_CONNECTION_ACK);
   }
-  else if (get_prev_hop (c) == pi)
+  else if (get_prev_hop (c) == peer)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  FINAL ACK\n");
     fwd = GNUNET_YES;
@@ -2166,17 +2079,18 @@ GCC_handle_confirm (void *cls,
   }
   else
   {
+    GNUNET_STATISTICS_update (stats, "# control on connection from wrong peer",
+                              1, GNUNET_NO);
     GNUNET_break_op (0);
-    return GNUNET_OK;
+    return;
   }
 
   connection_reset_timeout (c, fwd);
 
   /* Add path to peers? */
-  p = c->path;
-  if (NULL != p)
+  if (NULL != c->path)
   {
-    GCP_add_path_to_all (p, GNUNET_YES);
+    GCP_add_path_to_all (c->path, GNUNET_YES);
   }
   else
   {
@@ -2184,12 +2098,12 @@ GCC_handle_confirm (void *cls,
   }
 
   /* Message for us as creator? */
-  if (GCC_is_origin (c, GNUNET_YES))
+  if (GNUNET_YES == GCC_is_origin (c, GNUNET_YES))
   {
     if (GNUNET_NO != fwd)
     {
-      GNUNET_break_op (0);
-      return GNUNET_OK;
+      GNUNET_break (0);
+      return;
     }
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection (SYN)ACK for us!\n");
 
@@ -2197,7 +2111,7 @@ GCC_handle_confirm (void *cls,
     if (CADET_CONNECTION_SENT == oldstate)
       connection_reset_timeout (c, GNUNET_YES);
 
-    /* Change connection state */
+    /* Change connection state, send ACK */
     connection_change_state (c, CADET_CONNECTION_READY);
     send_connection_ack (c, GNUNET_YES);
 
@@ -2205,7 +2119,7 @@ GCC_handle_confirm (void *cls,
     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
 
   /* Message for us as destination? */
@@ -2213,8 +2127,8 @@ GCC_handle_confirm (void *cls,
   {
     if (GNUNET_YES != fwd)
     {
-      GNUNET_break_op (0);
-      return GNUNET_OK;
+      GNUNET_break (0);
+      return;
     }
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  Connection ACK for us!\n");
 
@@ -2226,41 +2140,34 @@ GCC_handle_confirm (void *cls,
     if (CADET_TUNNEL_WAITING == GCT_get_cstate (c->t))
       GCT_change_cstate (c->t, CADET_TUNNEL_READY);
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
   GNUNET_assert (NULL ==
-                 GCC_send_prebuilt_message (message, 0, 0, c, fwd,
+                 GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
                                             GNUNET_YES, NULL, NULL));
   GCC_check_connections ();
-  return GNUNET_OK;
+  return;
 }
 
 
 /**
- * Core handler for notifications of broken connections.
+ * Handler for notifications of broken connections.
  *
- * @param cls Closure (unused).
- * @param id Peer identity of sending neighbor.
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_broken (void* cls,
-                   const struct GNUNET_PeerIdentity* id,
-                   const struct GNUNET_MessageHeader* message)
+void
+GCC_handle_broken (struct CadetPeer *peer,
+                   const struct GNUNET_CADET_ConnectionBroken *msg)
 {
-  struct GNUNET_CADET_ConnectionBroken *msg;
   struct CadetConnection *c;
   struct CadetTunnel *t;
-  int pending;
   int fwd;
 
   GCC_check_connections ();
-  msg = (struct GNUNET_CADET_ConnectionBroken *) message;
-  log_message (message, id, &msg->cid);
+  log_message (&msg->header, peer, &msg->cid);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
               GNUNET_i2s (&msg->peer1));
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  regarding %s\n",
@@ -2269,13 +2176,21 @@ GCC_handle_broken (void* cls,
   if (NULL == c)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  duplicate CONNECTION_BROKEN\n");
+    GNUNET_STATISTICS_update (stats, "# duplicate CONNECTION_BROKEN",
+                              1, GNUNET_NO);
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
 
   t = c->t;
 
-  fwd = is_fwd (c, id);
+  fwd = is_fwd (c, peer);
+  if (GNUNET_SYSERR == fwd)
+  {
+    GNUNET_break_op (0);
+    GCC_check_connections ();
+    return;
+  }
   mark_destroyed (c);
   if (GCC_is_terminal (c, fwd))
   {
@@ -2286,7 +2201,7 @@ GCC_handle_broken (void* cls,
       /* A terminal connection should not have 't' set to NULL. */
       GNUNET_break (0);
       GCC_debug (c, GNUNET_ERROR_TYPE_ERROR);
-      return GNUNET_OK;
+      return;
     }
     endpoint = GCP_get_short (c->path->peers[c->path->length - 1], GNUNET_YES);
     if (2 < c->path->length)
@@ -2297,44 +2212,35 @@ GCC_handle_broken (void* cls,
     GCT_remove_connection (t, c);
     c->t = NULL;
 
-    pending = c->pending_messages;
-    if (0 < pending)
-      resend_messages_and_destroy (c, !fwd);
-    else
-      GCC_destroy (c);
+    GCC_destroy (c);
   }
   else
   {
-    GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
-                                                      GNUNET_YES, NULL, NULL));
+    GNUNET_assert (NULL ==
+                   GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
+                                              GNUNET_YES, NULL, NULL));
     connection_cancel_queues (c, !fwd);
   }
   GCC_check_connections ();
-  return GNUNET_OK;
+  return;
 }
 
 
 /**
- * Core handler for tunnel destruction
+ * Handler for notifications of destroyed connections.
  *
- * @param cls Closure (unused).
- * @param peer Peer identity of sending neighbor.
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_destroy (void *cls,
-                    const struct GNUNET_PeerIdentity *peer,
-                    const struct GNUNET_MessageHeader *message)
+void
+GCC_handle_destroy (struct CadetPeer *peer,
+                    const struct GNUNET_CADET_ConnectionDestroy *msg)
 {
-  const struct GNUNET_CADET_ConnectionDestroy *msg;
   struct CadetConnection *c;
   int fwd;
 
   GCC_check_connections ();
-  msg = (const struct GNUNET_CADET_ConnectionDestroy *) message;
-  log_message (message, peer, &msg->cid);
+  log_message (&msg->header, peer, &msg->cid);
   c = connection_get (&msg->cid);
   if (NULL == c)
   {
@@ -2346,20 +2252,23 @@ GCC_handle_destroy (void *cls,
                               "# control on unknown connection",
                               1, GNUNET_NO);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "  connection unknown: already destroyed?\n");
+         "  connection unknown destroyed: previously destroyed?\n");
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
+
   fwd = is_fwd (c, peer);
   if (GNUNET_SYSERR == fwd)
   {
-    GNUNET_break_op (0); /* FIXME */
-    return GNUNET_OK;
+    GNUNET_break_op (0);
+    GCC_check_connections ();
+    return;
   }
+
   if (GNUNET_NO == GCC_is_terminal (c, fwd))
   {
     GNUNET_assert (NULL ==
-                   GCC_send_prebuilt_message (message, 0, 0, c, fwd,
+                   GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
                                               GNUNET_YES, NULL, NULL));
   }
   else if (0 == c->pending_messages)
@@ -2367,7 +2276,7 @@ GCC_handle_destroy (void *cls,
     LOG (GNUNET_ERROR_TYPE_DEBUG, "  directly destroying connection!\n");
     GCC_destroy (c);
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
   mark_destroyed (c);
   if (NULL != c->t)
@@ -2376,65 +2285,188 @@ GCC_handle_destroy (void *cls,
     c->t = NULL;
   }
   GCC_check_connections ();
-  return GNUNET_OK;
+  return;
 }
 
 
 /**
- * Check the message against internal state and test if it goes FWD or BCK.
- *
- * Updates the PID, state and timeout values for the connection.
+ * Handler for cadet network traffic hop-by-hop acks.
  *
- * @param message Message to check. It must belong to an existing connection.
- * @param minimum_size The message cannot be smaller than this value.
- * @param cid Connection ID (even if @a c is NULL, the ID is still needed).
- * @param c Connection this message should belong. If NULL, check fails.
- * @param neighbor Neighbor that sent the message.
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-static int
-check_message (const struct GNUNET_MessageHeader *message,
-               size_t minimum_size,
-               const struct GNUNET_CADET_Hash* cid,
-               struct CadetConnection *c,
-               const struct GNUNET_PeerIdentity *neighbor,
-               uint32_t pid)
+void
+GCC_handle_ack (struct CadetPeer *peer,
+                const struct GNUNET_CADET_ACK *msg)
 {
-  GNUNET_PEER_Id neighbor_id;
+  struct CadetConnection *c;
   struct CadetFlowControl *fc;
-  struct CadetPeer *hop;
+  uint32_t ack;
   int fwd;
-  uint16_t type;
-
-  /* Check size */
-  if (ntohs (message->size) < minimum_size)
-  {
-    GNUNET_break_op (0);
-    LOG (GNUNET_ERROR_TYPE_WARNING, "Size %u < %u\n",
-         ntohs (message->size), minimum_size);
-    return GNUNET_SYSERR;
-  }
 
-  /* Check connection */
+  GCC_check_connections ();
+  log_message (&msg->header, peer, &msg->cid);
+  c = connection_get (&msg->cid);
   if (NULL == c)
   {
     GNUNET_STATISTICS_update (stats,
-                              "# unknown connection",
-                              1, GNUNET_NO);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s on unknown connection %s\n",
-         GC_m2s (ntohs (message->type)),
-         GNUNET_h2s (GC_h2hc (cid)));
-    send_broken_unknown (cid,
+                              "# ack on unknown connection",
+                              1,
+                              GNUNET_NO);
+    send_broken_unknown (&msg->cid,
                          &my_full_id,
                          NULL,
-                         neighbor);
-    return GNUNET_SYSERR;
+                         peer);
+    GCC_check_connections ();
+    return;
+  }
+
+  /* Is this a forward or backward ACK? */
+  if (get_next_hop (c) == peer)
+  {
+    fc = &c->fwd_fc;
+    fwd = GNUNET_YES;
+  }
+  else if (get_prev_hop (c) == peer)
+  {
+    fc = &c->bck_fc;
+    fwd = GNUNET_NO;
+  }
+  else
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  ack = ntohl (msg->ack);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n",
+       GC_f2s (fwd), ack, fc->last_ack_recv);
+  if (GC_is_pid_bigger (ack, fc->last_ack_recv))
+    fc->last_ack_recv = ack;
+
+  /* Cancel polling if the ACK is big enough. */
+  if (NULL != fc->poll_task &&
+      GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
+    GNUNET_SCHEDULER_cancel (fc->poll_task);
+    fc->poll_task = NULL;
+    fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
+  }
+
+  GCC_check_connections ();
+}
+
+
+/**
+ * Handler for cadet network traffic hop-by-hop data counter polls.
+ *
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
+ */
+void
+GCC_handle_poll (struct CadetPeer *peer,
+                 const struct GNUNET_CADET_Poll *msg)
+{
+  struct CadetConnection *c;
+  struct CadetFlowControl *fc;
+  uint32_t pid;
+  int fwd;
+
+  GCC_check_connections ();
+  log_message (&msg->header, peer, &msg->cid);
+  c = connection_get (&msg->cid);
+  if (NULL == c)
+  {
+    GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
+                              GNUNET_NO);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "POLL message on unknown connection %s!\n",
+         GNUNET_h2s (GC_h2hc (&msg->cid)));
+    send_broken_unknown (&msg->cid,
+                         &my_full_id,
+                         NULL,
+                         peer);
+    GCC_check_connections ();
+    return;
+  }
+
+  /* Is this a forward or backward ACK?
+   * Note: a poll should never be needed in a loopback case,
+   * since there is no possiblility of packet loss there, so
+   * this way of discerining FWD/BCK should not be a problem.
+   */
+  if (get_next_hop (c) == peer)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
+    fc = &c->fwd_fc;
+  }
+  else if (get_prev_hop (c) == peer)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
+    fc = &c->bck_fc;
+  }
+  else
+  {
+    GNUNET_break_op (0);
+    return;
+  }
+
+  pid = ntohl (msg->pid);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
+  fc->last_pid_recv = pid;
+  fwd = fc == &c->bck_fc;
+  GCC_send_ack (c, fwd, GNUNET_YES);
+  GCC_check_connections ();
+}
+
+
+/**
+ * Check the message against internal state and test if it goes FWD or BCK.
+ *
+ * Updates the PID, state and timeout values for the connection.
+ *
+ * @param message Message to check. It must belong to an existing connection.
+ * @param cid Connection ID (even if @a c is NULL, the ID is still needed).
+ * @param c Connection this message should belong. If NULL, check fails.
+ * @param sender Neighbor that sent the message.
+ *
+ * @return #GNUNET_YES if the message goes FWD.
+ *         #GNUNET_NO if it goes BCK.
+ *         #GNUNET_SYSERR if there is an error (unauthorized sender, ...).
+ */
+static int
+check_message (const struct GNUNET_MessageHeader *message,
+               const struct GNUNET_CADET_Hash* cid,
+               struct CadetConnection *c,
+               struct CadetPeer *sender,
+               uint32_t pid)
+{
+  struct CadetFlowControl *fc;
+  struct CadetPeer *hop;
+  int fwd;
+  uint16_t type;
+
+  /* Check connection */
+  if (NULL == c)
+  {
+    GNUNET_STATISTICS_update (stats,
+                              "# unknown connection",
+                              1, GNUNET_NO);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%s on unknown connection %s\n",
+         GC_m2s (ntohs (message->type)),
+         GNUNET_h2s (GC_h2hc (cid)));
+    send_broken_unknown (cid,
+                         &my_full_id,
+                         NULL,
+                         sender);
+    return GNUNET_SYSERR;
   }
 
   /* Check if origin is as expected */
-  neighbor_id = GNUNET_PEER_search (neighbor);
   hop = get_prev_hop (c);
-  if (neighbor_id == GCP_get_short_id (hop))
+  if (sender == hop)
   {
     fwd = GNUNET_YES;
   }
@@ -2442,7 +2474,7 @@ check_message (const struct GNUNET_MessageHeader *message,
   {
     hop = get_next_hop (c);
     GNUNET_break (hop == c->next_peer);
-    if (neighbor_id == GCP_get_short_id (hop))
+    if (sender == hop)
     {
       fwd = GNUNET_NO;
     }
@@ -2508,123 +2540,111 @@ check_message (const struct GNUNET_MessageHeader *message,
 
 
 /**
- * Generic handler for cadet network encrypted traffic.
+ * Handler for key exchange traffic (Axolotl KX).
  *
- * @param peer Peer identity this notification is about.
- * @param msg Encrypted message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-static int
-handle_cadet_encrypted (const struct GNUNET_PeerIdentity *peer,
-                        const struct GNUNET_MessageHeader *message)
+void
+GCC_handle_kx (struct CadetPeer *peer,
+               const struct GNUNET_CADET_KX *msg)
 {
-  const struct GNUNET_CADET_AX *ax_msg;
   const struct GNUNET_CADET_Hash* cid;
   struct CadetConnection *c;
-  size_t minimum_size;
-  size_t overhead;
-  uint32_t pid;
   int fwd;
 
   GCC_check_connections ();
-  GNUNET_assert (GNUNET_MESSAGE_TYPE_CADET_AX == ntohs (message->type));
-  overhead = sizeof (struct GNUNET_CADET_AX);
-  ax_msg = (const struct GNUNET_CADET_AX *) message;
-  cid = &ax_msg->cid;
-  pid = ntohl (ax_msg->pid);
-  log_message (message, peer, cid);
-
-  minimum_size = sizeof (struct GNUNET_MessageHeader) + overhead;
+  cid = &msg->cid;
+  log_message (&msg->header, peer, cid);
+
   c = connection_get (cid);
-  fwd = check_message (message,
-                       minimum_size,
+  fwd = check_message (&msg->header,
                        cid,
                        c,
                        peer,
-                       pid);
+                       0);
 
   /* If something went wrong, discard message. */
   if (GNUNET_SYSERR == fwd)
   {
+    GNUNET_break_op (0);
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
 
   /* Is this message for us? */
   if (GCC_is_terminal (c, fwd))
   {
-    GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO);
-
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
+    GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO);
     if (NULL == c->t)
     {
-      GNUNET_break (GNUNET_NO != c->destroy);
-      return GNUNET_OK;
+      GNUNET_break (0);
+      return;
     }
-    GCT_handle_encrypted (c->t, message);
-    GCC_send_ack (c, fwd, GNUNET_NO);
+    GCT_handle_kx (c->t, &msg[1].header);
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
 
   /* Message not for us: forward to next hop */
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  not for us, retransmitting...\n");
   GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
-  GNUNET_assert (NULL == GCC_send_prebuilt_message (message, 0, 0, c, fwd,
+  GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
                                                     GNUNET_NO, NULL, NULL));
   GCC_check_connections ();
-  return GNUNET_OK;
 }
 
 
 /**
- * Generic handler for cadet network encrypted traffic.
+ * Handler for encrypted cadet network traffic (channel mgmt, data).
  *
- * @param peer Peer identity this notification is about.
- * @param msg Encrypted message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-static int
-handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
-                 const struct GNUNET_CADET_KX *msg)
+void
+GCC_handle_encrypted (struct CadetPeer *peer,
+                      const struct GNUNET_CADET_AX *msg)
 {
   const struct GNUNET_CADET_Hash* cid;
   struct CadetConnection *c;
-  size_t expected_size;
+  uint32_t pid;
   int fwd;
 
   GCC_check_connections ();
   cid = &msg->cid;
+  pid = ntohl (msg->pid);
   log_message (&msg->header, peer, cid);
 
-  expected_size = sizeof (struct GNUNET_CADET_KX)
-                  + sizeof (struct GNUNET_MessageHeader);
   c = connection_get (cid);
   fwd = check_message (&msg->header,
-                       expected_size,
                        cid,
                        c,
                        peer,
-                       0);
+                       pid);
 
   /* If something went wrong, discard message. */
   if (GNUNET_SYSERR == fwd)
-    return GNUNET_OK;
+  {
+    GNUNET_break_op (0);
+    GCC_check_connections ();
+    return;
+  }
 
   /* Is this message for us? */
   if (GCC_is_terminal (c, fwd))
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  message for us!\n");
-    GNUNET_STATISTICS_update (stats, "# received KX", 1, GNUNET_NO);
+    GNUNET_STATISTICS_update (stats, "# received encrypted", 1, GNUNET_NO);
+
     if (NULL == c->t)
     {
-      GNUNET_break (0);
-      return GNUNET_OK;
+      GNUNET_break (GNUNET_NO != c->destroy);
+      return;
     }
-    GCT_handle_kx (c->t, &msg[1].header);
+    GCT_handle_encrypted (c->t, &msg->header);
+    GCC_send_ack (c, fwd, GNUNET_NO);
     GCC_check_connections ();
-    return GNUNET_OK;
+    return;
   }
 
   /* Message not for us: forward to next hop */
@@ -2633,259 +2653,6 @@ handle_cadet_kx (const struct GNUNET_PeerIdentity *peer,
   GNUNET_assert (NULL == GCC_send_prebuilt_message (&msg->header, 0, 0, c, fwd,
                                                     GNUNET_NO, NULL, NULL));
   GCC_check_connections ();
-  return GNUNET_OK;
-}
-
-
-/**
- * Core handler for key exchange traffic (ephemeral key, ping, pong).
- *
- * @param cls Closure (unused).
- * @param message Message received.
- * @param peer Peer who sent the message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_kx (void *cls,
-               const struct GNUNET_PeerIdentity *peer,
-               const struct GNUNET_MessageHeader *message)
-{
-  GCC_check_connections ();
-  return handle_cadet_kx (peer, (struct GNUNET_CADET_KX *) message);
-}
-
-
-/**
- * Core handler for encrypted cadet network traffic (channel mgmt, data).
- *
- * @param cls Closure (unused).
- * @param message Message received.
- * @param peer Peer who sent the message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
-                      const struct GNUNET_MessageHeader *message)
-{
-  GCC_check_connections ();
-  return handle_cadet_encrypted (peer, message);
-}
-
-
-/**
- * Core handler for cadet network traffic point-to-point acks.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
-                const struct GNUNET_MessageHeader *message)
-{
-  struct GNUNET_CADET_ACK *msg;
-  struct CadetConnection *c;
-  struct CadetFlowControl *fc;
-  GNUNET_PEER_Id id;
-  uint32_t ack;
-  int fwd;
-
-  GCC_check_connections ();
-  msg = (struct GNUNET_CADET_ACK *) message;
-  log_message (message, peer, &msg->cid);
-  c = connection_get (&msg->cid);
-  if (NULL == c)
-  {
-    GNUNET_STATISTICS_update (stats,
-                              "# ack on unknown connection",
-                              1,
-                              GNUNET_NO);
-    send_broken_unknown (&msg->cid,
-                         &my_full_id,
-                         NULL,
-                         peer);
-    GCC_check_connections ();
-    return GNUNET_OK;
-  }
-
-  /* Is this a forward or backward ACK? */
-  id = GNUNET_PEER_search (peer);
-  if (GCP_get_short_id (get_next_hop (c)) == id)
-  {
-    fc = &c->fwd_fc;
-    fwd = GNUNET_YES;
-  }
-  else if (GCP_get_short_id (get_prev_hop (c)) == id)
-  {
-    fc = &c->bck_fc;
-    fwd = GNUNET_NO;
-  }
-  else
-  {
-    GNUNET_break_op (0);
-    return GNUNET_OK;
-  }
-
-  ack = ntohl (msg->ack);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, " %s ACK %u (was %u)\n",
-       GC_f2s (fwd), ack, fc->last_ack_recv);
-  if (GC_is_pid_bigger (ack, fc->last_ack_recv))
-    fc->last_ack_recv = ack;
-
-  /* Cancel polling if the ACK is big enough. */
-  if (NULL != fc->poll_task &&
-      GC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  Cancel poll\n");
-    GNUNET_SCHEDULER_cancel (fc->poll_task);
-    fc->poll_task = NULL;
-    fc->poll_time = GNUNET_TIME_UNIT_SECONDS;
-  }
-
-  connection_unlock_queue (c, fwd);
-  GCC_check_connections ();
-  return GNUNET_OK;
-}
-
-
-/**
- * Core handler for cadet network traffic point-to-point ack polls.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_poll (void *cls,
-                 const struct GNUNET_PeerIdentity *peer,
-                 const struct GNUNET_MessageHeader *message)
-{
-  struct GNUNET_CADET_Poll *msg;
-  struct CadetConnection *c;
-  struct CadetFlowControl *fc;
-  GNUNET_PEER_Id id;
-  uint32_t pid;
-  int fwd;
-
-  GCC_check_connections ();
-  msg = (struct GNUNET_CADET_Poll *) message;
-  log_message (message, peer, &msg->cid);
-  c = connection_get (&msg->cid);
-  if (NULL == c)
-  {
-    GNUNET_STATISTICS_update (stats, "# poll on unknown connection", 1,
-                              GNUNET_NO);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "POLL message on unknown connection %s!\n",
-         GNUNET_h2s (GC_h2hc (&msg->cid)));
-    send_broken_unknown (&msg->cid,
-                         &my_full_id,
-                         NULL,
-                         peer);
-    GCC_check_connections ();
-    return GNUNET_OK;
-  }
-
-  /* Is this a forward or backward ACK?
-   * Note: a poll should never be needed in a loopback case,
-   * since there is no possiblility of packet loss there, so
-   * this way of discerining FWD/BCK should not be a problem.
-   */
-  id = GNUNET_PEER_search (peer);
-  if (GCP_get_short_id (get_next_hop (c)) == id)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  FWD FC\n");
-    fc = &c->fwd_fc;
-  }
-  else if (GCP_get_short_id (get_prev_hop (c)) == id)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  BCK FC\n");
-    fc = &c->bck_fc;
-  }
-  else
-  {
-    GNUNET_break_op (0);
-    return GNUNET_OK;
-  }
-
-  pid = ntohl (msg->pid);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "  PID %u, OLD %u\n", pid, fc->last_pid_recv);
-  fc->last_pid_recv = pid;
-  fwd = fc == &c->bck_fc;
-  GCC_send_ack (c, fwd, GNUNET_YES);
-  GCC_check_connections ();
-
-  return GNUNET_OK;
-}
-
-
-/**
- * Send an ACK on the appropriate connection/channel, depending on
- * the direction and the position of the peer.
- *
- * @param c Which connection to send the hop-by-hop ACK.
- * @param fwd Is this a fwd ACK? (will go dest->root).
- * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
- */
-void
-GCC_send_ack (struct CadetConnection *c, int fwd, int force)
-{
-  unsigned int buffer;
-
-  GCC_check_connections ();
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n",
-       GC_f2s (fwd), GCC_2s (c));
-
-  if (NULL == c)
-  {
-    GNUNET_break (0);
-    return;
-  }
-
-  if (GNUNET_NO != c->destroy)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
-    GCC_check_connections ();
-    return;
-  }
-
-  /* Get available buffer space */
-  if (GCC_is_terminal (c, fwd))
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
-    buffer = GCT_get_channels_buffer (c->t);
-  }
-  else
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
-    buffer = GCC_get_buffer (c, fwd);
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
-  if (0 == buffer && GNUNET_NO == force)
-  {
-    GCC_check_connections ();
-    return;
-  }
-
-  /* Send available buffer space */
-  if (GCC_is_origin (c, fwd))
-  {
-    GNUNET_assert (NULL != c->t);
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
-    GCT_unchoke_channels (c->t);
-  }
-  else
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
-    send_ack (c, buffer, fwd, force);
-  }
-  GCC_check_connections ();
 }
 
 
@@ -2974,12 +2741,13 @@ GCC_shutdown (void)
  * Create a connection.
  *
  * @param cid Connection ID (either created locally or imposed remotely).
- * @param t Tunnel this connection belongs to (or NULL);
+ * @param t Tunnel this connection belongs to (or NULL for transit connections);
  * @param path Path this connection has to use (copy is made).
  * @param own_pos Own position in the @c path path.
  *
- * @return Newly created connection, NULL in case of error (own id not in path).
- */
+ * @return Newly created connection.
+ *         NULL in case of error: own id not in path, wrong neighbors, ...
+*/
 struct CadetConnection *
 GCC_new (const struct GNUNET_CADET_Hash *cid,
          struct CadetTunnel *t,
@@ -3036,6 +2804,14 @@ GCC_new (const struct GNUNET_CADET_Hash *cid,
 }
 
 
+/**
+ * Connection is no longer needed: destroy it.
+ *
+ * Cancels all pending traffic (including possible DESTROY messages), all
+ * maintenance tasks and removes the connection from neighbor peers and tunnel.
+ *
+ * @param c Connection to destroy.
+ */
 void
 GCC_destroy (struct CadetConnection *c)
 {
@@ -3428,6 +3204,7 @@ GCC_is_direct (struct CadetConnection *c)
  * @param message Message to send. Function makes a copy of it.
  *                If message is not hop-by-hop, decrements TTL of copy.
  * @param payload_type Type of payload, in case the message is encrypted.
+ * @param payload_id ID of the payload (PID, ACK, ...).
  * @param c Connection on which this message is transmitted.
  * @param fwd Is this a fwd message?
  * @param force Force the connection to accept the message (buffer overfill).
@@ -3446,7 +3223,7 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
 {
   struct CadetFlowControl *fc;
   struct CadetConnectionQueue *q;
-  void *data;
+  struct GNUNET_MessageHeader *copy;
   size_t size;
   uint16_t type;
   int droppable;
@@ -3460,8 +3237,8 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
   }
 
   size = ntohs (message->size);
-  data = GNUNET_malloc (size);
-  GNUNET_memcpy (data, message, size);
+  copy = GNUNET_malloc (size);
+  GNUNET_memcpy (copy, message, size);
   type = ntohs (message->type);
   LOG (GNUNET_ERROR_TYPE_INFO,
        "--> %s (%s %4u) on conn %s (%p) %s [%5u]\n",
@@ -3478,7 +3255,7 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
     struct GNUNET_CADET_ConnectionBroken  *bmsg;
 
     case GNUNET_MESSAGE_TYPE_CADET_AX:
-      axmsg = (struct GNUNET_CADET_AX *) data;
+      axmsg = (struct GNUNET_CADET_AX *) copy;
       axmsg->cid = c->id;
       LOG (GNUNET_ERROR_TYPE_DEBUG, "  Q_N+ %p %u\n", fc, fc->queue_n);
       LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid sent %u\n", fc->last_pid_sent);
@@ -3494,41 +3271,42 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
       break;
 
     case GNUNET_MESSAGE_TYPE_CADET_KX:
-      kmsg = (struct GNUNET_CADET_KX *) data;
+      kmsg = (struct GNUNET_CADET_KX *) copy;
       kmsg->cid = c->id;
       break;
 
     case GNUNET_MESSAGE_TYPE_CADET_ACK:
-      amsg = (struct GNUNET_CADET_ACK *) data;
+      amsg = (struct GNUNET_CADET_ACK *) copy;
       amsg->cid = c->id;
       LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack));
       droppable = GNUNET_NO;
       break;
 
     case GNUNET_MESSAGE_TYPE_CADET_POLL:
-      pmsg = (struct GNUNET_CADET_Poll *) data;
+      pmsg = (struct GNUNET_CADET_Poll *) copy;
       pmsg->cid = c->id;
       LOG (GNUNET_ERROR_TYPE_DEBUG, " POLL %u\n", ntohl (pmsg->pid));
       droppable = GNUNET_NO;
       break;
 
     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
-      dmsg = (struct GNUNET_CADET_ConnectionDestroy *) data;
+      dmsg = (struct GNUNET_CADET_ConnectionDestroy *) copy;
       dmsg->cid = c->id;
       break;
 
     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
-      bmsg = (struct GNUNET_CADET_ConnectionBroken *) data;
+      bmsg = (struct GNUNET_CADET_ConnectionBroken *) copy;
       bmsg->cid = c->id;
       break;
 
     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
     case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
+      GNUNET_break (0); /* Should've used specific functions. */
       break;
 
     default:
       GNUNET_break (0);
-      GNUNET_free (data);
+      GNUNET_free (copy);
       return NULL;
   }
 
@@ -3543,7 +3321,7 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
     {
       fc->queue_n--;
     }
-    GNUNET_free (data);
+    GNUNET_free (copy);
     return NULL; /* Drop this message */
   }
 
@@ -3553,12 +3331,14 @@ GCC_send_prebuilt_message (const struct GNUNET_MessageHeader *message,
 
   q = GNUNET_new (struct CadetConnectionQueue);
   q->forced = !droppable;
-  q->q = GCP_queue_add (get_hop (c, fwd), data, type, payload_type, payload_id,
-                        size, c, fwd, &conn_message_sent, q);
-  if (NULL == q->q)
+  q->peer_q = GCP_send (get_hop (c, fwd), copy,
+                        payload_type, payload_id,
+                        c, fwd,
+                        &conn_message_sent, q);
+  if (NULL == q->peer_q)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "dropping msg on %s, NULL q\n", GCC_2s (c));
-    GNUNET_free (data);
+    GNUNET_free (copy);
     GNUNET_free (q);
     GCC_check_connections ();
     return NULL;
@@ -3584,8 +3364,8 @@ GCC_cancel (struct CadetConnectionQueue *q)
 {
   LOG (GNUNET_ERROR_TYPE_DEBUG, "!  GCC cancel message\n");
 
-  /* queue destroy calls message_sent, which calls q->cont and frees q */
-  GCP_queue_destroy (q->q, GNUNET_YES, GNUNET_NO, 0);
+  /* send_cancel calls message_sent, which calls q->cont and frees q */
+  GCP_send_cancel (q->peer_q);
   GCC_check_connections ();
 }
 
@@ -3594,35 +3374,116 @@ GCC_cancel (struct CadetConnectionQueue *q)
  * Sends a CREATE CONNECTION message for a path to a peer.
  * Changes the connection and tunnel states if necessary.
  *
- * @param connection Connection to create.
+ * @param c Connection to create.
  */
 void
-GCC_send_create (struct CadetConnection *connection)
+GCC_send_create (struct CadetConnection *c)
 {
   enum CadetTunnelCState state;
   size_t size;
 
   GCC_check_connections ();
   size = sizeof (struct GNUNET_CADET_ConnectionCreate);
-  size += connection->path->length * sizeof (struct GNUNET_PeerIdentity);
+  size += c->path->length * sizeof (struct GNUNET_PeerIdentity);
+  {
+    /* Allocate message on the stack */
+    unsigned char cbuf[size];
+    struct GNUNET_CADET_ConnectionCreate *msg;
+    struct GNUNET_PeerIdentity *peers;
+
+    msg = (struct GNUNET_CADET_ConnectionCreate *) cbuf;
+    msg->header.size = htons (size);
+    msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
+    msg->cid = *GCC_get_id (c);
+    peers = (struct GNUNET_PeerIdentity *) &msg[1];
+    for (int i = 0; i < c->path->length; i++)
+    {
+      GNUNET_PEER_resolve (c->path->peers[i], peers++);
+    }
+    GNUNET_assert (NULL == c->maintenance_q);
+    c->maintenance_q = GCP_send (get_next_hop (c),
+                                 &msg->header,
+                                 GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0,
+                                 c, GNUNET_YES,
+                                 &conn_message_sent, NULL);
+  }
 
   LOG (GNUNET_ERROR_TYPE_INFO, "==> %s %19s on conn %s (%p) FWD [%5u]\n",
        GC_m2s (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE), "",
-       GCC_2s (connection), connection, size);
+       GCC_2s (c), c, size);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  C_P+ %p %u (create)\n",
-       connection, connection->pending_messages);
-  connection->pending_messages++;
-
-  connection->maintenance_q =
-    GCP_queue_add (get_next_hop (connection), NULL,
-                   GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, UINT16_MAX, 0,
-                   size, connection, GNUNET_YES, &conn_message_sent, NULL);
+         c, c->pending_messages);
+  c->pending_messages++;
 
-  state = GCT_get_cstate (connection->t);
+  state = GCT_get_cstate (c->t);
   if (CADET_TUNNEL_SEARCHING == state || CADET_TUNNEL_NEW == state)
-    GCT_change_cstate (connection->t, CADET_TUNNEL_WAITING);
-  if (CADET_CONNECTION_NEW == connection->state)
-    connection_change_state (connection, CADET_CONNECTION_SENT);
+    GCT_change_cstate (c->t, CADET_TUNNEL_WAITING);
+  if (CADET_CONNECTION_NEW == c->state)
+    connection_change_state (c, CADET_CONNECTION_SENT);
+  GCC_check_connections ();
+}
+
+
+/**
+ * Send an ACK on the appropriate connection/channel, depending on
+ * the direction and the position of the peer.
+ *
+ * @param c Which connection to send the hop-by-hop ACK.
+ * @param fwd Is this a fwd ACK? (will go dest->root).
+ * @param force Send the ACK even if suboptimal (e.g. requested by POLL).
+ */
+void
+GCC_send_ack (struct CadetConnection *c, int fwd, int force)
+{
+  unsigned int buffer;
+
+  GCC_check_connections ();
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "GCC send %s ACK on %s\n",
+       GC_f2s (fwd), GCC_2s (c));
+
+  if (NULL == c)
+  {
+    GNUNET_break (0);
+    return;
+  }
+
+  if (GNUNET_NO != c->destroy)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  being destroyed, why bother...\n");
+    GCC_check_connections ();
+    return;
+  }
+
+  /* Get available buffer space */
+  if (GCC_is_terminal (c, fwd))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from all channels\n");
+    buffer = GCT_get_channels_buffer (c->t);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  getting from one connection\n");
+    buffer = GCC_get_buffer (c, fwd);
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "  buffer available: %u\n", buffer);
+  if (0 == buffer && GNUNET_NO == force)
+  {
+    GCC_check_connections ();
+    return;
+  }
+
+  /* Send available buffer space */
+  if (GNUNET_YES == GCC_is_origin (c, fwd))
+  {
+    GNUNET_assert (NULL != c->t);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on channels...\n");
+    GCT_unchoke_channels (c->t);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "  sending on connection\n");
+    send_ack (c, buffer, fwd, force);
+  }
   GCC_check_connections ();
 }
 
index e96e2f24cd1cc1305917e70e66788fd5d05d3697..6302cd898a8bbf3a073420d2600638a20d2c2eed 100644 (file)
@@ -118,90 +118,86 @@ typedef void
 
 
 /**
- * Core handler for connection creation.
+ * Handler for connection creation.
  *
- * @param cls Closure (unused).
- * @param peer Sender (neighbor).
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_create (void *cls,
-                   const struct GNUNET_PeerIdentity *peer,
-                   const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_create (struct CadetPeer *peer,
+                   const struct GNUNET_CADET_ConnectionCreate *msg);
 
 
 /**
- * Core handler for path confirmations.
+ * Handler for connection confirmations.
  *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_confirm (void *cls,
-                    const struct GNUNET_PeerIdentity *peer,
-                    const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_confirm (struct CadetPeer *peer,
+                    const struct GNUNET_CADET_ConnectionACK *msg);
 
 
 /**
- * Core handler for notifications of broken paths
+ * Handler for notifications of broken connections.
  *
- * @param cls Closure (unused).
- * @param id Peer identity of sending neighbor.
- * @param message Message.
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_broken (void* cls,
-                   const struct GNUNET_PeerIdentity* id,
-                   const struct GNUNET_MessageHeader* message);
+void
+GCC_handle_broken (struct CadetPeer *peer,
+                   const struct GNUNET_CADET_ConnectionBroken *msg);
 
 /**
- * Core handler for tunnel destruction
+ * Handler for notifications of destroyed connections.
  *
- * @param cls Closure (unused).
- * @param peer Peer identity of sending neighbor.
- * @param message Message.
- *
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer,
-                    const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_destroy (struct CadetPeer *peer,
+                    const struct GNUNET_CADET_ConnectionDestroy *msg);
 
 /**
- * Core handler for key exchange traffic (ephemeral key, ping, pong).
+ * Handler for cadet network traffic hop-by-hop acks.
  *
- * @param cls Closure (unused).
- * @param message Message received.
- * @param peer Peer who sent the message.
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
+ */
+void
+GCC_handle_ack (struct CadetPeer *peer,
+                const struct GNUNET_CADET_ACK *msg);
+
+/**
+ * Handler for cadet network traffic hop-by-hop data counter polls.
  *
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_kx (void *cls, const struct GNUNET_PeerIdentity *peer,
-               const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_poll (struct CadetPeer *peer,
+                 const struct GNUNET_CADET_Poll *msg);
 
 /**
- * Core handler for encrypted cadet network traffic (channel mgmt, data).
+ * Handler for key exchange traffic (Axolotl KX).
  *
- * @param cls Closure (unused).
- * @param message Message received.
- * @param peer Peer who sent the message.
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
+ */
+void
+GCC_handle_kx (struct CadetPeer *peer,
+               const struct GNUNET_CADET_KX *msg);
+
+/**
+ * Handler for encrypted cadet network traffic (channel mgmt, data).
  *
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
+ * @param peer Message sender (neighbor).
+ * @param msg Message itself.
  */
-int
-GCC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer,
-                      const struct GNUNET_MessageHeader *message);
+void
+GCC_handle_encrypted (struct CadetPeer *peer,
+                      const struct GNUNET_CADET_AX *msg);
 
 /**
  * Core handler for axolotl key exchange traffic.
@@ -229,34 +225,6 @@ int
 GCC_handle_ax (void *cls, const struct GNUNET_PeerIdentity *peer,
                struct GNUNET_MessageHeader *message);
 
-/**
- * Core handler for cadet network traffic point-to-point acks.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- *
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer,
-                const struct GNUNET_MessageHeader *message);
-
-/**
- * Core handler for cadet network traffic point-to-point ack polls.
- *
- * @param cls closure
- * @param message message
- * @param peer peer identity this notification is about
- *
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
- */
-int
-GCC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer,
-                 const struct GNUNET_MessageHeader *message);
-
 /**
  * Core handler for cadet keepalives.
  *
@@ -301,11 +269,12 @@ GCC_shutdown (void);
  * Create a connection.
  *
  * @param cid Connection ID (either created locally or imposed remotely).
- * @param t Tunnel this connection belongs to (or NULL);
+ * @param t Tunnel this connection belongs to (or NULL for transit connections);
  * @param path Path this connection has to use (copy is made).
  * @param own_pos Own position in the @c path path.
  *
- * @return Newly created connection, NULL in case of error (own id not in path).
+ * @return Newly created connection.
+ *         NULL in case of error: own id not in path, wrong neighbors, ...
  */
 struct CadetConnection *
 GCC_new (const struct GNUNET_CADET_Hash *cid,
@@ -525,6 +494,7 @@ GCC_cancel (struct CadetConnectionQueue *q);
  * @param message Message to send. Function makes a copy of it.
  *                If message is not hop-by-hop, decrements TTL of copy.
  * @param payload_type Type of payload, in case the message is encrypted.
+ * @param payload_id ID of the payload (PID, ACK, ...).
  * @param c Connection on which this message is transmitted.
  * @param fwd Is this a fwd message?
  * @param force Force the connection to accept the message (buffer overfill).
index 303eaee867cfbc5f04c06420d9fb5a7bef5a7251..9be1224c1393863ebfb60ba085e07d5815ac4022 100644 (file)
@@ -720,8 +720,6 @@ show_peer_iterator (void *cls,
   struct CadetPeer *p = value;
   struct CadetTunnel *t;
 
-  GCP_debug (p, GNUNET_ERROR_TYPE_ERROR);
-
   t = GCP_get_tunnel (p);
   if (NULL != t)
     GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
index 64d9168fd1e8cde002c7c513c037182f35eab618..5ccd8f01453921399be6b3c499f087926c811ba5 100644 (file)
 /********************************   STRUCTS  **********************************/
 /******************************************************************************/
 
+
 /**
- * Struct containing info about a queued transmission to this peer
+ * Struct containing all information regarding a given peer
  */
-struct CadetPeerQueue
+struct CadetPeer
 {
   /**
-   * DLL next
+   * ID of the peer
    */
-  struct CadetPeerQueue *next;
+  GNUNET_PEER_Id id;
 
   /**
-   * DLL previous
+   * Last time we heard from this peer
    */
-  struct CadetPeerQueue *prev;
+  struct GNUNET_TIME_Absolute last_contact;
 
   /**
-   * Peer this transmission is directed to.
+   * Paths to reach the peer, ordered by ascending hop count
    */
-  struct CadetPeer *peer;
+  struct CadetPeerPath *path_head;
 
   /**
-   * Connection this message belongs to.
+   * Paths to reach the peer, ordered by ascending hop count
    */
-  struct CadetConnection *c;
+  struct CadetPeerPath *path_tail;
 
   /**
-   * Is FWD in c?
+   * Handle to stop the DHT search for paths to this peer
    */
-  int fwd;
+  struct GCD_search_handle *search_h;
 
   /**
-   * Pointer to info stucture used as cls.
+   * Handle to stop the DHT search for paths to this peer
    */
-  void *cls;
+  struct GNUNET_SCHEDULER_Task *search_delayed;
 
   /**
-   * Type of message
+   * Tunnel to this peer, if any.
    */
-  uint16_t type;
+  struct CadetTunnel *tunnel;
 
   /**
-   * Type of message
+   * Connections that go through this peer; indexed by tid.
    */
-  uint16_t payload_type;
+  struct GNUNET_CONTAINER_MultiHashMap *connections;
 
   /**
-   * Type of message
+   * Handle for core transmissions.
    */
-  uint32_t payload_id;
+  struct GNUNET_MQ_Handle *core_mq;
 
   /**
-   * Size of the message
+   * How many messages are in the queue to this peer.
    */
-  size_t size;
+  unsigned int queue_n;
 
   /**
-   * Set when this message starts waiting for CORE.
+   * Hello message.
    */
-  struct GNUNET_TIME_Absolute start_waiting;
+  struct GNUNET_HELLO_Message* hello;
 
   /**
-   * Function to call on sending.
+   * Handle to us offering the HELLO to the transport.
    */
-  GCP_sent cont;
+  struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer;
 
   /**
-   * Closure for callback.
+   * Handle to our ATS request asking ATS to suggest an address
+   * to TRANSPORT for this peer (to establish a direct link).
    */
-  void *cont_cls;
+  struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion;
+
 };
 
 
 /**
- * Struct containing all information regarding a given peer
+ * Information about a queued message on the peer level.
  */
-struct CadetPeer
-{
-  /**
-   * ID of the peer
-   */
-  GNUNET_PEER_Id id;
-
-  /**
-   * Last time we heard from this peer
-   */
-  struct GNUNET_TIME_Absolute last_contact;
+struct CadetPeerQueue {
 
   /**
-   * Paths to reach the peer, ordered by ascending hop count
-   */
-  struct CadetPeerPath *path_head;
-
-  /**
-   * Paths to reach the peer, ordered by ascending hop count
-   */
-  struct CadetPeerPath *path_tail;
-
-  /**
-   * Handle to stop the DHT search for paths to this peer
+   * Envelope to cancel message before MQ sends it.
    */
-  struct GCD_search_handle *search_h;
+  struct GNUNET_MQ_Envelope *env;
 
   /**
-   * Handle to stop the DHT search for paths to this peer
+   * Peer (neighbor) this message is being sent to.
    */
-  struct GNUNET_SCHEDULER_Task *search_delayed;
+  struct CadetPeer *peer;
 
   /**
-   * Tunnel to this peer, if any.
+   * Continuation to call to notify higher layers about message sent.
    */
-  struct CadetTunnel *tunnel;
+  GCP_sent cont;
 
   /**
-   * Connections that go through this peer; indexed by tid.
+   * Closure for @a cont.
    */
-  struct GNUNET_CONTAINER_MultiHashMap *connections;
+  void *cont_cls;
 
   /**
-   * Handle for queued transmissions
+   * Time when message was queued for sending.
    */
-  struct GNUNET_CORE_TransmitHandle *core_transmit;
+  struct GNUNET_TIME_Absolute queue_timestamp;
 
   /**
-   * Timestamp
+   * #GNUNET_YES if message was management traffic (POLL, ACK, ...).
    */
-  struct GNUNET_TIME_Absolute tmt_time;
+  int management_traffic;
 
   /**
-   * Transmission queue to core DLL head
+   * Message type.
    */
-  struct CadetPeerQueue *queue_head;
+  uint16_t type;
 
   /**
-   * Transmission queue to core DLL tail
+   * Message size.
    */
-  struct CadetPeerQueue *queue_tail;
+  uint16_t size;
 
   /**
-   * How many messages are in the queue to this peer.
+   * Type of the message's payload, if it was encrypted data.
    */
-  unsigned int queue_n;
+  uint16_t payload_type;
 
   /**
-   * Hello message.
+   *ID of the payload (PID, ACK #, ...).
    */
-  struct GNUNET_HELLO_Message* hello;
+  uint16_t payload_id;
 
   /**
-   * Handle to us offering the HELLO to the transport.
+   * Connection this message was sent on.
    */
-  struct GNUNET_TRANSPORT_OfferHelloHandle *hello_offer;
+  struct CadetConnection *c;
 
   /**
-   * Handle to our ATS request asking ATS to suggest an address
-   * to TRANSPORT for this peer (to establish a direct link).
+   * Direction in @a c this message was send on (#GNUNET_YES = FWD).
    */
-  struct GNUNET_ATS_ConnectivitySuggestHandle *connectivity_suggestion;
-
+  int c_fwd;
 };
 
 
@@ -260,98 +241,6 @@ static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
 static int in_shutdown;
 
 
-/******************************************************************************/
-/*****************************     DEBUG      *********************************/
-/******************************************************************************/
-
-/**
- * Log all kinds of info about the queueing status of a peer.
- *
- * @param p Peer whose queue to show.
- * @param level Error level to use for logging.
- */
-static void
-queue_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level)
-{
-  struct GNUNET_TIME_Relative core_wait_time;
-  struct CadetPeerQueue *q;
-  int do_log;
-
-  do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
-                                       "cadet-p2p",
-                                       __FILE__, __FUNCTION__, __LINE__);
-  if (0 == do_log)
-    return;
-
-  LOG2 (level, "QQQ Message queue towards %s\n", GCP_2s (p));
-  LOG2 (level, "QQQ  queue length: %u\n", p->queue_n);
-  LOG2 (level, "QQQ  core tmt rdy: %p\n", p->core_transmit);
-  if (NULL != p->core_transmit)
-  {
-    core_wait_time = GNUNET_TIME_absolute_get_duration (p->tmt_time);
-    LOG2 (level, "QQQ  core called %s ago\n",
-          GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_NO));
-  }
-  for (q = p->queue_head; NULL != q; q = q->next)
-  {
-    LOG2 (level, "QQQ  - %s %s on %s\n",
-         GC_m2s (q->type), GC_f2s (q->fwd), GCC_2s (q->c));
-    LOG2 (level, "QQQ    payload %s, %u\n",
-         GC_m2s (q->payload_type), q->payload_id);
-    LOG2 (level, "QQQ    size: %u bytes\n", q->size);
-  }
-
-  LOG2 (level, "QQQ End queue towards %s\n", GCP_2s (p));
-}
-
-
-/**
- * Log all kinds of info about a peer.
- *
- * @param peer Peer.
- */
-void
-GCP_debug (const struct CadetPeer *p, enum GNUNET_ErrorType level)
-{
-  struct CadetPeerPath *path;
-  unsigned int conns;
-  int do_log;
-
-  do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
-                                       "cadet-p2p",
-                                       __FILE__, __FUNCTION__, __LINE__);
-  if (0 == do_log)
-    return;
-
-  if (NULL == p)
-  {
-    LOG2 (level, "PPP DEBUG PEER NULL\n");
-    return;
-  }
-
-  LOG2 (level, "PPP DEBUG PEER %s\n", GCP_2s (p));
-  LOG2 (level, "PPP last contact %s\n",
-       GNUNET_STRINGS_absolute_time_to_string (p->last_contact));
-  for (path = p->path_head; NULL != path; path = path->next)
-  {
-    char *s;
-
-    s = path_2s (path);
-    LOG2 (level, "PPP path: %s\n", s);
-    GNUNET_free (s);
-  }
-
-  LOG2 (level, "PPP core transmit handle %p\n", p->core_transmit);
-  LOG2 (level, "PPP DHT GET handle %p\n", p->search_h);
-  conns = 0;
-  if (NULL != p->connections)
-    conns += GNUNET_CONTAINER_multihashmap_size (p->connections);
-  LOG2 (level, "PPP # connections over link to peer: %u\n", conns);
-  queue_debug (p, level);
-  LOG2 (level, "PPP DEBUG END\n");
-}
-
-
 /******************************************************************************/
 /*****************************  CORE HELPERS  *********************************/
 /******************************************************************************/
@@ -415,12 +304,16 @@ pop_direct_path (struct CadetPeer *peer)
 /**
  * Method called whenever a given peer connects.
  *
- * @param cls closure
- * @param peer peer identity this notification is about
+ * @param cls Core closure (unused).
+ * @param peer Peer identity this notification is about
+ * @param mq Message Queue to this peer.
+ *
+ * @return Internal closure for handlers (CadetPeer struct).
  */
-static void
-core_connect (void *cls,
-              const struct GNUNET_PeerIdentity *peer)
+static void *
+core_connect_handler (void *cls,
+                      const struct GNUNET_PeerIdentity *peer,
+                      struct GNUNET_MQ_Handle *mq)
 {
   struct CadetPeer *neighbor;
   struct CadetPeerPath *path;
@@ -431,6 +324,8 @@ core_connect (void *cls,
                    sizeof (own_id),
                    "%s",
                    GNUNET_i2s (&my_full_id));
+
+  /* Save a path to the neighbor */
   neighbor = GCP_get (peer, GNUNET_YES);
   if (myid == neighbor->id)
   {
@@ -448,11 +343,14 @@ core_connect (void *cls,
     path = path_new (2);
     path->peers[1] = neighbor->id;
     GNUNET_PEER_change_rc (neighbor->id, 1);
+    GNUNET_assert (NULL == neighbor->core_mq);
+    neighbor->core_mq = mq;
   }
   path->peers[0] = myid;
   GNUNET_PEER_change_rc (myid, 1);
   GCP_add_path (neighbor, path, GNUNET_YES);
 
+  /* Create the connections hashmap */
   GNUNET_assert (NULL == neighbor->connections);
   neighbor->connections = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO);
   GNUNET_STATISTICS_update (stats,
@@ -462,42 +360,47 @@ core_connect (void *cls,
 
   if ( (NULL != GCP_get_tunnel (neighbor)) &&
        (0 > GNUNET_CRYPTO_cmp_peer_identity (&my_full_id, peer)) )
+  {
     GCP_connect (neighbor);
+  }
   GCC_check_connections ();
+
+  return neighbor;
 }
 
 
 /**
  * Method called whenever a peer disconnects.
  *
- * @param cls closure
- * @param peer peer identity this notification is about
+ * @param cls Core closure (unused).
+ * @param peer Peer identity this notification is about.
+ * @param internal_cls Internal closure (CadetPeer struct).
  */
 static void
-core_disconnect (void *cls,
-                 const struct GNUNET_PeerIdentity *peer)
+core_disconnect_handler (void *cls,
+                         const struct GNUNET_PeerIdentity *peer,
+                         void *internal_cls)
 {
-  struct CadetPeer *p;
+  struct CadetPeer *p = internal_cls;
   struct CadetPeerPath *direct_path;
   char own_id[16];
 
   GCC_check_connections ();
   strncpy (own_id, GNUNET_i2s (&my_full_id), 16);
   own_id[15] = '\0';
-  p = GNUNET_CONTAINER_multipeermap_get (peers, peer);
-  if (NULL == p)
-  {
-    GNUNET_break (GNUNET_YES == in_shutdown);
-    return;
-  }
   if (myid == p->id)
+  {
     LOG (GNUNET_ERROR_TYPE_INFO,
          "DISCONNECTED %s (self)\n",
          own_id);
+  }
   else
+  {
     LOG (GNUNET_ERROR_TYPE_INFO,
          "DISCONNECTED %s <= %s\n",
          own_id, GNUNET_i2s (peer));
+    p->core_mq = NULL;
+  }
   direct_path = pop_direct_path (p);
   if (NULL != p->connections)
   {
@@ -507,12 +410,6 @@ core_disconnect (void *cls,
     GNUNET_CONTAINER_multihashmap_destroy (p->connections);
     p->connections = NULL;
   }
-  if (NULL != p->core_transmit)
-  {
-    GNUNET_CORE_notify_transmit_ready_cancel (p->core_transmit);
-    p->core_transmit = NULL;
-    p->tmt_time.abs_value_us = 0;
-  }
   GNUNET_STATISTICS_update (stats,
                             "# peers",
                             -1,
@@ -522,230 +419,349 @@ core_disconnect (void *cls,
 }
 
 
-/**
- * Functions to handle messages from core
- */
-static struct GNUNET_CORE_MessageHandler core_handlers[] = {
-  {&GCC_handle_create, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, 0},
-  {&GCC_handle_confirm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK,
-    sizeof (struct GNUNET_CADET_ConnectionACK)},
-  {&GCC_handle_broken, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
-    sizeof (struct GNUNET_CADET_ConnectionBroken)},
-  {&GCC_handle_destroy, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
-    sizeof (struct GNUNET_CADET_ConnectionDestroy)},
-  {&GCC_handle_ack, GNUNET_MESSAGE_TYPE_CADET_ACK,
-    sizeof (struct GNUNET_CADET_ACK)},
-  {&GCC_handle_poll, GNUNET_MESSAGE_TYPE_CADET_POLL,
-    sizeof (struct GNUNET_CADET_Poll)},
-  {&GCC_handle_kx, GNUNET_MESSAGE_TYPE_CADET_KX, 0},
-  {&GCC_handle_encrypted, GNUNET_MESSAGE_TYPE_CADET_AX, 0},
-  {NULL, 0, 0}
-};
-
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
 
 /**
- * To be called on core init/fail.
+ * Check if the create_connection message has the appropriate size.
  *
- * @param cls Closure (config)
- * @param identity the public identity of this peer
+ * @param cls Closure (unused).
+ * @param msg Message to check.
+ *
+ * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
  */
-static void
-core_init (void *cls,
-           const struct GNUNET_PeerIdentity *identity)
+static int
+check_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg)
 {
-  const struct GNUNET_CONFIGURATION_Handle *c = cls;
-  static int i = 0;
+  uint16_t size;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n");
-  if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id)))
+  size = ntohs (msg->header.size);
+  if (size < sizeof (*msg))
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n"));
-    LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (identity));
-    LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id));
-    GNUNET_CORE_disconnect (core_handle);
-    core_handle = GNUNET_CORE_connect (c, /* Main configuration */
-                                       NULL,      /* Closure passed to CADET functions */
-                                       &core_init,        /* Call core_init once connected */
-                                       &core_connect,     /* Handle connects */
-                                       &core_disconnect,  /* remove peers on disconnects */
-                                       NULL,      /* Don't notify about all incoming messages */
-                                       GNUNET_NO, /* For header only in notification */
-                                       NULL,      /* Don't notify about all outbound messages */
-                                       GNUNET_NO, /* For header-only out notification */
-                                       core_handlers);    /* Register these handlers */
-    if (10 < i++)
-      GNUNET_assert (0);
+    GNUNET_break_op (0);
+    return GNUNET_NO;
   }
-  GML_start ();
+  return GNUNET_YES;
 }
 
-
 /**
-  * Core callback to write a pre-constructed data packet to core buffer
-  *
-  * @param cls Closure (CadetTransmissionDescriptor with data in "data" member).
-  * @param size Number of bytes available in buf.
-  * @param buf Where the to write the message.
-  *
-  * @return number of bytes written to buf
-  */
-static size_t
-send_core_data_raw (void *cls, size_t size, void *buf)
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_create (void *cls, const struct GNUNET_CADET_ConnectionCreate *msg)
 {
-  struct GNUNET_MessageHeader *msg = cls;
-  size_t total_size;
-
-  GNUNET_assert (NULL != msg);
-  total_size = ntohs (msg->size);
-
-  if (total_size > size)
-  {
-    GNUNET_break (0);
-    return 0;
-  }
-  GNUNET_memcpy (buf, msg, total_size);
-  GNUNET_free (cls);
-  return total_size;
+  struct CadetPeer *peer = cls;
+  GCC_handle_create (peer, msg);
 }
 
 
 /**
- * Function to send a create connection message to a peer.
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK
  *
- * @param c Connection to create.
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
  */
-static size_t
-send_core_connection_create (struct CadetConnection *c, size_t size, void *buf)
+static void
+handle_confirm (void *cls, const struct GNUNET_CADET_ConnectionACK *msg)
 {
-  struct GNUNET_CADET_ConnectionCreate *msg;
-  struct GNUNET_PeerIdentity *peer_ptr;
-  const struct CadetPeerPath *p = GCC_get_path (c);
-  size_t size_needed;
-  int i;
-
-  if (NULL == p)
-    return 0;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION CREATE...\n");
-  size_needed =
-      sizeof (struct GNUNET_CADET_ConnectionCreate) +
-      p->length * sizeof (struct GNUNET_PeerIdentity);
-
-  if (size < size_needed || NULL == buf)
-  {
-    GNUNET_break (0);
-    return 0;
-  }
-  msg = (struct GNUNET_CADET_ConnectionCreate *) buf;
-  msg->header.size = htons (size_needed);
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
-  msg->cid = *GCC_get_id (c);
+  struct CadetPeer *peer = cls;
+  GCC_handle_confirm (peer, msg);
+}
 
-  peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1];
-  for (i = 0; i < p->length; i++)
-  {
-    GNUNET_PEER_resolve (p->peers[i], peer_ptr++);
-  }
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "CONNECTION CREATE (%u bytes long) sent!\n",
-       size_needed);
-  return size_needed;
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_broken (void *cls, const struct GNUNET_CADET_ConnectionBroken *msg)
+{
+  struct CadetPeer *peer = cls;
+  GCC_handle_broken (peer, msg);
 }
 
 
 /**
- * Creates a path ack message in buf and frees all unused resources.
- *
- * @param c Connection to send an ACK on.
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
  *
- * @return number of bytes written to buf
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
  */
-static size_t
-send_core_connection_ack (struct CadetConnection *c, size_t size, void *buf)
+static void
+handle_destroy (void *cls, const struct GNUNET_CADET_ConnectionDestroy *msg)
 {
-  struct GNUNET_CADET_ConnectionACK *msg = buf;
+  struct CadetPeer *peer = cls;
+  GCC_handle_destroy (peer, msg);
+}
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending CONNECTION ACK...\n");
-  if (sizeof (struct GNUNET_CADET_ConnectionACK) > size)
-  {
-    GNUNET_break (0);
-    return 0;
-  }
-  msg->header.size = htons (sizeof (struct GNUNET_CADET_ConnectionACK));
-  msg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK);
-  msg->cid = *GCC_get_id (c);
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "CONNECTION ACK sent!\n");
-  return sizeof (struct GNUNET_CADET_ConnectionACK);
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_ACK
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_ack (void *cls, const struct GNUNET_CADET_ACK *msg)
+{
+  struct CadetPeer *peer = cls;
+  GCC_handle_ack (peer, msg);
 }
 
 
-/******************************************************************************/
-/********************************   STATIC  ***********************************/
-/******************************************************************************/
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_POLL
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_poll (void *cls, const struct GNUNET_CADET_Poll *msg)
+{
+  struct CadetPeer *peer = cls;
+  GCC_handle_poll (peer, msg);
+}
 
 
 /**
- * Get priority for a queued message.
+ * Check if the Key eXchange message has the appropriate size.
  *
- * @param q Queued message
+ * @param cls Closure (unused).
+ * @param msg Message to check.
  *
- * @return CORE priority to use.
+ * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
  */
-static enum GNUNET_CORE_Priority
-get_priority (struct CadetPeerQueue *q)
+static int
+check_kx (void *cls, const struct GNUNET_CADET_KX *msg)
 {
-  enum GNUNET_CORE_Priority low;
-  enum GNUNET_CORE_Priority high;
+  uint16_t size;
+  uint16_t expected_size;
 
-  if (NULL == q)
-  {
-    GNUNET_break (0);
-    return GNUNET_CORE_PRIO_BACKGROUND;
-  }
+  size = ntohs (msg->header.size);
+  expected_size = sizeof (struct GNUNET_CADET_KX)
+                  + sizeof (struct GNUNET_MessageHeader);
 
-  /* Relayed traffic has lower priority, our own traffic has higher */
-  if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->fwd))
-  {
-    low = GNUNET_CORE_PRIO_BEST_EFFORT;
-    high = GNUNET_CORE_PRIO_URGENT;
-  }
-  else
+  if (size < expected_size)
   {
-    low = GNUNET_CORE_PRIO_URGENT;
-    high = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
+    GNUNET_break_op (0);
+    return GNUNET_NO;
   }
+  return GNUNET_YES;
+}
 
-  /* Bulky payload has lower priority, control traffic has higher. */
-  if (GNUNET_MESSAGE_TYPE_CADET_AX == q->type)
-    return low;
-  return high;
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_KX
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_kx (void *cls, const struct GNUNET_CADET_KX *msg)
+{
+  struct CadetPeer *peer = cls;
+  GCC_handle_kx (peer, msg);
 }
 
 
 /**
- * Destroy the peer_info and free any allocated resources linked to it
+ * Check if the encrypted message has the appropriate size.
  *
- * @param peer The peer_info to destroy.
- * @return #GNUNET_OK on success
+ * @param cls Closure (unused).
+ * @param msg Message to check.
+ *
+ * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
  */
 static int
-peer_destroy (struct CadetPeer *peer)
+check_encrypted (void *cls, const struct GNUNET_CADET_AX *msg)
 {
-  struct GNUNET_PeerIdentity id;
-  struct CadetPeerPath *p;
-  struct CadetPeerPath *nextp;
+  uint16_t size;
+  uint16_t minimum_size;
 
-  GNUNET_PEER_resolve (peer->id, &id);
-  GNUNET_PEER_change_rc (peer->id, -1);
+  size = ntohs (msg->header.size);
+  minimum_size = sizeof (struct GNUNET_CADET_AX)
+                 + sizeof (struct GNUNET_MessageHeader);
 
-  LOG (GNUNET_ERROR_TYPE_INFO,
+  if (size < minimum_size)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
+}
+
+/**
+ * Handle for #GNUNET_MESSAGE_TYPE_CADET_AX (AXolotl encrypted traffic).
+ *
+ * @param cls Closure (CadetPeer for neighbor that sent the message).
+ * @param msg Message itself.
+ */
+static void
+handle_encrypted (void *cls, const struct GNUNET_CADET_AX *msg)
+{
+  struct CadetPeer *peer = cls;
+  GCC_handle_encrypted (peer, msg);
+}
+
+
+/**
+ * To be called on core init/fail.
+ *
+ * @param cls Closure (config)
+ * @param identity The public identity of this peer.
+ */
+static void
+core_init_notify (void *cls,
+                  const struct GNUNET_PeerIdentity *identity);
+
+
+static void
+connect_to_core (const struct GNUNET_CONFIGURATION_Handle *c)
+{
+  struct GNUNET_MQ_MessageHandler core_handlers[] = {
+    GNUNET_MQ_hd_var_size (create,
+                           GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
+                           struct GNUNET_CADET_ConnectionCreate,
+                           NULL),
+    GNUNET_MQ_hd_fixed_size (confirm,
+                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK,
+                             struct GNUNET_CADET_ConnectionACK,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (broken,
+                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
+                             struct GNUNET_CADET_ConnectionBroken,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (destroy,
+                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
+                             struct GNUNET_CADET_ConnectionDestroy,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (ack,
+                             GNUNET_MESSAGE_TYPE_CADET_ACK,
+                             struct GNUNET_CADET_ACK,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (poll,
+                             GNUNET_MESSAGE_TYPE_CADET_POLL,
+                             struct GNUNET_CADET_Poll,
+                             NULL),
+    GNUNET_MQ_hd_var_size (kx,
+                           GNUNET_MESSAGE_TYPE_CADET_KX,
+                           struct GNUNET_CADET_KX,
+                           NULL),
+    GNUNET_MQ_hd_var_size (encrypted,
+                           GNUNET_MESSAGE_TYPE_CADET_AX,
+                           struct GNUNET_CADET_AX,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+    core_handle = GNUNET_CORE_connecT (c, NULL,
+                                     &core_init_notify,
+                                     &core_connect_handler,
+                                     &core_disconnect_handler,
+                                     core_handlers);
+}
+
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+/******************************************************************************/
+
+/**
+ * To be called on core init/fail.
+ *
+ * @param cls Closure (config)
+ * @param identity The public identity of this peer.
+ */
+static void
+core_init_notify (void *cls,
+                  const struct GNUNET_PeerIdentity *core_identity)
+{
+  const struct GNUNET_CONFIGURATION_Handle *c = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Core init\n");
+  if (0 != memcmp (core_identity, &my_full_id, sizeof (my_full_id)))
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n"));
+    LOG (GNUNET_ERROR_TYPE_ERROR, " core id %s\n", GNUNET_i2s (core_identity));
+    LOG (GNUNET_ERROR_TYPE_ERROR, " my id %s\n", GNUNET_i2s (&my_full_id));
+    GNUNET_CORE_disconnecT (core_handle);
+    connect_to_core (c);
+    return;
+  }
+  GML_start ();
+}
+
+
+/******************************************************************************/
+/********************************   STATIC  ***********************************/
+/******************************************************************************/
+
+
+/**
+ * Get priority for a queued message.
+ *
+ * @param q Queued message
+ *
+ * @return CORE priority to use.
+ *
+ * FIXME make static
+ * FIXME use when sending
+ */
+enum GNUNET_CORE_Priority
+get_priority (struct CadetPeerQueue *q)
+{
+  enum GNUNET_CORE_Priority low;
+  enum GNUNET_CORE_Priority high;
+
+  if (NULL == q)
+  {
+    GNUNET_break (0);
+    return GNUNET_CORE_PRIO_BACKGROUND;
+  }
+
+  /* Relayed traffic has lower priority, our own traffic has higher */
+  if (NULL == q->c || GNUNET_NO == GCC_is_origin (q->c, q->c_fwd))
+  {
+    low = GNUNET_CORE_PRIO_BEST_EFFORT;
+    high = GNUNET_CORE_PRIO_URGENT;
+  }
+  else
+  {
+    low = GNUNET_CORE_PRIO_URGENT;
+    high = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
+  }
+
+  /* Bulky payload has lower priority, control traffic has higher. */
+  if (GNUNET_MESSAGE_TYPE_CADET_AX == q->type)
+    return low;
+  return high;
+}
+
+
+/**
+ * Destroy the peer_info and free any allocated resources linked to it
+ *
+ * @param peer The peer_info to destroy.
+ * @return #GNUNET_OK on success
+ */
+static int
+peer_destroy (struct CadetPeer *peer)
+{
+  struct GNUNET_PeerIdentity id;
+  struct CadetPeerPath *p;
+  struct CadetPeerPath *nextp;
+
+  GNUNET_PEER_resolve (peer->id, &id);
+  GNUNET_PEER_change_rc (peer->id, -1);
+
+  LOG (GNUNET_ERROR_TYPE_INFO,
        "destroying peer %s\n",
        GNUNET_i2s (&id));
 
@@ -784,20 +800,6 @@ peer_destroy (struct CadetPeer *peer)
     GNUNET_ATS_connectivity_suggest_cancel (peer->connectivity_suggestion);
     peer->connectivity_suggestion = NULL;
   }
-  while (NULL != peer->queue_head)
-  {
-    /* This function destroys the current peer->queue_head but
-     * replaces it with the next in the queue, so it is correct
-     * to while() here.
-     */
-    GCP_queue_destroy (peer->queue_head, GNUNET_YES, GNUNET_NO, 0);
-  }
-  if (NULL != peer->core_transmit)
-  {
-    GNUNET_break (0); /* GCP_queue_destroy should've cancelled it! */
-    GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
-    peer->core_transmit = NULL;
-  }
 
   GNUNET_free_non_null (peer->hello);
   GNUNET_free (peer);
@@ -831,7 +833,6 @@ shutdown_peer (void *cls,
 }
 
 
-
 /**
  * Check if peer is searching for a path (either active or delayed search).
  *
@@ -995,64 +996,6 @@ peer_get_best_path (const struct CadetPeer *peer)
 }
 
 
-/**
- * Is this queue element sendable?
- *
- * - All management traffic is always sendable.
- * - For payload traffic, check the connection flow control.
- *
- * @param q Queue element to inspect.
- * @return #GNUNET_YES if it is sendable, #GNUNET_NO otherwise.
- */
-static int
-queue_is_sendable (struct CadetPeerQueue *q)
-{
-  /* Is PID-independent? */
-  switch (q->type)
-  {
-    case GNUNET_MESSAGE_TYPE_CADET_ACK:
-    case GNUNET_MESSAGE_TYPE_CADET_POLL:
-    case GNUNET_MESSAGE_TYPE_CADET_KX:
-    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
-    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
-    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
-    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
-      return GNUNET_YES;
-
-    case GNUNET_MESSAGE_TYPE_CADET_AX:
-      break;
-
-    default:
-      GNUNET_break (0);
-  }
-
-  return GCC_is_sendable (q->c, q->fwd);
-}
-
-
-/**
- * Get first sendable message.
- *
- * @param peer The destination peer.
- *
- * @return First transmittable message, if any. Otherwise, NULL.
- */
-static struct CadetPeerQueue *
-peer_get_first_message (const struct CadetPeer *peer)
-{
-  struct CadetPeerQueue *q;
-
-  for (q = peer->queue_head; NULL != q; q = q->next)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Checking q:%p on c:%s\n", q, GCC_2s (q->c));
-    if (queue_is_sendable (q))
-      return q;
-  }
-
-  return NULL;
-}
-
-
 /**
  * Function to process paths received for a new peer addition. The recorded
  * paths form the initial tunnel, which can be optimized later.
@@ -1089,19 +1032,6 @@ search_handler (void *cls, const struct CadetPeerPath *path)
 }
 
 
-/**
- * Adjust core requested size to accomodate an ACK.
- *
- * @param message_size Requested size.
- *
- * @return Size enough to fit @c message_size and an ACK.
- */
-static size_t
-get_core_size (size_t message_size)
-{
-  return message_size + sizeof (struct GNUNET_CADET_ACK);
-}
-
 /**
  * Test if a message type is connection management traffic
  * or regular payload traffic.
@@ -1118,86 +1048,14 @@ is_connection_management (uint16_t type)
 }
 
 
-/**
- * Fill a core buffer with the appropriate data for the queued message.
- *
- * @param queue Queue element for the message.
- * @param buf Core buffer to fill.
- * @param size Size remaining in @c buf.
- * @param[out] pid In case its an encrypted payload, set payload.
- *
- * @return Bytes written to @c buf.
- */
-static size_t
-fill_buf (struct CadetPeerQueue *queue, void *buf, size_t size, uint32_t *pid)
-{
-  struct CadetConnection *c = queue->c;
-  size_t msg_size;
-
-  switch (queue->type)
-  {
-    case GNUNET_MESSAGE_TYPE_CADET_AX:
-      *pid = GCC_get_pid (queue->c, queue->fwd);
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  ax payload ID %u\n", *pid);
-      msg_size = send_core_data_raw (queue->cls, size, buf);
-      ((struct GNUNET_CADET_AX *) buf)->pid = htonl (*pid);
-      break;
-    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
-    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
-    case GNUNET_MESSAGE_TYPE_CADET_KX:
-    case GNUNET_MESSAGE_TYPE_CADET_ACK:
-    case GNUNET_MESSAGE_TYPE_CADET_POLL:
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  raw %s\n", GC_m2s (queue->type));
-      msg_size = send_core_data_raw (queue->cls, size, buf);
-      break;
-    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  path create\n");
-      if (GCC_is_origin (c, GNUNET_YES))
-        msg_size = send_core_connection_create (c, size, buf);
-      else
-        msg_size = send_core_data_raw (queue->cls, size, buf);
-      break;
-    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  path ack\n");
-      if (GCC_is_origin (c, GNUNET_NO) ||
-          GCC_is_origin (c, GNUNET_YES))
-      {
-        msg_size = send_core_connection_ack (c, size, buf);
-      }
-      else
-      {
-        msg_size = send_core_data_raw (queue->cls, size, buf);
-      }
-      break;
-    case GNUNET_MESSAGE_TYPE_CADET_DATA:
-    case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
-    case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY:
-      /* This should be encapsulted */
-      msg_size = 0;
-      GNUNET_assert (0);
-      break;
-    default:
-      GNUNET_break (0);
-      LOG (GNUNET_ERROR_TYPE_WARNING, "  type unknown: %u\n", queue->type);
-      msg_size = 0;
-  }
-
-  GNUNET_assert (size >= msg_size);
-
-  return msg_size;
-}
-
-
 /**
  * Debug function should NEVER return true in production code, useful to
  * simulate losses for testcases.
  *
- * @param q Queue handle with info about the message.
- *
  * @return #GNUNET_YES or #GNUNET_NO with the decision to drop.
  */
 static int
-should_I_drop (struct CadetPeerQueue *q)
+should_I_drop (void)
 {
   if (0 == drop_percent)
     return GNUNET_NO;
@@ -1209,297 +1067,87 @@ should_I_drop (struct CadetPeerQueue *q)
 }
 
 
-/**
- * Core callback to write a queued packet to core buffer
- *
- * @param cls Closure (peer info).
- * @param size Number of bytes available in buf.
- * @param buf Where the to write the message.
- *
- * @return number of bytes written to buf
- */
-static size_t
-queue_send (void *cls, size_t size, void *buf)
-{
-  struct CadetPeer *peer = cls;
-  struct CadetConnection *c;
-  struct CadetPeerQueue *queue;
-  struct GNUNET_TIME_Relative core_wait_time;
-  const char *wait_s;
-  const struct GNUNET_PeerIdentity *dst_id;
-  size_t msg_size;
-  size_t total_size;
-  size_t rest;
-  char *dst;
-  uint32_t pid;
-
-  GCC_check_connections ();
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue send towards %s (max %u)\n",
-       GCP_2s (peer), size);
-
-  /* Sanity checking */
-  if (NULL == buf || 0 == size)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  not allowed/\n");
-    if (GNUNET_NO == in_shutdown)
-    {
-      queue = peer_get_first_message (peer);
-      if (NULL == queue)
-      {
-        peer->core_transmit = NULL;
-        peer->tmt_time.abs_value_us = 0;
-        GCC_check_connections ();
-        return 0;
-      }
-      dst_id = GNUNET_PEER_resolve2 (peer->id);
-      peer->core_transmit =
-          GNUNET_CORE_notify_transmit_ready (core_handle,
-                                             GNUNET_NO, get_priority (queue),
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             dst_id,
-                                             get_core_size (queue->size),
-                                             &queue_send,
-                                             peer);
-      peer->tmt_time = GNUNET_TIME_absolute_get ();
-    }
-    else
-    {
-      peer->core_transmit = NULL;
-      peer->tmt_time.abs_value_us = 0;
-    }
-    GCC_check_connections ();
-    return 0;
-  }
-
-  /* Init */
-  rest = size;
-  total_size = 0;
-  dst = (char *) buf;
-  pid = 0;
-  peer->core_transmit = NULL;
-  queue = peer_get_first_message (peer);
-  if (NULL == queue)
-  {
-    GNUNET_break (0); /* Core tmt_rdy should've been canceled */
-    peer->tmt_time.abs_value_us = 0;
-    return 0;
-  }
-  core_wait_time = GNUNET_TIME_absolute_get_duration (peer->tmt_time);
-  wait_s = GNUNET_STRINGS_relative_time_to_string (core_wait_time, GNUNET_YES);
-  if (core_wait_time.rel_value_us >= 1000000)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         " %s: core wait time %s (> 1 second) for %u bytes\n",
-         GCP_2s (peer), wait_s, queue->size);
-  }
-  peer->tmt_time.abs_value_us = 0;
-
-  /* Copy all possible messages to the core buffer */
-  while (NULL != queue && rest >= queue->size)
-  {
-    c = queue->c;
-
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  on conn %s %s\n",
-         GCC_2s (c), GC_f2s(queue->fwd));
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  size %u ok (%u/%u)\n",
-         queue->size, total_size, size);
-
-    msg_size = fill_buf (queue, (void *) dst, size, &pid);
-
-    if (should_I_drop (queue))
-    {
-      LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n",
-           GC_m2s (queue->type), GC_m2s (queue->payload_type),
-           queue->payload_id, GCC_2s (c), GC_f2s (queue->fwd));
-      msg_size = 0;
-    }
-    else
-    {
-      LOG (GNUNET_ERROR_TYPE_INFO,
-           ">>> %s (%s %4u) on conn %s (%p) %s [%5u], after %s\n",
-           GC_m2s (queue->type), GC_m2s (queue->payload_type),
-           queue->payload_id, GCC_2s (c), c,
-           GC_f2s (queue->fwd), msg_size, wait_s);
-    }
-    total_size += msg_size;
-    rest -= msg_size;
-    dst = &dst[msg_size];
-    msg_size = 0;
-
-    /* Free queue, but cls was freed by send_core_* in fill_buf. */
-    (void) GCP_queue_destroy (queue, GNUNET_NO, GNUNET_YES, pid);
-
-    /* Next! */
-    queue = peer_get_first_message (peer);
-  }
-
-  /* If more data in queue, send next */
-  if (NULL != queue)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  more data! (%u)\n", queue->size);
-    if (NULL == peer->core_transmit)
-    {
-      dst_id = GNUNET_PEER_resolve2 (peer->id);
-      peer->core_transmit =
-          GNUNET_CORE_notify_transmit_ready (core_handle,
-                                             GNUNET_NO, get_priority (queue),
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             dst_id,
-                                             get_core_size (queue->size),
-                                             &queue_send,
-                                             peer);
-      peer->tmt_time = GNUNET_TIME_absolute_get ();
-      queue->start_waiting = GNUNET_TIME_absolute_get ();
-    }
-    else
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "*   tmt rdy called somewhere else\n");
-    }
-//     GCC_start_poll (); FIXME needed?
-  }
-  else
-  {
-//     GCC_stop_poll(); FIXME needed?
-  }
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "  return %d\n", total_size);
-  queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
-  GCC_check_connections ();
-  return total_size;
-}
-
-
 /******************************************************************************/
 /********************************    API    ***********************************/
 /******************************************************************************/
 
-
 /**
- * Free a transmission that was already queued with all resources
- * associated to the request.
+ * Call the continuation after a message has been sent or dropped.
  *
- * If connection was marked to be destroyed, and this was the last queued
- * message on it, the connection will be free'd as a result.
- *
- * @param queue Queue handler to cancel.
- * @param clear_cls Is it necessary to free associated cls?
- * @param sent Was it really sent? (Could have been canceled)
- * @param pid PID, if relevant (was sent and was a payload message).
- *
- * @return #GNUNET_YES if connection was destroyed as a result,
- *         #GNUNET_NO otherwise.
+ * @param q Queue handle.
+ * @param sent #GNUNET_YES if was sent to CORE, #GNUNET_NO if dropped.
  */
-int
-GCP_queue_destroy (struct CadetPeerQueue *queue,
-                   int clear_cls,
-                   int sent,
-                   uint32_t pid)
+static void
+call_peer_cont (const struct CadetPeerQueue *q, int sent)
 {
-  struct CadetPeer *peer;
-  int connection_destroyed;
-
-  GCC_check_connections ();
-  peer = queue->peer;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "queue destroy %s\n", GC_m2s (queue->type));
-  if (GNUNET_YES == clear_cls)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, " free cls\n");
-    switch (queue->type)
-    {
-      case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
-        LOG (GNUNET_ERROR_TYPE_INFO, "destroying a DESTROY message\n");
-        /* fall through */
-      case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
-      case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
-      case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
-      case GNUNET_MESSAGE_TYPE_CADET_KX:
-      case GNUNET_MESSAGE_TYPE_CADET_AX:
-      case GNUNET_MESSAGE_TYPE_CADET_ACK:
-      case GNUNET_MESSAGE_TYPE_CADET_POLL:
-        GNUNET_free_non_null (queue->cls);
-        break;
-
-      default:
-        GNUNET_break (0);
-        LOG (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n",
-             GC_m2s (queue->type));
-    }
-  }
-  GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue);
-
-  if (!is_connection_management (queue->type))
-  {
-    peer->queue_n--;
-  }
-
-  if (NULL != queue->cont)
+  LOG (GNUNET_ERROR_TYPE_DEBUG, " core mq just sent %s\n", GC_m2s (q->type));
+  if (NULL != q->cont)
   {
     struct GNUNET_TIME_Relative wait_time;
 
-    wait_time = GNUNET_TIME_absolute_get_duration (queue->start_waiting);
+    wait_time = GNUNET_TIME_absolute_get_duration (q->queue_timestamp);
     LOG (GNUNET_ERROR_TYPE_DEBUG, " calling callback, time elapsed %s\n",
          GNUNET_STRINGS_relative_time_to_string (wait_time, GNUNET_NO));
-    connection_destroyed = queue->cont (queue->cont_cls,
-                                        queue->c, sent, queue->type, pid,
-                                        queue->fwd, queue->size, wait_time);
-  }
-  else
-  {
-    connection_destroyed = GNUNET_NO;
+    q->cont (q->cont_cls,
+             q->c, q->c_fwd, sent,
+             q->type, q->payload_type, q->payload_id,
+             q->size, wait_time);
   }
+}
+
+
+/**
+ * Function called by MQ when a message is sent to CORE.
+ *
+ * @param cls Closure (queue handle).
+ */
+static void
+mq_sent (void *cls)
+{
+  struct CadetPeerQueue *q = cls;
 
-  if (NULL == peer_get_first_message (peer) && NULL != peer->core_transmit)
+  if (GNUNET_NO == q->management_traffic)
   {
-    GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
-    peer->core_transmit = NULL;
-    peer->tmt_time.abs_value_us = 0;
+    q->peer->queue_n--;
   }
-
-  GNUNET_free (queue);
-  GCC_check_connections ();
-  return connection_destroyed;
+  call_peer_cont (q, GNUNET_YES);
+  GNUNET_free (q);
 }
 
 
 /**
- * @brief Queue and pass message to core when possible.
+ * @brief Send a message to another peer (using CORE).
  *
  * @param peer Peer towards which to queue the message.
- * @param cls Closure (@c type dependant). It will be used by queue_send to
- *            build the message to be sent if not already prebuilt.
- * @param type Type of the message.
- * @param payload_type Type of the message's payload
+ * @param message Message to send.
+ * @param payload_type Type of the message's payload, for debug messages.
  *                     0 if the message is a retransmission (unknown payload).
  *                     UINT16_MAX if the message does not have payload.
  * @param payload_id ID of the payload (MID, ACK #, etc)
- * @param size Size of the message.
  * @param c Connection this message belongs to (can be NULL).
  * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!)
- * @param cont Continuation to be called once CORE has taken the message.
+ * @param cont Continuation to be called once CORE has sent the message.
  * @param cont_cls Closure for @c cont.
  *
- * @return Handle to cancel the message before it is sent. Once cont is called
- *         message has been sent and therefore the handle is no longer valid.
+ * @return A handle to the message in the queue or NULL (if dropped).
  */
 struct CadetPeerQueue *
-GCP_queue_add (struct CadetPeer *peer,
-               void *cls,
-               uint16_t type,
-               uint16_t payload_type,
-               uint32_t payload_id,
-               size_t size,
-               struct CadetConnection *c,
-               int fwd,
-               GCP_sent cont,
-               void *cont_cls)
+GCP_send (struct CadetPeer *peer,
+          const struct GNUNET_MessageHeader *message,
+          uint16_t payload_type,
+          uint32_t payload_id,
+          struct CadetConnection *c,
+          int fwd,
+          GCP_sent cont,
+          void *cont_cls)
 {
   struct CadetPeerQueue *q;
-  int priority;
-  int call_core;
+  uint16_t type;
+  uint16_t size;
 
   GCC_check_connections ();
+  type = ntohs (message->type);
+  size = ntohs (message->size);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "que %s (%s %4u) on conn %s (%p) %s towards %s (size %u)\n",
        GC_m2s (type), GC_m2s (payload_type), payload_id,
@@ -1508,282 +1156,68 @@ GCP_queue_add (struct CadetPeer *peer,
   if (NULL == peer->connections)
   {
     /* We are not connected to this peer, ignore request. */
+    GNUNET_break (0);
     LOG (GNUNET_ERROR_TYPE_INFO, "%s not a neighbor\n", GCP_2s (peer));
     GNUNET_STATISTICS_update (stats, "# messages dropped due to wrong hop", 1,
                               GNUNET_NO);
     return NULL;
   }
 
-  priority = 0;
-  if (is_connection_management (type))
-  {
-    priority = 100;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority);
-
-  call_core = (NULL == c || GNUNET_MESSAGE_TYPE_CADET_KX == type) ?
-               GNUNET_YES : GCC_is_sendable (c, fwd);
   q = GNUNET_new (struct CadetPeerQueue);
-  q->cls = cls;
+  q->env = GNUNET_MQ_msg_copy (message);
+  q->peer = peer;
+  q->cont = cont;
+  q->cont_cls = cont_cls;
+  q->queue_timestamp = GNUNET_TIME_absolute_get ();
+  q->management_traffic = is_connection_management (type);
   q->type = type;
+  q->size = size;
   q->payload_type = payload_type;
   q->payload_id = payload_id;
-  q->size = size;
-  q->peer = peer;
   q->c = c;
-  q->fwd = fwd;
-  q->cont = cont;
-  q->cont_cls = cont_cls;
-  if (100 > priority)
-  {
-    GNUNET_CONTAINER_DLL_insert_tail (peer->queue_head, peer->queue_tail, q);
-    peer->queue_n++;
-  }
-  else
-  {
-    GNUNET_CONTAINER_DLL_insert (peer->queue_head, peer->queue_tail, q);
-    call_core = GNUNET_YES;
-  }
+  q->c_fwd = fwd;
+  GNUNET_MQ_notify_sent (q->env, mq_sent, q);
 
-  q->start_waiting = GNUNET_TIME_absolute_get ();
-  if (NULL == peer->core_transmit && GNUNET_YES == call_core)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "calling core tmt rdy towards %s for %u bytes\n",
-         GCP_2s (peer), size);
-    peer->core_transmit =
-        GNUNET_CORE_notify_transmit_ready (core_handle,
-                                           GNUNET_NO, get_priority (q),
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           GNUNET_PEER_resolve2 (peer->id),
-                                           get_core_size (size),
-                                           &queue_send, peer);
-    peer->tmt_time = GNUNET_TIME_absolute_get ();
-  }
-  else if (GNUNET_NO == call_core)
+  if (GNUNET_YES == q->management_traffic)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s not needed\n",
-         GCP_2s (peer));
-
+    GNUNET_MQ_send (peer->core_mq, q->env);  // FIXME implement "_urgent", use
   }
   else
   {
-    struct GNUNET_TIME_Relative elapsed;
-    elapsed = GNUNET_TIME_absolute_get_duration (peer->tmt_time);
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "core tmt rdy towards %s already called %s\n",
-         GCP_2s (peer),
-         GNUNET_STRINGS_relative_time_to_string (elapsed, GNUNET_NO));
-
-  }
-  queue_debug (peer, GNUNET_ERROR_TYPE_DEBUG);
-  GCC_check_connections ();
-  return q;
-}
-
-
-/**
- * Cancel all queued messages to a peer that belong to a certain connection.
- *
- * @param peer Peer towards whom to cancel.
- * @param c Connection whose queued messages to cancel. Might be destroyed by
- *          the sent continuation call.
- */
-void
-GCP_queue_cancel (struct CadetPeer *peer,
-                  struct CadetConnection *c)
-{
-  struct CadetPeerQueue *q;
-  struct CadetPeerQueue *next;
-  struct CadetPeerQueue *prev;
-  int connection_destroyed;
-
-  GCC_check_connections ();
-  connection_destroyed = GNUNET_NO;
-  for (q = peer->queue_head; NULL != q; q = next)
-  {
-    prev = q->prev;
-    if (q->c == c)
+    if (GNUNET_YES == should_I_drop())
     {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "GMP queue cancel %s\n",
-           GC_m2s (q->type));
-      GNUNET_assert (GNUNET_NO == connection_destroyed);
-      if (GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY == q->type)
-      {
-        q->c = NULL;
-      }
-      else
-      {
-        connection_destroyed = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0);
-      }
-
-      /* Get next from prev, q->next might be already freed:
-       * queue destroy -> callback -> GCC_destroy -> cancel_queues -> here
-       */
-      if (NULL == prev)
-        next = peer->queue_head;
-      else
-        next = prev->next;
-    }
-    else
-    {
-      next = q->next;
-    }
-  }
-
-  if ( (NULL == peer->queue_head) &&
-       (NULL != peer->core_transmit) )
-  {
-    GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
-    peer->core_transmit = NULL;
-    peer->tmt_time.abs_value_us = 0;
-  }
-  GCC_check_connections ();
-}
-
-
-/**
- * Get the first transmittable message for a connection.
- *
- * @param peer Neighboring peer.
- * @param c Connection.
- *
- * @return First transmittable message.
- */
-static struct CadetPeerQueue *
-connection_get_first_message (struct CadetPeer *peer, struct CadetConnection *c)
-{
-  struct CadetPeerQueue *q;
-
-  for (q = peer->queue_head; NULL != q; q = q->next)
-  {
-    if (q->c != c)
-      continue;
-    if (queue_is_sendable (q))
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "  sendable!!\n");
-      return q;
+      LOG (GNUNET_ERROR_TYPE_WARNING, "DD %s (%s %u) on conn %s %s\n",
+           GC_m2s (q->type), GC_m2s (q->payload_type),
+           q->payload_id, GCC_2s (c), GC_f2s (q->c_fwd));
+      GNUNET_MQ_discard (q->env);
+      call_peer_cont (q, GNUNET_NO);
+      GNUNET_free (q);
+      return NULL;
     }
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  not sendable\n");
+    GNUNET_MQ_send (peer->core_mq, q->env);
+    peer->queue_n++;
   }
 
-  return NULL;
-}
-
-
-/**
- * Get the first message for a connection and unqueue it.
- *
- * Only tunnel (or higher) level messages are unqueued. Connection specific
- * messages are silently destroyed upon encounter.
- *
- * @param peer Neighboring peer.
- * @param c Connection.
- * @param destroyed[in/out] Was the connection destroyed (prev/as a result)?.
- *                          Can NOT be NULL.
- *
- * @return First message for this connection.
- */
-struct GNUNET_MessageHeader *
-GCP_connection_pop (struct CadetPeer *peer,
-                    struct CadetConnection *c,
-                    int *destroyed)
-{
-  struct CadetPeerQueue *q;
-  struct CadetPeerQueue *next;
-  struct GNUNET_MessageHeader *msg;
-  int dest;
-
   GCC_check_connections ();
-  GNUNET_assert (NULL != destroyed);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_pop on conn %p\n", c);
-  for (q = peer->queue_head; NULL != q; q = next)
-  {
-    next = q->next;
-    if (q->c != c)
-      continue;
-    LOG (GNUNET_ERROR_TYPE_DEBUG, " - queued: %s (%s %u), cont: %p\n",
-         GC_m2s (q->type), GC_m2s (q->payload_type), q->payload_id,
-         q->cont);
-    switch (q->type)
-    {
-      case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE:
-      case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_ACK:
-      case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
-      case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
-      case GNUNET_MESSAGE_TYPE_CADET_ACK:
-      case GNUNET_MESSAGE_TYPE_CADET_POLL:
-        dest = GCP_queue_destroy (q, GNUNET_YES, GNUNET_NO, 0);
-        if (GNUNET_YES == dest)
-        {
-          GNUNET_break (GNUNET_NO == *destroyed);
-          *destroyed = GNUNET_YES;
-        }
-        continue;
-
-      case GNUNET_MESSAGE_TYPE_CADET_KX:
-      case GNUNET_MESSAGE_TYPE_CADET_AX:
-      case GNUNET_MESSAGE_TYPE_CADET_AX_KX:
-        msg = (struct GNUNET_MessageHeader *) q->cls;
-        dest = GCP_queue_destroy (q, GNUNET_NO, GNUNET_NO, 0);
-        if (GNUNET_YES == dest)
-        {
-          GNUNET_break (GNUNET_NO == *destroyed);
-          *destroyed = GNUNET_YES;
-        }
-        return msg;
-
-      default:
-        GNUNET_break (0);
-        LOG (GNUNET_ERROR_TYPE_DEBUG, "Unknown message %s\n", GC_m2s (q->type));
-    }
-  }
-  GCC_check_connections ();
-  return NULL;
+  return q;
 }
 
 
 /**
- * Unlock a possibly locked queue for a connection.
+ * Cancel sending a message. Message must have been sent with
+ * #GCP_send before.  May not be called after the notify sent
+ * callback has been called.
  *
- * If there is a message that can be sent on this connection, call core for it.
- * Otherwise (if core transmit is already called or there is no sendable
- * message) do nothing.
+ * It DOES call the continuation given to #GCP_send.
  *
- * @param peer Peer who keeps the queue.
- * @param c Connection whose messages to unlock.
+ * @param q Queue handle to cancel
  */
 void
-GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c)
+GCP_send_cancel (struct CadetPeerQueue *q)
 {
-  struct CadetPeerQueue *q;
-  size_t size;
-
-  GCC_check_connections ();
-  if (NULL != peer->core_transmit)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  already unlocked!\n");
-    return; /* Already unlocked */
-  }
-
-  q = connection_get_first_message (peer, c);
-  if (NULL == q)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "  queue empty!\n");
-    return; /* Nothing to transmit */
-  }
-
-  size = q->size;
-  peer->core_transmit =
-      GNUNET_CORE_notify_transmit_ready (core_handle,
-                                         GNUNET_NO, get_priority (q),
-                                         GNUNET_TIME_UNIT_FOREVER_REL,
-                                         GNUNET_PEER_resolve2 (peer->id),
-                                         get_core_size (size),
-                                         &queue_send,
-                                         peer);
-  peer->tmt_time = GNUNET_TIME_absolute_get ();
-  GCC_check_connections ();
+  call_peer_cont (q, GNUNET_NO);
+  GNUNET_MQ_send_cancel (q->env);
+  GNUNET_free (q);
 }
 
 
@@ -1824,23 +1258,12 @@ GCP_init (const struct GNUNET_CONFIGURATION_Handle *c)
     LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
   }
   ats_ch = GNUNET_ATS_connectivity_init (c);
-  core_handle = GNUNET_CORE_connect (c, /* Main configuration */
-                                     NULL,      /* Closure passed to CADET functions */
-                                     &core_init,        /* Call core_init once connected */
-                                     &core_connect,     /* Handle connects */
-                                     &core_disconnect,  /* remove peers on disconnects */
-                                     NULL,      /* Don't notify about all incoming messages */
-                                     GNUNET_NO, /* For header only in notification */
-                                     NULL,      /* Don't notify about all outbound messages */
-                                     GNUNET_NO, /* For header-only out notification */
-                                     core_handlers);    /* Register these handlers */
+  connect_to_core (c);
   if (NULL == core_handle)
   {
     GNUNET_break (0);
     GNUNET_SCHEDULER_shutdown ();
-    return;
   }
-
 }
 
 
@@ -1853,13 +1276,10 @@ GCP_shutdown (void)
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Shutting down peer subsystem\n");
   in_shutdown = GNUNET_YES;
-  GNUNET_CONTAINER_multipeermap_iterate (peers,
-                                         &shutdown_peer,
-                                         NULL);
   if (NULL != core_handle)
   {
-    GNUNET_CORE_disconnect (core_handle);
-    core_handle = NULL;
+    GNUNET_CORE_disconnecT (core_handle);
+        core_handle = NULL;
   }
   if (NULL != ats_ch)
   {
@@ -1867,6 +1287,12 @@ GCP_shutdown (void)
     ats_ch = NULL;
   }
   GNUNET_PEER_change_rc (myid, -1);
+  /* With MQ API, CORE calls the disconnect handler for every peer
+   * after calling GNUNET_CORE_disconnecT, shutdown must occur *after* that.
+   */
+  GNUNET_CONTAINER_multipeermap_iterate (peers,
+                                         &shutdown_peer,
+                                         NULL);
   GNUNET_CONTAINER_multipeermap_destroy (peers);
   peers = NULL;
 }
@@ -2054,7 +1480,6 @@ GCP_is_neighbor (const struct CadetPeer *peer)
   }
 
   /* Is not a neighbor but connections is not NULL, probably disconnecting */
-  GNUNET_break (0);
   return GNUNET_NO;
 }
 
@@ -2254,7 +1679,8 @@ GCP_add_path_to_all (const struct CadetPeerPath *p, int confirmed)
 {
   unsigned int i;
 
-  /* TODO: invert and add */
+  /* TODO: invert and add to origin */
+  /* TODO: replace all "GCP_add_path" with this, make the other one static */
   GCC_check_connections ();
   for (i = 0; i < p->length && p->peers[i] != myid; i++) /* skip'em */ ;
   for (i++; i < p->length; i++)
index 950c68fb63d514b6fb86936aad6608e904ab7dd5..093cfa21aa94ff61283fac3bc6baf05c2e21fa04 100644 (file)
@@ -47,7 +47,7 @@ extern "C"
 struct CadetPeer;
 
 /**
- * Struct containing info about a queued transmission to this peer
+ * Handle to queued messages on a peer level.
  */
 struct CadetPeerQueue;
 
@@ -59,18 +59,19 @@ struct CadetPeerQueue;
  *
  * @param cls Closure.
  * @param c Connection this message was on.
+ * @param fwd Was this a FWD going message?
  * @param sent Was it really sent? (Could have been canceled)
  * @param type Type of message sent.
- * @param pid Packet ID, or 0 if not applicable (create, destroy, etc).
- * @param fwd Was this a FWD going message?
+ * @param payload_type Type of payload, if applicable.
+ * @param pid Message ID, or 0 if not applicable (create, destroy, etc).
  * @param size Size of the message.
  * @param wait Time spent waiting for core (only the time for THIS message)
- * @return #GNUNET_YES if connection was destroyed, #GNUNET_NO otherwise.
  */
-typedef int
+typedef void
 (*GCP_sent) (void *cls,
-             struct CadetConnection *c, int sent,
-             uint16_t type, uint32_t pid, int fwd, size_t size,
+             struct CadetConnection *c, int fwd, int sent,
+             uint16_t type, uint16_t payload_type, uint32_t pid,
+             size_t size,
              struct GNUNET_TIME_Relative wait);
 
 /**
@@ -146,97 +147,40 @@ void
 GCP_connect (struct CadetPeer *peer);
 
 /**
- * Free a transmission that was already queued with all resources
- * associated to the request.
- *
- * If connection was marked to be destroyed, and this was the last queued
- * message on it, the connection will be free'd as a result.
- *
- * @param queue Queue handler to cancel.
- * @param clear_cls Is it necessary to free associated cls?
- * @param sent Was it really sent? (Could have been canceled)
- * @param pid PID, if relevant (was sent and was a payload message).
- *
- * @return #GNUNET_YES if connection was destroyed as a result,
- *         #GNUNET_NO otherwise.
- */
-int
-GCP_queue_destroy (struct CadetPeerQueue *queue, int clear_cls,
-                   int sent, uint32_t pid);
-
-/**
- * @brief Queue and pass message to core when possible.
+ * @brief Send a message to another peer (using CORE).
  *
  * @param peer Peer towards which to queue the message.
- * @param cls Closure (@c type dependant). It will be used by queue_send to
- *            build the message to be sent if not already prebuilt.
- * @param type Type of the message.
- * @param payload_type Type of the message's payload
+ * @param message Message to send.
+ * @param payload_type Type of the message's payload, for debug messages.
  *                     0 if the message is a retransmission (unknown payload).
  *                     UINT16_MAX if the message does not have payload.
  * @param payload_id ID of the payload (MID, ACK #, etc)
- * @param size Size of the message.
  * @param c Connection this message belongs to (can be NULL).
  * @param fwd Is this a message going root->dest? (FWD ACK are NOT FWD!)
- * @param cont Continuation to be called once CORE has taken the message.
+ * @param cont Continuation to be called once CORE has sent the message.
  * @param cont_cls Closure for @c cont.
- *
- * @return Handle to cancel the message before it is sent. Once cont is called
- *         message has been sent and therefore the handle is no longer valid.
  */
 struct CadetPeerQueue *
-GCP_queue_add (struct CadetPeer *peer,
-               void *cls,
-               uint16_t type,
-               uint16_t payload_type,
-               uint32_t payload_id,
-               size_t size,
-               struct CadetConnection *c,
-               int fwd,
-               GCP_sent cont,
-               void *cont_cls);
-
-/**
- * Cancel all queued messages to a peer that belong to a certain connection.
- *
- * @param peer Peer towards whom to cancel.
- * @param c Connection whose queued messages to cancel. Might be destroyed by
- *          the sent continuation call.
- */
-void
-GCP_queue_cancel (struct CadetPeer *peer, struct CadetConnection *c);
-
-/**
- * Get the first message for a connection and unqueue it.
- *
- * Only tunnel (or higher) level messages are unqueued. Connection specific
- * messages are silently destroyed upon encounter.
- *
- * @param peer Neighboring peer.
- * @param c Connection.
- * @param destroyed[in/out] Was the connection destroyed as a result?.
- *                          Can NOT be NULL.
- *
- *
- * @return First message for this connection.
- */
-struct GNUNET_MessageHeader *
-GCP_connection_pop (struct CadetPeer *peer,
-                    struct CadetConnection *c,
-                    int *destroyed);
+GCP_send (struct CadetPeer *peer,
+          const struct GNUNET_MessageHeader *message,
+          uint16_t payload_type,
+          uint32_t payload_id,
+          struct CadetConnection *c,
+          int fwd,
+          GCP_sent cont,
+          void *cont_cls);
 
 /**
- * Unlock a possibly locked queue for a connection.
+ * Cancel sending a message. Message must have been sent with
+ * #GCP_send before.  May not be called after the notify sent
+ * callback has been called.
  *
- * If there is a message that can be sent on this connection, call core for it.
- * Otherwise (if core transmit is already called or there is no sendable
- * message) do nothing.
+ * It does NOT call the continuation given to #GCP_send.
  *
- * @param peer Peer who keeps the queue.
- * @param c Connection whose messages to unlock.
+ * @param q Queue handle to cancel
  */
 void
-GCP_queue_unlock (struct CadetPeer *peer, struct CadetConnection *c);
+GCP_send_cancel (struct CadetPeerQueue *q);
 
 /**
  * Set tunnel.
index 0ede4a8861a918fddf03fa31dccfc62fabc06799..e60c3c0239b70d2b4f074ef17902cb0462b4e32f 100644 (file)
@@ -1600,7 +1600,6 @@ send_kx (struct CadetTunnel *t,
     {
       GNUNET_break (0);
       GCT_debug (t, GNUNET_ERROR_TYPE_ERROR);
-      GCP_debug (t->peer, GNUNET_ERROR_TYPE_ERROR);
     }
     return NULL;
   }
@@ -2245,6 +2244,10 @@ GCT_handle_encrypted (struct CadetTunnel *t,
  *
  * @param t Tunnel on which the message came.
  * @param message Payload of KX message.
+ *
+ * FIXME: not needed anymore
+ *  - substitute with call to kx_ax
+ *  - eliminate encapsulation
  */
 void
 GCT_handle_kx (struct CadetTunnel *t,
@@ -3366,34 +3369,6 @@ GCT_send_ax_kx (struct CadetTunnel *t, int force_reply)
 }
 
 
-/**
- * Sends an already built and encrypted message on a tunnel, choosing the best
- * connection. Useful for re-queueing messages queued on a destroyed connection.
- *
- * @param message Message to send. Function modifies it.
- * @param t Tunnel on which this message is transmitted.
- */
-void
-GCT_resend_message (const struct GNUNET_MessageHeader *message,
-                    struct CadetTunnel *t)
-{
-  struct CadetConnection *c;
-  int fwd;
-
-  c = tunnel_get_connection (t);
-  if (NULL == c)
-  {
-    /* TODO queue in tunnel, marked as encrypted */
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "No connection available, dropping.\n");
-    return;
-  }
-  fwd = GCC_is_origin (c, GNUNET_YES);
-  GNUNET_break (NULL == GCC_send_prebuilt_message (message, UINT16_MAX, 0,
-                                                   c, fwd,
-                                                   GNUNET_YES, NULL, NULL));
-}
-
-
 /**
  * Is the tunnel directed towards the local peer?
  *
index ca553a7d359b1908509081f8557cac426fa28a6a..8d65cbebd08cbb729b1a6b2a840fdcea4d75dc8a 100644 (file)
@@ -503,18 +503,6 @@ void
 GCT_send_ax_kx (struct CadetTunnel *t, int force_reply);
 
 
-/**
- * Sends an already built and encrypted message on a tunnel, choosing the best
- * connection. Useful for re-queueing messages queued on a destroyed connection.
- *
- * @param message Message to send. Function modifies it.
- * @param t Tunnel on which this message is transmitted.
- */
-void
-GCT_resend_message (const struct GNUNET_MessageHeader *message,
-                    struct CadetTunnel *t);
-
-
 /**
  * Is the tunnel directed towards the local peer?
  *