much work on connection/route/peer-level queue management
authorChristian Grothoff <christian@grothoff.org>
Thu, 19 Jan 2017 14:52:22 +0000 (15:52 +0100)
committerChristian Grothoff <christian@grothoff.org>
Thu, 19 Jan 2017 14:52:22 +0000 (15:52 +0100)
src/cadet/gnunet-service-cadet-new.h
src/cadet/gnunet-service-cadet-new_connection.c
src/cadet/gnunet-service-cadet-new_connection.h
src/cadet/gnunet-service-cadet-new_core.c
src/cadet/gnunet-service-cadet-new_paths.c
src/cadet/gnunet-service-cadet-new_paths.h
src/cadet/gnunet-service-cadet-new_peer.c
src/cadet/gnunet-service-cadet-new_peer.h
src/cadet/gnunet-service-cadet-new_tunnels.c

index 862a0f08846c0c0661d8a79fe422aed640da48a9..9f4667e23e4e5bb3120a11a0578b6b581f1d606e 100644 (file)
@@ -105,7 +105,44 @@ struct CadetPeerPathEntry
 /**
  * Entry in list of connections used by tunnel, with metadata.
  */
-struct CadetTConnection;
+struct CadetTConnection
+{
+  /**
+   * Next in DLL.
+   */
+  struct CadetTConnection *next;
+
+  /**
+   * Prev in DLL.
+   */
+  struct CadetTConnection *prev;
+
+  /**
+   * Connection handle.
+   */
+  struct CadetConnection *cc;
+
+  /**
+   * Tunnel this connection belongs to.
+   */
+  struct CadetTunnel *t;
+
+  /**
+   * Creation time, to keep oldest connection alive.
+   */
+  struct GNUNET_TIME_Absolute created;
+
+  /**
+   * Connection throughput, to keep fastest connection alive.
+   */
+  uint32_t throughput;
+
+  /**
+   * Is the connection currently ready for transmission?
+   */
+  int is_ready;
+};
+
 
 /**
  * Active path through the network (used by a tunnel).  There may
index 5123f9d458223612242d3833849408be03a70c7a..bf88d78e15d353f0f7a77a809ada9a85c7150a18 100644 (file)
  * @author Christian Grothoff
  *
  * TODO:
- * - congestion control
- * - GCC_debug()
  * - keepalive messages
- * - performance metrics
- * - back-off reset
+ * - keep performance metrics (?)
  */
 #include "platform.h"
 #include "gnunet-service-cadet-new_channel.h"
@@ -64,19 +61,16 @@ enum CadetConnectionState
   CADET_CONNECTION_SENT,
 
   /**
-   * Connection confirmed, ready to carry traffic.
+   * We are an inbound connection, and received a CREATE. Need to
+   * send an CREATE_ACK back.
    */
-  CADET_CONNECTION_READY,
+  CADET_CONNECTION_CREATE_RECEIVED,
 
   /**
-   * Connection to be destroyed, just waiting to empty queues.
+   * Connection confirmed, ready to carry traffic.
    */
-  CADET_CONNECTION_DESTROYED,
+  CADET_CONNECTION_READY
 
-  /**
-   * Connection to be destroyed because of a distant peer, same as DESTROYED.
-   */
-  CADET_CONNECTION_BROKEN
 };
 
 
@@ -111,11 +105,6 @@ struct CadetConnection
    */
   struct GNUNET_MQ_Envelope *env;
 
-  /**
-   * Message queue to the first hop, or NULL if we have no connection yet.
-   */
-  struct GNUNET_MQ_Handle *mq;
-
   /**
    * Handle for calling #GCP_request_mq_cancel() once we are finished.
    */
@@ -129,7 +118,7 @@ struct CadetConnection
   /**
    * Function to call once we are ready to transmit.
    */
-  GNUNET_SCHEDULER_TaskCallback ready_cb;
+  GCC_ReadyCallback ready_cb;
 
   /**
    * Closure for @e ready_cb.
@@ -151,22 +140,12 @@ struct CadetConnection
    */
   unsigned int off;
 
-};
-
+  /**
+   * Are we ready to transmit via @e mq_man right now?
+   */
+  int mqm_ready;
 
-/**
- * Is the given connection currently ready for transmission?
- *
- * @param cc connection to transmit on
- * @return #GNUNET_YES if we could transmit
- */
-int
-GCC_is_ready (struct CadetConnection *cc)
-{
-  return ( (NULL != cc->mq) &&
-           (CADET_CONNECTION_READY == cc->state) &&
-           (NULL == cc->env) ) ? GNUNET_YES : GNUNET_NO;
-}
+};
 
 
 /**
@@ -177,29 +156,19 @@ GCC_is_ready (struct CadetConnection *cc)
 void
 GCC_destroy (struct CadetConnection *cc)
 {
-  if (NULL != cc->env)
-  {
-    if (NULL != cc->mq)
-      GNUNET_MQ_send_cancel (cc->env);
-    else
-      GNUNET_MQ_discard (cc->env);
-    cc->env = NULL;
-  }
-  if ( (NULL != cc->mq) &&
-       (CADET_CONNECTION_SENDING_CREATE != cc->state) )
+  struct GNUNET_MQ_Envelope *env = NULL;
+
+  if (CADET_CONNECTION_SENDING_CREATE != cc->state)
   {
-    /* Need to notify next hop that we are down. */
-    struct GNUNET_MQ_Envelope *env;
     struct GNUNET_CADET_ConnectionDestroyMessage *destroy_msg;
 
+    /* Need to notify next hop that we are down. */
     env = GNUNET_MQ_msg (destroy_msg,
                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY);
     destroy_msg->cid = cc->cid;
-    GNUNET_MQ_send (cc->mq,
-                    env);
   }
-  cc->mq = NULL;
-  GCP_request_mq_cancel (cc->mq_man);
+  GCP_request_mq_cancel (cc->mq_man,
+                         env);
   cc->mq_man = NULL;
   GCPP_del_connection (cc->path,
                        cc->off,
@@ -234,14 +203,20 @@ GCC_get_ct (struct CadetConnection *cc)
 void
 GCC_handle_connection_ack (struct CadetConnection *cc)
 {
-  GNUNET_SCHEDULER_cancel (cc->task);
-#if FIXME
+  if (NULL != cc->task)
+  {
+    GNUNET_SCHEDULER_cancel (cc->task);
+    cc->task = NULL;
+  }
+#if FIXME_KEEPALIVE
   cc->task = GNUNET_SCHEDULER_add_delayed (cc->keepalive_period,
                                            &send_keepalive,
                                            cc);
 #endif
   cc->state = CADET_CONNECTION_READY;
-  cc->ready_cb (cc->ready_cb_cls);
+  if (GNUNET_YES == cc->mqm_ready)
+    cc->ready_cb (cc->ready_cb_cls,
+                  GNUNET_YES);
 }
 
 
@@ -255,6 +230,12 @@ void
 GCC_handle_kx (struct CadetConnection *cc,
                const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
 {
+  if (CADET_CONNECTION_SENT == cc->state)
+  {
+    /* We didn't get the CREATE_ACK, but instead got payload. That's fine,
+       clearly something is working, so pretend we got an ACK. */
+    GCC_handle_connection_ack (cc);
+  }
   GCT_handle_kx (cc->ct,
                  msg);
 }
@@ -270,6 +251,12 @@ void
 GCC_handle_encrypted (struct CadetConnection *cc,
                       const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
 {
+  if (CADET_CONNECTION_SENT == cc->state)
+  {
+    /* We didn't get the CREATE_ACK, but instead got payload. That's fine,
+       clearly something is working, so pretend we got an ACK. */
+    GCC_handle_connection_ack (cc);
+  }
   GCT_handle_encrypted (cc->ct,
                         msg);
 }
@@ -281,37 +268,40 @@ GCC_handle_encrypted (struct CadetConnection *cc,
  * @param cls the `struct CadetConnection` to initiate
  */
 static void
-send_create (void *cls);
-
-
-/**
- * We finished transmission of the create message, now wait for
- * ACK or retransmit.
- *
- * @param cls the `struct CadetConnection` that sent the create message
- */
-static void
-transmit_create_done_cb (void *cls)
+send_create (void *cls)
 {
   struct CadetConnection *cc = cls;
+  struct GNUNET_CADET_ConnectionCreateMessage *create_msg;
+  struct GNUNET_PeerIdentity *pids;
+  struct GNUNET_MQ_Envelope *env;
+  unsigned int path_length;
 
+  cc->task = NULL;
+  GNUNET_assert (GNUNET_YES == cc->mqm_ready);
+  path_length = GCPP_get_length (cc->path);
+  env = GNUNET_MQ_msg_extra (create_msg,
+                             path_length * sizeof (struct GNUNET_PeerIdentity),
+                             GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE);
+  create_msg->cid = cc->cid;
+  pids = (struct GNUNET_PeerIdentity *) &create_msg[1];
+  for (unsigned int i=0;i<path_length;i++)
+    pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path,
+                                                    i));
+  cc->env = env;
+  cc->mqm_ready = GNUNET_NO;
   cc->state = CADET_CONNECTION_SENT;
-  cc->env = NULL;
-  /* FIXME: at some point, we need to reset the delay back to 0! */
-  cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay);
-  cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay,
-                                           &send_create,
-                                           cc);
+  GCP_send (cc->mq_man,
+            env);
 }
 
 
 /**
- * Send a CREATE message to the first hop.
+ * Send a CREATE_ACK message towards the origin.
  *
  * @param cls the `struct CadetConnection` to initiate
  */
 static void
-send_create (void *cls)
+send_create_ack (void *cls)
 {
   struct CadetConnection *cc = cls;
   struct GNUNET_CADET_ConnectionCreateMessage *create_msg;
@@ -320,7 +310,7 @@ send_create (void *cls)
   unsigned int path_length;
 
   cc->task = NULL;
-  GNUNET_assert (NULL != cc->mq);
+  GNUNET_assert (GNUNET_YES == cc->mqm_ready);
   path_length = GCPP_get_length (cc->path);
   env = GNUNET_MQ_msg_extra (create_msg,
                              path_length * sizeof (struct GNUNET_PeerIdentity),
@@ -331,11 +321,42 @@ send_create (void *cls)
     pids[i] = *GCP_get_id (GCPP_get_peer_at_offset (cc->path,
                                                     i));
   cc->env = env;
-  GNUNET_MQ_notify_sent (env,
-                         &transmit_create_done_cb,
-                         cc);
-  GNUNET_MQ_send (cc->mq,
-                  env);
+  cc->mqm_ready = GNUNET_NO;
+  cc->state = CADET_CONNECTION_READY;
+  GCP_send (cc->mq_man,
+            env);
+}
+
+
+/**
+ * We got a #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE for a
+ * connection that we already have.  Either our ACK got lost
+ * or something is fishy.  Consider retransmitting the ACK.
+ *
+ * @param cc connection that got the duplicate CREATE
+ */
+void
+GCC_handle_duplicate_create (struct CadetConnection *cc)
+{
+  if (GNUNET_YES == cc->mqm_ready)
+  {
+    /* Tell tunnel that we are not ready for transmission anymore
+       (until CREATE_ACK is done) */
+    cc->ready_cb (cc->ready_cb_cls,
+                  GNUNET_NO);
+
+    /* Revert back to the state of having only received the 'CREATE',
+       and immediately proceed to send the CREATE_ACK. */
+    cc->state = CADET_CONNECTION_CREATE_RECEIVED;
+    cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
+                                         cc);
+  }
+  else
+  {
+    /* We are currently sending something else back, which
+       can only be an ACK or payload, either of which would
+       do. So actually no need to do anything. */
+  }
 }
 
 
@@ -344,34 +365,62 @@ send_create (void *cls)
  * peer at the first hop.  Adjust accordingly.
  *
  * @param cls the `struct CadetConnection`
- * @param mq NULL if the CORE connection was lost, non-NULL if
- *           it became available
+ * @param available #GNUNET_YES if sending is now possible,
+ *                  #GNUNET_NO if sending is no longer possible
+ *                  #GNUNET_SYSERR if sending is no longer possible
+ *                                 and the last envelope was discarded
  */
 static void
 manage_first_hop_mq (void *cls,
-                     struct GNUNET_MQ_Handle *mq)
+                     int available)
 {
   struct CadetConnection *cc = cls;
 
-  if (NULL == mq)
+  if (GNUNET_YES != available)
   {
     /* Connection is down, for now... */
-    cc->mq = NULL;
+    cc->mqm_ready = GNUNET_NO;
+    cc->state = CADET_CONNECTION_NEW;
+    cc->retry_delay = GNUNET_TIME_UNIT_ZERO;
     if (NULL != cc->task)
     {
       GNUNET_SCHEDULER_cancel (cc->task);
       cc->task = NULL;
     }
+    cc->ready_cb (cc->ready_cb_cls,
+                  GNUNET_NO);
     return;
   }
 
-  cc->mq = mq;
-  cc->state = CADET_CONNECTION_SENDING_CREATE;
-
-  /* Now repeat sending connection creation messages
-     down the path, until we get an ACK! */
-  cc->task = GNUNET_SCHEDULER_add_now (&send_create,
-                                       cc);
+  cc->mqm_ready = GNUNET_YES;
+  switch (cc->state)
+  {
+  case CADET_CONNECTION_NEW:
+    /* Transmit immediately */
+    cc->task = GNUNET_SCHEDULER_add_now (&send_create,
+                                         cc);
+    break;
+  case CADET_CONNECTION_SENDING_CREATE:
+    /* Should not be possible to be called in this state. */
+    GNUNET_assert (0);
+    break;
+  case CADET_CONNECTION_SENT:
+    /* Retry a bit later... */
+    cc->retry_delay = GNUNET_TIME_STD_BACKOFF (cc->retry_delay);
+    cc->task = GNUNET_SCHEDULER_add_delayed (cc->retry_delay,
+                                             &send_create,
+                                             cc);
+    break;
+  case CADET_CONNECTION_CREATE_RECEIVED:
+    /* We got the 'CREATE' (incoming connection), should send the CREATE_ACK */
+    cc->task = GNUNET_SCHEDULER_add_now (&send_create_ack,
+                                         cc);
+    break;
+  case CADET_CONNECTION_READY:
+    cc->ready_cb (cc->ready_cb_cls,
+                  GNUNET_YES);
+    break;
+  }
 }
 
 
@@ -383,6 +432,7 @@ manage_first_hop_mq (void *cls,
  * @param destination where to go
  * @param path which path to take (may not be the full path)
  * @param ct which tunnel uses this connection
+ * @param init_state initial state for the connection
  * @param ready_cb function to call when ready to transmit
  * @param ready_cb_cls closure for @a cb
  * @return handle to the connection
@@ -392,7 +442,8 @@ connection_create (struct CadetPeer *destination,
                    struct CadetPeerPath *path,
                    struct CadetTConnection *ct,
                    const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
-                   GNUNET_SCHEDULER_TaskCallback ready_cb,
+                   enum CadetConnectionState init_state,
+                   GCC_ReadyCallback ready_cb,
                    void *ready_cb_cls)
 {
   struct CadetConnection *cc;
@@ -403,6 +454,7 @@ connection_create (struct CadetPeer *destination,
                         destination);
   GNUNET_assert (UINT_MAX > off);
   cc = GNUNET_new (struct CadetConnection);
+  cc->state = init_state;
   cc->ct = ct;
   cc->cid = *cid;
   GNUNET_assert (GNUNET_OK ==
@@ -448,19 +500,16 @@ GCC_create_inbound (struct CadetPeer *destination,
                     struct CadetPeerPath *path,
                     struct CadetTConnection *ct,
                     const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
-                    GNUNET_SCHEDULER_TaskCallback ready_cb,
+                    GCC_ReadyCallback ready_cb,
                     void *ready_cb_cls)
 {
-  struct CadetConnection *cc;
-
-  cc = connection_create (destination,
-                          path,
-                          ct,
-                          cid,
-                          ready_cb,
-                          ready_cb_cls);
-  /* FIXME: send CREATE_ACK? */
-  return cc;
+  return connection_create (destination,
+                            path,
+                            ct,
+                            cid,
+                            CADET_CONNECTION_CREATE_RECEIVED,
+                            ready_cb,
+                            ready_cb_cls);
 }
 
 
@@ -479,41 +528,21 @@ struct CadetConnection *
 GCC_create (struct CadetPeer *destination,
             struct CadetPeerPath *path,
             struct CadetTConnection *ct,
-            GNUNET_SCHEDULER_TaskCallback ready_cb,
+            GCC_ReadyCallback ready_cb,
             void *ready_cb_cls)
 {
   struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
-  struct CadetConnection *cc;
 
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                               &cid,
                               sizeof (cid));
-  cc = connection_create (destination,
-                          path,
-                          ct,
-                          &cid,
-                          ready_cb,
-                          ready_cb_cls);
-  /* FIXME: send CREATE? */
-  return cc;
-}
-
-
-/**
- * We finished transmission of a message, if we are still ready, tell
- * the tunnel!
- *
- * @param cls our `struct CadetConnection`
- */
-static void
-transmit_done_cb (void *cls)
-{
-  struct CadetConnection *cc = cls;
-
-  cc->env = NULL;
-  if ( (NULL != cc->mq) &&
-       (CADET_CONNECTION_READY == cc->state) )
-    cc->ready_cb (cc->ready_cb_cls);
+  return connection_create (destination,
+                            path,
+                            ct,
+                            &cid,
+                            CADET_CONNECTION_NEW,
+                            ready_cb,
+                            ready_cb_cls);
 }
 
 
@@ -524,21 +553,18 @@ transmit_done_cb (void *cls)
  * connection is right now ready for transmission.
  *
  * @param cc connection identification
- * @param env envelope with message to transmit
+ * @param env envelope with message to transmit; must NOT
+ *            yet have a #GNUNET_MQ_notify_sent() callback attached to it
  */
 void
 GCC_transmit (struct CadetConnection *cc,
               struct GNUNET_MQ_Envelope *env)
 {
-  GNUNET_assert (NULL == cc->env);
-  cc->env = env;
-  GNUNET_MQ_notify_sent (env,
-                         &transmit_done_cb,
-                         cc);
-  if ( (NULL != cc->mq) &&
-       (CADET_CONNECTION_READY == cc->state) )
-    GNUNET_MQ_send (cc->mq,
-                    env);
+  GNUNET_assert (GNUNET_YES == cc->mqm_ready);
+  GNUNET_assert (CADET_CONNECTION_READY == cc->state);
+  cc->mqm_ready = GNUNET_NO;
+  GCP_send (cc->mq_man,
+            env);
 }
 
 
@@ -568,6 +594,39 @@ GCC_get_id (struct CadetConnection *cc)
 }
 
 
+/**
+ * Get a (static) string for a connection.
+ *
+ * @param cc Connection.
+ */
+const char *
+GCC_2s (const struct CadetConnection *cc)
+{
+  static char buf[128];
+
+  if (NULL == cc)
+    return "Connection(NULL)";
+
+  if (NULL != cc->ct)
+  {
+    GNUNET_snprintf (buf,
+                     sizeof (buf),
+                     "Connection(%s(Tunnel(%s)))",
+                     GNUNET_sh2s (&cc->cid.connection_of_tunnel),
+                     GCT_2s (cc->ct->t));
+    return buf;
+  }
+  GNUNET_snprintf (buf,
+                   sizeof (buf),
+                   "Connection(%s(Tunnel(NULL)))",
+                   GNUNET_sh2s (&cc->cid.connection_of_tunnel));
+  return buf;
+}
+
+
+#define LOG2(level, ...) GNUNET_log_from_nocheck(level,"cadet-con",__VA_ARGS__)
+
+
 /**
  * Log connection info.
  *
@@ -578,7 +637,29 @@ void
 GCC_debug (struct CadetConnection *cc,
            enum GNUNET_ErrorType level)
 {
-  GNUNET_break (0); // FIXME: implement...
+  int do_log;
+  char *s;
+
+  do_log = GNUNET_get_log_call_status (level & (~GNUNET_ERROR_TYPE_BULK),
+                                       "cadet-con",
+                                       __FILE__, __FUNCTION__, __LINE__);
+  if (0 == do_log)
+    return;
+  if (NULL == cc)
+  {
+    LOG2 (level,
+          "Connection (NULL)\n");
+    return;
+  }
+  s = GCPP_2s (cc->path);
+  LOG2 (level,
+        "Connection %s to %s via path %s in state %d is %s\n",
+        GCC_2s (cc),
+        GCP_2s (cc->destination),
+        s,
+        cc->state,
+        (GNUNET_YES == cc->mqm_ready) ? "ready" : "busy");
+  GNUNET_free (s);
 }
 
 /* end of gnunet-service-cadet-new_connection.c */
index f2364dea463a5de283906672cc8b644edcc70cec..99426776debbc8f5bb98bb8740e8ae8acae4ac8a 100644 (file)
 #include "gnunet-service-cadet-new_peer.h"
 #include "cadet_protocol.h"
 
+
 /**
- * Is the given connection currently ready for transmission?
+ * Function called to notify tunnel about change in our readyness.
  *
- * @param cc connection to transmit on
- * @return #GNUNET_YES if we could transmit
+ * @param cls closure
+ * @param is_ready #GNUNET_YES if the connection is now ready for transmission,
+ *                 #GNUNET_NO if the connection is no longer ready for transmission
  */
-int
-GCC_is_ready (struct CadetConnection *cc);
+typedef void
+(*GCC_ReadyCallback)(void *cls,
+                     int is_ready);
 
 
 /**
@@ -67,7 +70,7 @@ struct CadetConnection *
 GCC_create (struct CadetPeer *destination,
             struct CadetPeerPath *path,
             struct CadetTConnection *ct,
-            GNUNET_SCHEDULER_TaskCallback ready_cb,
+            GCC_ReadyCallback ready_cb,
             void *ready_cb_cls);
 
 
@@ -88,7 +91,7 @@ GCC_create_inbound (struct CadetPeer *destination,
                     struct CadetPeerPath *path,
                     struct CadetTConnection *ct,
                     const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
-                    GNUNET_SCHEDULER_TaskCallback ready_cb,
+                    GCC_ReadyCallback ready_cb,
                     void *ready_cb_cls);
 
 
@@ -170,6 +173,15 @@ const struct GNUNET_CADET_ConnectionTunnelIdentifier *
 GCC_get_id (struct CadetConnection *cc);
 
 
+/**
+ * Get a (static) string for a connection.
+ *
+ * @param cc Connection.
+ */
+const char *
+GCC_2s (const struct CadetConnection *cc);
+
+
 /**
  * Log connection info.
  *
index 3d8406dc9f8b5a47ec7f4bc6f8c9abbea4507f1c..9ce4418de7787fb298845d5ef3c32f231bd93e67 100644 (file)
  * @author Christian Grothoff
  *
  * All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
+ *
+ * TODO:
+ * - pass encrypted ACK to connection (!)
+ * - given BROKEN messages, destroy paths (?)
+ * -
+ * - handle POLL (if needed)
  */
 #include "platform.h"
 #include "gnunet-service-cadet-new_core.h"
 #include "gnunet_core_service.h"
 #include "cadet_protocol.h"
 
+/**
+ * Number of messages we are willing to buffer per route.
+ */
+#define ROUTE_BUFFER_SIZE 8
+
+
+/**
+ * Information we keep per direction for a route.
+ */
+struct RouteDirection
+{
+  /**
+   * Target peer.
+   */
+  struct CadetPeer *hop;
+
+  /**
+   * Route this direction is part of.
+   */
+  struct CadetRoute *my_route;
+
+  /**
+   * Message queue manager for @e hop.
+   */
+  struct GCP_MessageQueueManager *mqm;
+
+  /**
+   * Cyclic message buffer to @e hop.
+   */
+  struct GNUNET_MQ_Envelope *out_buffer[ROUTE_BUFFER_SIZE];
+
+  /**
+   * Next write offset to use to append messages to @e out_buffer.
+   */
+  unsigned int out_wpos;
+
+  /**
+   * Next read offset to use to retrieve messages from @e out_buffer.
+   */
+  unsigned int out_rpos;
+
+  /**
+   * Is @e mqm currently ready for transmission?
+   */
+  int is_ready;
+
+};
+
 
 /**
  * Description of a segment of a `struct CadetConnection` at the
@@ -48,24 +102,14 @@ struct CadetRoute
 {
 
   /**
-   * Previous hop on this route.
+   * Information about the next hop on this route.
    */
-  struct CadetPeer *prev_hop;
+  struct RouteDirection next;
 
   /**
-   * Next hop on this route.
+   * Information about the previous hop on this route.
    */
-  struct CadetPeer *next_hop;
-
-  /**
-   * Message queue notifications for @e prev_hop.
-   */
-  struct GCP_MessageQueueManager *prev_mqm;
-
-  /**
-   * Message queue notifications for @e next_hop.
-   */
-  struct GCP_MessageQueueManager *next_mqm;
+  struct RouteDirection prev;
 
   /**
    * Unique identifier for the connection that uses this route.
@@ -77,12 +121,6 @@ struct CadetRoute
    */
   struct GNUNET_TIME_Absolute last_use;
 
-  /**
-   * Counter, used to verify that both MQs are up when the route is
-   * initialized.
-   */
-  unsigned int up;
-
 };
 
 
@@ -125,6 +163,8 @@ route_message (struct CadetPeer *prev,
                const struct GNUNET_MessageHeader *msg)
 {
   struct CadetRoute *route;
+  struct RouteDirection *dir;
+  struct GNUNET_MQ_Envelope *env;
 
   route = get_route (cid);
   if (NULL == route)
@@ -136,13 +176,33 @@ route_message (struct CadetPeer *prev,
                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
     bm->cid = *cid;
     bm->peer1 = my_full_id;
-    GCP_send (prev,
-              env);
+    GCP_send_ooo (prev,
+                  env);
     return;
   }
-  /* FIXME: support round-robin queue management here somewhere! */
-  GCP_send ((prev == route->prev_hop) ? route->next_hop : route->prev_hop,
-            GNUNET_MQ_msg_copy (msg));
+  dir = (prev == route->prev.hop) ? &route->next : &route->prev;
+  if (GNUNET_YES == dir->is_ready)
+  {
+    dir->is_ready = GNUNET_NO;
+    GCP_send (dir->mqm,
+              GNUNET_MQ_msg_copy (msg));
+    return;
+  }
+  env = dir->out_buffer[dir->out_wpos];
+  if (NULL != env)
+  {
+    /* Queue full, drop earliest message in queue */
+    GNUNET_assert (dir->out_rpos == dir->out_wpos);
+    GNUNET_MQ_discard (env);
+    dir->out_rpos++;
+    if (ROUTE_BUFFER_SIZE == dir->out_rpos)
+      dir->out_rpos = 0;
+  }
+  env = GNUNET_MQ_msg_copy (msg);
+  dir->out_buffer[dir->out_wpos] = env;
+  dir->out_wpos++;
+  if (ROUTE_BUFFER_SIZE == dir->out_wpos)
+    dir->out_wpos = 0;
 }
 
 
@@ -169,6 +229,29 @@ check_connection_create (void *cls,
 }
 
 
+/**
+ * Free internal data of a route direction.
+ *
+ * @param dir direction to destroy (do NOT free memory of 'dir' itself)
+ */
+static void
+destroy_direction (struct RouteDirection *dir)
+{
+  for (unsigned int i=0;i<ROUTE_BUFFER_SIZE;i++)
+    if (NULL != dir->out_buffer[i])
+    {
+      GNUNET_MQ_discard (dir->out_buffer[i]);
+      dir->out_buffer[i] = NULL;
+    }
+  if (NULL != dir->mqm)
+  {
+    GCP_request_mq_cancel (dir->mqm,
+                           NULL);
+    dir->mqm = NULL;
+  }
+}
+
+
 /**
  * Destroy our state for @a route.
  *
@@ -177,8 +260,8 @@ check_connection_create (void *cls,
 static void
 destroy_route (struct CadetRoute *route)
 {
-  GCP_request_mq_cancel (route->next_mqm);
-  GCP_request_mq_cancel (route->prev_mqm);
+  destroy_direction (&route->prev);
+  destroy_direction (&route->next);
   GNUNET_free (route);
 }
 
@@ -192,7 +275,7 @@ destroy_route (struct CadetRoute *route)
  * @param peer2 another one of the peers where a link is broken
  */
 static void
-send_broken (struct CadetPeer *target,
+send_broken (struct RouteDirection *target,
              const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
              const struct GNUNET_PeerIdentity *peer1,
              const struct GNUNET_PeerIdentity *peer2)
@@ -207,8 +290,9 @@ send_broken (struct CadetPeer *target,
     bm->peer1 = *peer1;
   if (NULL != peer2)
     bm->peer2 = *peer2;
-  GCP_send (target,
-            env);
+  GCP_request_mq_cancel (target->mqm,
+                         env);
+  target->mqm = NULL;
 }
 
 
@@ -218,53 +302,64 @@ send_broken (struct CadetPeer *target,
  * be called immediately when we register, and then again
  * later if the connection ever goes down.
  *
- * @param cls the `struct CadetRoute`
- * @param mq the message queue, NULL if connection went down
+ * @param cls the `struct RouteDirection`
+ * @param available #GNUNET_YES if sending is now possible,
+ *                  #GNUNET_NO if sending is no longer possible
+ *                  #GNUNET_SYSERR if sending is no longer possible
+ *                                 and the last envelope was discarded
  */
 static void
-mqm_cr_destroy_prev (void *cls,
-                     struct GNUNET_MQ_Handle *mq)
+dir_ready_cb (void *cls,
+              int ready)
 {
-  struct CadetRoute *route = cls;
+  struct RouteDirection *dir = cls;
+  struct CadetRoute *route = dir->my_route;
+  struct RouteDirection *odir;
 
-  if (NULL != mq)
+  if (GNUNET_YES == ready)
   {
-    route->up |= 1;
+    struct GNUNET_MQ_Envelope *env;
+
+    dir->is_ready = GNUNET_YES;
+    if (NULL != (env = dir->out_buffer[dir->out_rpos]))
+    {
+      dir->out_buffer[dir->out_rpos] = NULL;
+      dir->out_rpos++;
+      if (ROUTE_BUFFER_SIZE == dir->out_rpos)
+        dir->out_rpos = 0;
+      dir->is_ready = GNUNET_NO;
+      GCP_send (dir->mqm,
+                env);
+    }
     return;
   }
-  send_broken (route->next_hop,
+  odir = (dir == &route->next) ? &route->prev : &route->next;
+  send_broken (&route->next,
                &route->cid,
-               GCP_get_id (route->prev_hop),
+               GCP_get_id (odir->hop),
                &my_full_id);
   destroy_route (route);
 }
 
 
 /**
- * Function called when the message queue to the previous hop
- * becomes available/unavailable.  We expect this function to
- * be called immediately when we register, and then again
- * later if the connection ever goes down.
+ * Initialize one of the directions of a route.
  *
- * @param cls the `struct CadetRoute`
- * @param mq the message queue, NULL if connection went down
+ * @param route route the direction belongs to
+ * @param dir direction to initialize
+ * @param hop next hop on in the @a dir
  */
 static void
-mqm_cr_destroy_next (void *cls,
-                     struct GNUNET_MQ_Handle *mq)
+dir_init (struct RouteDirection *dir,
+          struct CadetRoute *route,
+          struct CadetPeer *hop)
 {
-  struct CadetRoute *route = cls;
-
-  if (NULL != mq)
-  {
-    route->up |= 2;
-    return;
-  }
-  send_broken (route->prev_hop,
-               &route->cid,
-               GCP_get_id (route->next_hop),
-               &my_full_id);
-  destroy_route (route);
+  dir->hop = hop;
+  dir->my_route = route;
+  dir->mqm = GCP_request_mq (hop,
+                             &dir_ready_cb,
+                             dir);
+  GNUNET_assert (GNUNET_YES == dir->is_ready);
 }
 
 
@@ -310,16 +405,28 @@ handle_connection_create (void *cls,
   if (NULL !=
       get_route (&msg->cid))
   {
-    /* CID not chosen at random, collides */
-    GNUNET_break_op (0);
+    /* Duplicate CREATE, pass it on, previous one might have been lost! */
+    route_message (sender,
+                   &msg->cid,
+                   &msg->header);
     return;
   }
   if (off == path_length - 1)
   {
     /* We are the destination, create connection */
+    struct CadetConnection *cc;
     struct CadetPeerPath *path;
     struct CadetPeer *origin;
 
+    cc = GNUNET_CONTAINER_multishortmap_get (connections,
+                                             &msg->cid.connection_of_tunnel);
+    if (NULL != cc)
+    {
+      /* Duplicate CREATE, likely our ACK got lost, retransmit the ACK! */
+      GNUNET_break (0); // FIXME: not implemented!
+      return;
+    }
+
     path = GCPP_get_path_from_route (path_length,
                                      pids);
     origin = GCP_get (&pids[0],
@@ -327,35 +434,37 @@ handle_connection_create (void *cls,
     GCT_add_inbound_connection (GCT_create_tunnel (origin),
                                 &msg->cid,
                                 path);
-
     return;
   }
   /* We are merely a hop on the way, check if we can support the route */
   next = GCP_get (&pids[off + 1],
                   GNUNET_NO);
   if ( (NULL == next) ||
-       (NULL == GCP_get_mq (next)) )
+       (GNUNET_NO == GCP_has_core_connection (next)) )
   {
     /* unworkable, send back BROKEN notification */
-    send_broken (sender,
-                 &msg->cid,
-                 &pids[off + 1],
-                 &my_full_id);
+    struct GNUNET_MQ_Envelope *env;
+    struct GNUNET_CADET_ConnectionBrokenMessage *bm;
+
+    env = GNUNET_MQ_msg (bm,
+                         GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+    bm->cid = msg->cid;
+    bm->peer1 = pids[off + 1];
+    bm->peer2 = my_full_id;
+    GCP_send_ooo (sender,
+                  env);
     return;
   }
 
   /* Workable route, create routing entry */
   route = GNUNET_new (struct CadetRoute);
   route->cid = msg->cid;
-  route->prev_mqm = GCP_request_mq (sender,
-                                    &mqm_cr_destroy_prev,
-                                    route);
-  route->next_mqm = GCP_request_mq (next,
-                                    &mqm_cr_destroy_next,
-                                    route);
-  route->prev_hop = sender;
-  route->next_hop = next;
-  GNUNET_assert ((1|2) == route->up);
+  dir_init (&route->prev,
+            route,
+            sender);
+  dir_init (&route->next,
+            route,
+            next);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multishortmap_put (routes,
                                                      &route->cid.connection_of_tunnel,
@@ -371,8 +480,8 @@ handle_connection_create (void *cls,
  * @param msg Message itself.
  */
 static void
-handle_connection_ack (void *cls,
-                       const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg)
+handle_connection_create_ack (void *cls,
+                              const struct GNUNET_CADET_ConnectionCreateMessageAckMessage *msg)
 {
   struct CadetPeer *peer = cls;
   struct CadetConnection *cc;
@@ -734,7 +843,7 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
                            GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
                            struct GNUNET_CADET_ConnectionCreateMessage,
                            NULL),
-    GNUNET_MQ_hd_fixed_size (connection_ack,
+    GNUNET_MQ_hd_fixed_size (connection_create_ack,
                              GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
                              struct GNUNET_CADET_ConnectionCreateMessageAckMessage,
                              NULL),
index 3f6edef3993d3ece8aadb50476eda125233e5098..3cfce337c692adc9f555f844522f174d2b5483cb 100644 (file)
@@ -525,4 +525,33 @@ GCPP_get_peer_at_offset (struct CadetPeerPath *path,
 }
 
 
+/**
+ * Convert a path to a human-readable string.
+ *
+ * @param path path to convert
+ * @return string, to be freed by caller (unlike other *_2s APIs!)
+ */
+char *
+GCPP_2s (struct CadetPeerPath *path)
+{
+  char *s;
+  char *old;
+
+  old = GNUNET_strdup ("");
+  for (unsigned int i = 0;
+       i < path->entries_length;
+       i++)
+  {
+    GNUNET_asprintf (&s,
+                     "%s %s",
+                     old,
+                     GCP_2s (GCPP_get_peer_at_offset (path,
+                                                      i)));
+    GNUNET_free_non_null (old);
+    old = s;
+  }
+  return old;
+}
+
+
 /* end of gnunet-service-cadet-new_paths.c */
index 6a864e8ecf14131f005c08dcb8a283d4c3a51d39..5714368c7d80c8222eadcfe0c343902e4dae6d24 100644 (file)
@@ -169,4 +169,14 @@ GCPP_get_peer_at_offset (struct CadetPeerPath *path,
                          unsigned int off);
 
 
+/**
+ * Convert a path to a human-readable string.
+ *
+ * @param path path to convert
+ * @return string, to be freed by caller (unlike other *_2s APIs!)
+ */
+char *
+GCPP_2s (struct CadetPeerPath *p);
+
+
 #endif
index c57622181ec751621a9f53296ca553d89e65db61..5b978ff7757b7941ba0d5135374a73ab88d6cb58 100644 (file)
@@ -86,6 +86,11 @@ struct GCP_MessageQueueManager
    */
   struct CadetPeer *cp;
 
+  /**
+   * Envelope this manager would like to transmit once it is its turn.
+   */
+  struct GNUNET_MQ_Envelope *env;
+
 };
 
 
@@ -188,6 +193,11 @@ struct CadetPeer
    */
   unsigned int num_paths;
 
+  /**
+   * Number of message queue managers of this peer that have a message in waiting.
+   */
+  unsigned int mqm_ready_counter;
+
   /**
    * Current length of the @e path_heads and @path_tails arrays.
    * The arrays should be grown as needed.
@@ -265,50 +275,121 @@ destroy_peer (void *cls)
 
 
 /**
- * Get the message queue for peer @a cp.
+ * Set the message queue to @a mq for peer @a cp and notify watchers.
  *
  * @param cp peer to modify
- * @return message queue (can be NULL)
+ * @param mq message queue to set (can be NULL)
  */
-struct GNUNET_MQ_Handle *
-GCP_get_mq (struct CadetPeer *cp)
+void
+GCP_set_mq (struct CadetPeer *cp,
+            struct GNUNET_MQ_Handle *mq)
 {
-  return cp->core_mq;
+  cp->core_mq = mq;
+
+  for (struct GCP_MessageQueueManager *mqm = cp->mqm_head;
+       NULL != mqm;
+       mqm = mqm->next)
+  {
+    if (NULL == mq)
+    {
+      if (NULL != mqm->env)
+      {
+        GNUNET_MQ_discard (mqm->env);
+        mqm->env = NULL;
+        mqm->cb (mqm->cb_cls,
+                 GNUNET_SYSERR);
+      }
+      else
+      {
+        mqm->cb (mqm->cb_cls,
+                 GNUNET_NO);
+      }
+    }
+    else
+    {
+      GNUNET_assert (NULL == mqm->env);
+      mqm->cb (mqm->cb_cls,
+               GNUNET_YES);
+    }
+  }
 }
 
 
 /**
- * Set the message queue to @a mq for peer @a cp and notify watchers.
+ * Transmit current envelope from this @a mqm.
  *
- * @param cp peer to modify
- * @param mq message queue to set (can be NULL)
+ * @param mqm mqm to transmit message for now
  */
-void
-GCP_set_mq (struct CadetPeer *cp,
-            struct GNUNET_MQ_Handle *mq)
+static void
+mqm_execute (struct GCP_MessageQueueManager *mqm)
 {
-  cp->core_mq = mq;
+  struct CadetPeer *cp = mqm->cp;
+
+  /* Move entry to the end of the DLL, to be fair. */
+  if (mqm != cp->mqm_tail)
+  {
+    GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
+                                 cp->mqm_tail,
+                                 mqm);
+    GNUNET_CONTAINER_DLL_insert_tail (cp->mqm_head,
+                                      cp->mqm_tail,
+                                      mqm);
+  }
+  GNUNET_MQ_send (cp->core_mq,
+                  mqm->env);
+  mqm->env = NULL;
+  cp->mqm_ready_counter--;
+}
+
+
+/**
+ * Function called when CORE took one of the messages from
+ * a message queue manager and transmitted it.
+ *
+ * @param cls the `struct CadetPeeer` where we made progress
+ */
+static void
+mqm_send_done (void *cls)
+{
+  struct CadetPeer *cp = cls;
+
+  if (0 == cp->mqm_ready_counter)
+    return; /* nothing to do */
   for (struct GCP_MessageQueueManager *mqm = cp->mqm_head;
        NULL != mqm;
        mqm = mqm->next)
-    mqm->cb (mqm->cb_cls,
-             mq);
+  {
+    if (NULL == mqm->env)
+      continue;
+    mqm_execute (mqm);
+    return;
+  }
 }
 
 
 /**
  * Send the message in @a env to @a cp.
  *
- * @param cp the peer
- * @param env envelope with the message to send
+ * @param mqm the message queue manager to use for transmission
+ * @param env envelope with the message to send; must NOT
+ *            yet have a #GNUNET_MQ_notify_sent() callback attached to it
  */
 void
-GCP_send (struct CadetPeer *cp,
+GCP_send (struct GCP_MessageQueueManager *mqm,
           struct GNUNET_MQ_Envelope *env)
 {
+  struct CadetPeer *cp = mqm->cp;
+
   GNUNET_assert (NULL != cp->core_mq);
-  GNUNET_MQ_send (cp->core_mq,
-                  env);
+  GNUNET_assert (NULL == mqm->env);
+  GNUNET_MQ_notify_sent (env,
+                         &mqm_send_done,
+                         cp);
+  mqm->env = env;
+  cp->mqm_ready_counter++;
+  if (0 != GNUNET_MQ_get_length (cp->core_mq))
+    return;
+  mqm_execute (mqm);
 }
 
 
@@ -864,6 +945,19 @@ GCP_drop_tunnel (struct CadetPeer *peer,
 }
 
 
+/**
+ * Test if @a cp has a core-level connection
+ *
+ * @param cp peer to test
+ * @return #GNUNET_YES if @a cp has a core-level connection
+ */
+int
+GCP_has_core_connection (struct CadetPeer *cp)
+{
+  return (NULL != cp->core_mq) ? GNUNET_YES : GNUNET_NO;
+}
+
+
 /**
  * Start message queue change notifications.
  *
@@ -888,7 +982,7 @@ GCP_request_mq (struct CadetPeer *cp,
                                mqm);
   if (NULL != cp->core_mq)
     cb (cb_cls,
-        cp->core_mq);
+        GNUNET_YES);
   return mqm;
 }
 
@@ -897,12 +991,24 @@ GCP_request_mq (struct CadetPeer *cp,
  * Stops message queue change notifications.
  *
  * @param mqm handle matching request to cancel
+ * @param last_env final message to transmit, or NULL
  */
 void
-GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm)
+GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
+                       struct GNUNET_MQ_Envelope *last_env)
 {
   struct CadetPeer *cp = mqm->cp;
 
+  if (NULL != mqm->env)
+    GNUNET_MQ_discard (mqm->env);
+  if (NULL != last_env)
+  {
+    if (NULL != cp->core_mq)
+      GNUNET_MQ_send (cp->core_mq,
+                      last_env);
+    else
+      GNUNET_MQ_discard (last_env);
+  }
   GNUNET_CONTAINER_DLL_remove (cp->mqm_head,
                                cp->mqm_tail,
                                mqm);
@@ -910,5 +1016,29 @@ GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm)
 }
 
 
+/**
+ * Send the message in @a env to @a cp, overriding queueing logic.
+ * This function should only be used to send error messages outside
+ * of flow and congestion control, similar to ICMP.  Note that
+ * the envelope may be silently discarded as well.
+ *
+ * @param cp peer to send the message to
+ * @param env envelope with the message to send
+ */
+void
+GCP_send_ooo (struct CadetPeer *cp,
+              struct GNUNET_MQ_Envelope *env)
+{
+  if (NULL == cp->core_mq)
+  {
+    GNUNET_MQ_discard (env);
+    return;
+  }
+  GNUNET_MQ_send (cp->core_mq,
+                  env);
+}
+
+
+
 
 /* end of gnunet-service-cadet-new_peer.c */
index 6b4ee1b5669f4e27663f427492b0e7ce6e4e5143..c633f47e5d28e75d5512126b7416e1fbd73a92e9 100644 (file)
@@ -262,7 +262,12 @@ GCP_destroy_all_peers (void);
 
 /**
  * Data structure used to track whom we have to notify about changes
- * to our message queue.
+ * in our ability to transmit to a given peer.
+ *
+ * All queue managers will be given equal chance for sending messages
+ * to @a cp.  This construct this guarantees fairness for access to @a
+ * cp among the different message queues.  Each connection or route
+ * will have its respective message queue managers for each direction.
  */
 struct GCP_MessageQueueManager;
 
@@ -271,15 +276,19 @@ struct GCP_MessageQueueManager;
  * Function to call with updated message queue object.
  *
  * @param cls closure
- * @param mq NULL if MQ is gone, otherwise an active message queue
+ * @param available #GNUNET_YES if sending is now possible,
+ *                  #GNUNET_NO if sending is no longer possible
+ *                  #GNUNET_SYSERR if sending is no longer possible
+ *                                 and the last envelope was discarded
  */
 typedef void
 (*GCP_MessageQueueNotificationCallback)(void *cls,
-                                        struct GNUNET_MQ_Handle *mq);
+                                        int available);
 
 
 /**
- * Start message queue change notifications.
+ * Start message queue change notifications.  Will create a new slot
+ * to manage the message queue to the given @a cp.
  *
  * @param cp peer to notify for
  * @param cb function to call if mq becomes available or unavailable
@@ -293,44 +302,67 @@ GCP_request_mq (struct CadetPeer *cp,
 
 
 /**
- * Stops message queue change notifications.
+ * Test if @a cp has a core-level connection
  *
- * @param mqm handle matching request to cancel
+ * @param cp peer to test
+ * @return #GNUNET_YES if @a cp has a core-level connection
+ */
+int
+GCP_has_core_connection (struct CadetPeer *cp);
+
+
+/**
+ * Send the message in @a env via a @a mqm.  Must only be called at
+ * most once after the respective
+ * #GCP_MessageQueueNotificationCallback was called with `available`
+ * set to #GNUNET_YES, and not after the callback was called with
+ * `available` set to #GNUNET_NO or #GNUNET_SYSERR.
+ *
+ * @param mqm message queue manager for the transmission
+ * @param env envelope with the message to send; must NOT
+ *            yet have a #GNUNET_MQ_notify_sent() callback attached to it
  */
 void
-GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm);
+GCP_send (struct GCP_MessageQueueManager *mqm,
+          struct GNUNET_MQ_Envelope *env);
 
 
 /**
- * Set the message queue to @a mq for peer @a cp and notify watchers.
+ * Send the message in @a env to @a cp, overriding queueing logic.
+ * This function should only be used to send error messages outside
+ * of flow and congestion control, similar to ICMP.  Note that
+ * the envelope may be silently discarded as well.
  *
- * @param cp peer to modify
- * @param mq message queue to set (can be NULL)
+ * @param cp peer to send the message to
+ * @param env envelope with the message to send
  */
 void
-GCP_set_mq (struct CadetPeer *cp,
-            struct GNUNET_MQ_Handle *mq);
+GCP_send_ooo (struct CadetPeer *cp,
+              struct GNUNET_MQ_Envelope *env);
 
 
 /**
- * Get the message queue for peer @a cp.
+ * Stops message queue change notifications and sends a last message.
+ * In practice, this is implemented by sending that @a last_env
+ * message immediately (if any), ignoring queue order.
  *
- * @param cp peer to modify
- * @return message queue (can be NULL)
+ * @param mqm handle matching request to cancel
+ * @param last_env final message to transmit, or NULL
  */
-struct GNUNET_MQ_Handle *
-GCP_get_mq (struct CadetPeer *cp);
+void
+GCP_request_mq_cancel (struct GCP_MessageQueueManager *mqm,
+                       struct GNUNET_MQ_Envelope *last_env);
 
 
 /**
- * Send the message in @a env to @a cp.
+ * Set the message queue to @a mq for peer @a cp and notify watchers.
  *
- * @param cp the peer
- * @param env envelope with the message to send
+ * @param cp peer to modify
+ * @param mq message queue to set (can be NULL)
  */
 void
-GCP_send (struct CadetPeer *cp,
-          struct GNUNET_MQ_Envelope *env);
+GCP_set_mq (struct CadetPeer *cp,
+            struct GNUNET_MQ_Handle *mq);
 
 
 #endif
index 9161c41f7d0686aedaf21e176dcac33a6fbec61f..23b270b820afce363b7ab73b873aa70c430210b4 100644 (file)
@@ -226,43 +226,6 @@ struct CadetTunnelAxolotl
 };
 
 
-/**
- * Entry in list of connections used by tunnel, with metadata.
- */
-struct CadetTConnection
-{
-  /**
-   * Next in DLL.
-   */
-  struct CadetTConnection *next;
-
-  /**
-   * Prev in DLL.
-   */
-  struct CadetTConnection *prev;
-
-  /**
-   * Connection handle.
-   */
-  struct CadetConnection *cc;
-
-  /**
-   * Tunnel this connection belongs to.
-   */
-  struct CadetTunnel *t;
-
-  /**
-   * Creation time, to keep oldest connection alive.
-   */
-  struct GNUNET_TIME_Absolute created;
-
-  /**
-   * Connection throughput, to keep fastest connection alive.
-   */
-  uint32_t throughput;
-};
-
-
 /**
  * Struct used to save messages in a non-ready tunnel to send once connected.
  */
@@ -1418,18 +1381,27 @@ destroy_tunnel (void *cls)
 
 
 /**
- * A connection is ready for transmission.  Looks at our message queue
- * and if there is a message, sends it out via the connection.
+ * A connection is @a is_ready for transmission.  Looks at our message
+ * queue and if there is a message, sends it out via the connection.
  *
- * @param cls the `struct CadetTConnection` that is ready
+ * @param cls the `struct CadetTConnection` that is @a is_ready
+ * @param is_ready #GNUNET_YES if connection are now ready,
+ *                 #GNUNET_NO if connection are no longer ready
  */
 static void
-connection_ready_cb (void *cls)
+connection_ready_cb (void *cls,
+                     int is_ready)
 {
   struct CadetTConnection *ct = cls;
   struct CadetTunnel *t = ct->t;
   struct CadetTunnelQueueEntry *tq = t->tq_head;
 
+  if (GNUNET_NO == ct->is_ready)
+  {
+    ct->is_ready = GNUNET_NO;
+    return;
+  }
+  ct->is_ready = GNUNET_YES;
   if (NULL == tq)
     return; /* no messages pending right now */
 
@@ -1440,6 +1412,7 @@ connection_ready_cb (void *cls)
                                tq);
   if (NULL != tq->cid)
     *tq->cid = *GCC_get_id (ct->cc);
+  ct->is_ready = GNUNET_NO;
   GCC_transmit (ct->cc,
                 tq->env);
   tq->cont (tq->cont_cls);
@@ -1453,6 +1426,8 @@ connection_ready_cb (void *cls)
  * at our message queue and if there is a message, picks a connection
  * to send it on.
  *
+ * FIXME: yuck... Need better selection logic!
+ *
  * @param t tunnel to process messages on
  */
 static void
@@ -1465,11 +1440,14 @@ trigger_transmissions (struct CadetTunnel *t)
   for (ct = t->connection_head;
        NULL != ct;
        ct = ct->next)
-    if (GNUNET_YES == GCC_is_ready (ct->cc))
+    if (GNUNET_YES == ct->is_ready)
       break;
   if (NULL == ct)
     return; /* no connections ready */
-  connection_ready_cb (ct);
+
+  /* FIXME: a bit hackish to do it like this... */
+  connection_ready_cb (ct,
+                       GNUNET_YES);
 }
 
 
@@ -1567,7 +1545,7 @@ consider_path_cb (void *cls,
                        path,
                        ct,
                        &connection_ready_cb,
-                       t);
+                       ct);
   /* FIXME: schedule job to kill connection (and path?)  if it takes
      too long to get ready! (And track performance data on how long
      other connections took with the tunnel!)