working on channel passing data to clients
authorChristian Grothoff <christian@grothoff.org>
Fri, 20 Jan 2017 19:39:51 +0000 (20:39 +0100)
committerChristian Grothoff <christian@grothoff.org>
Fri, 20 Jan 2017 19:39:51 +0000 (20:39 +0100)
src/cadet/cadet.h
src/cadet/cadet_api.c
src/cadet/gnunet-service-cadet-new.c
src/cadet/gnunet-service-cadet-new.h
src/cadet/gnunet-service-cadet-new_channel.c
src/cadet/gnunet-service-cadet_local.c

index c16fb29174891c5c7ecbe9128924196f098a544a..9d154fb99a5e8e813fa6226233f7691abbe824fe 100644 (file)
@@ -198,7 +198,7 @@ struct GNUNET_CADET_LocalData
   /**
    * ID of the channel
    */
-  struct GNUNET_CADET_ClientChannelNumber id;
+  struct GNUNET_CADET_ClientChannelNumber channel_id;
 
   /**
    * Payload follows
index 8f1274d6332fc7a9c6878782385ad83ce4f30042..5dcf43e46027b8896a897d33368aad252777ba03 100644 (file)
@@ -569,7 +569,7 @@ request_data (void *cls)
   env = GNUNET_MQ_msg_extra (msg,
                              th->size,
                              GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
-  msg->id = th->channel->chid;
+  msg->channel_id = th->channel->chid;
   osize = th->notify (th->notify_cls,
                       th->size,
                       &msg[1]);
@@ -697,7 +697,7 @@ check_local_data (void *cls,
   }
 
   ch = retrieve_channel (h,
-                         message->id);
+                         message->channel_id);
   if (NULL == ch)
   {
     GNUNET_break_op (0);
@@ -727,7 +727,7 @@ handle_local_data (void *cls,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Got a data message!\n");
-  ch = retrieve_channel (h, message->id);
+  ch = retrieve_channel (h, message->channel_id);
   GNUNET_assert (NULL != ch);
 
   payload = (struct GNUNET_MessageHeader *) &message[1];
@@ -735,7 +735,7 @@ handle_local_data (void *cls,
        GC_f2s (ntohl (ch->chid.channel_of_client) >=
                GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
        GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
-       ntohl (message->id.channel_of_client));
+       ntohl (message->channel_id.channel_of_client));
 
   type = ntohs (payload->type);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  payload type %s\n", GC_m2s (type));
index 7b4a0e95b28a7574ce1c3de209add1db8ee6d8c1..7801708c194d8616c8b40fa2d97cf18de28af64d 100644 (file)
@@ -628,7 +628,7 @@ handle_data (void *cls,
   struct CadetChannel *ch;
   const struct GNUNET_MessageHeader *payload;
 
-  chid = msg->id;
+  chid = msg->channel_id;
   map = get_map_by_chid (c,
                          chid);
   ch = GNUNET_CONTAINER_multihashmap32_get (map,
index 9f4667e23e4e5bb3120a11a0578b6b581f1d606e..b3bb85d85eb1beae155697275396153edb834dcf 100644 (file)
@@ -220,7 +220,6 @@ extern unsigned long long ratchet_messages;
 extern struct GNUNET_TIME_Relative ratchet_time;
 
 
-
 /**
  * Send a message to a client.
  *
index 5d2eba61808bd5f2269d14cb3f156a21f8dac4e3..75ec81992f2b1e4db3d92665bb6df716c648aa55 100644 (file)
  * @author Christian Grothoff
  *
  * TODO:
- * - handle CREATE_ACK
- * - handle plaintext data
- * - handle plaintext ACK
  * - handle destroy
  * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
  * - check that '0xFFULL' really is sufficient for flow control!
- * - what about the 'no buffer' option?
- * - what about the 'out-of-order' option?
+ * - revisit handling of 'unreliable' traffic!
+ * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
+ * - figure out flow control without ACKs (unreliable traffic!)
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
@@ -147,7 +145,8 @@ struct CadetOutOfOrderMessage
   struct CadetOutOfOrderMessage *prev;
 
   /**
-   * ID of the message (ACK needed to free)
+   * ID of the message (messages up to this point needed
+   * before we give this one to the client).
    */
   struct ChannelMessageIdentifier mid;
 
@@ -311,7 +310,6 @@ struct CadetChannel
 };
 
 
-
 /**
  * Get the static string for identification of the channel.
  *
@@ -480,8 +478,10 @@ GCCH_channel_local_new (struct CadetClient *owner,
   struct CadetChannel *ch;
 
   ch = GNUNET_new (struct CadetChannel);
-  ch->max_pending_messages = 32; /* FIXME: allow control via options
-                                    or adjust dynamically... */
+  ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
+  ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
+  ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
+  ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
   ch->owner = owner;
   ch->lid = owner_id;
   ch->port = *port;
@@ -490,9 +490,6 @@ GCCH_channel_local_new (struct CadetClient *owner,
   ch->chid = GCT_add_channel (ch->t,
                               ch);
   ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
-  ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
-  ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
-  ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create,
                                              ch);
   GNUNET_STATISTICS_update (stats,
@@ -538,8 +535,6 @@ GCCH_channel_incoming_new (struct CadetTunnel *t,
   struct CadetClient *c;
 
   ch = GNUNET_new (struct CadetChannel);
-  ch->max_pending_messages = 32; /* FIXME: allow control via options
-                                    or adjust dynamically... */
   ch->port = *port;
   ch->t = t;
   ch->chid = chid;
@@ -547,6 +542,7 @@ GCCH_channel_incoming_new (struct CadetTunnel *t,
   ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
   ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
   ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
+  ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
   GNUNET_STATISTICS_update (stats,
                             "# channels",
                             1,
@@ -634,6 +630,27 @@ send_connect_ack (void *cls)
 }
 
 
+/**
+ * Send a LOCAL ACK to the client to solicit more messages.
+ *
+ * @param ch channel the ack is for
+ * @param c client to send the ACK to
+ */
+static void
+send_ack_to_client (struct CadetChannel *ch,
+                    struct CadetClient *c)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_LocalAck *ack;
+
+  env = GNUNET_MQ_msg (ack,
+                       GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
+  ack->channel_id = ch->lid;
+  GSC_send_to_client (c,
+                      env);
+}
+
+
 /**
  * A client is bound to the port that we have a channel
  * open to.  Send the acknowledgement for the connection
@@ -672,6 +689,10 @@ GCCH_bind (struct CadetChannel *ch,
   /* notify other peer that we accepted the connection */
   ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
                                              ch);
+  /* give client it's initial supply of ACKs */
+  for (unsigned int i=0;i<ch->max_pending_messages;i++)
+    send_ack_to_client (ch,
+                        ch->owner);
 }
 
 
@@ -742,12 +763,75 @@ GCCH_channel_incoming_destroy (struct CadetChannel *ch)
 void
 GCCH_handle_channel_create_ack (struct CadetChannel *ch)
 {
-  GNUNET_break (0); // FIXME!
+  switch (ch->state)
+  {
+  case CADET_CHANNEL_NEW:
+    /* this should be impossible */
+    GNUNET_break (0);
+    break;
+  case CADET_CHANNEL_CREATE_SENT:
+    if (NULL == ch->owner)
+    {
+      /* We're not the owner, wrong direction! */
+      GNUNET_break_op (0);
+      return;
+    }
+    ch->state = CADET_CHANNEL_READY;
+    /* On first connect, send client as many ACKs as we allow messages
+       to be buffered! */
+    for (unsigned int i=0;i<ch->max_pending_messages;i++)
+      send_ack_to_client (ch,
+                          ch->owner);
+    break;
+  case CADET_CHANNEL_READY:
+    /* duplicate ACK, maybe we retried the CREATE. Ignore. */
+    GNUNET_STATISTICS_update (stats,
+                              "# duplicate CREATE_ACKs",
+                              1,
+                              GNUNET_NO);
+    break;
+  }
 }
 
 
 /**
- * We got payload data for a channel.  Pass it on to the client.
+ * Test if element @a e1 comes before element @a e2.
+ *
+ * TODO: use opportunity to create generic list insertion sort
+ * logic in container!
+ *
+ * @param cls closure, our `struct CadetChannel`
+ * @param e1 an element of to sort
+ * @param e2 another element to sort
+ * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
+ */
+static int
+is_before (void *cls,
+           void *e1,
+           void *e2)
+{
+  struct CadetOutOfOrderMessage *m1 = e1;
+  struct CadetOutOfOrderMessage *m2 = e2;
+  uint32_t v1 = ntohl (m1->mid.mid);
+  uint32_t v2 = ntohl (m2->mid.mid);
+  uint32_t delta;
+
+  delta = v1 - v2;
+  if (delta > (uint32_t) INT_MAX)
+  {
+    /* in overflow range, we can safely assume we wrapped around */
+    return GNUNET_NO;
+  }
+  else
+  {
+    return GNUNET_YES;
+  }
+}
+
+
+/**
+ * We got payload data for a channel.  Pass it on to the client
+ * and send an ACK to the other end (once flow control allows it!)
  *
  * @param ch channel that got data
  */
@@ -755,7 +839,70 @@ void
 GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
                                     const struct GNUNET_CADET_ChannelAppDataMessage *msg)
 {
-  GNUNET_break (0); // FIXME!
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_LocalData *ld;
+  struct CadetOutOfOrderMessage *com;
+  size_t payload_size;
+
+  payload_size = ntohs (msg->header.size) - sizeof (*msg);
+  env = GNUNET_MQ_msg_extra (ld,
+                             payload_size,
+                             GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
+  ld->channel_id = ch->lid;
+  GNUNET_memcpy (&ld[1],
+                 &msg[1],
+                 payload_size);
+  if ( (GNUNET_YES == ch->client_ready) &&
+       ( (GNUNET_YES == ch->out_of_order) ||
+         (msg->mid.mid == ch->mid_recv.mid) ) )
+  {
+    GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
+                        env);
+    ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
+    ch->mid_futures >>= 1;
+  }
+  else
+  {
+    /* FIXME-SECURITY: if the element is WAY too far ahead,
+       drop it (can't buffer too much!) */
+    com = GNUNET_new (struct CadetOutOfOrderMessage);
+    com->mid = msg->mid;
+    com->env = env;
+    /* sort into list ordered by "is_before" */
+    if ( (NULL == ch->head_recv) ||
+         (GNUNET_YES == is_before (ch,
+                                   com,
+                                   ch->head_recv)) )
+    {
+      GNUNET_CONTAINER_DLL_insert (ch->head_recv,
+                                   ch->tail_recv,
+                                   com);
+    }
+    else
+    {
+      struct CadetOutOfOrderMessage *pos;
+
+      for (pos = ch->head_recv;
+           NULL != pos;
+           pos = pos->next)
+      {
+        if (GNUNET_YES !=
+            is_before (ch,
+                       pos,
+                       com))
+          break;
+      }
+      if (NULL == pos)
+        GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
+                                          ch->tail_recv,
+                                          com);
+      else
+        GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
+                                           ch->tail_recv,
+                                           com,
+                                           pos->prev);
+    }
+  }
 }
 
 
@@ -770,7 +917,37 @@ void
 GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
                                         const struct GNUNET_CADET_ChannelDataAckMessage *ack)
 {
-  GNUNET_break (0); // FIXME!
+  struct CadetReliableMessage *crm;
+
+  if (GNUNET_NO == ch->reliable)
+  {
+    /* not expecting ACKs on unreliable channel, odd */
+    GNUNET_break_op (0);
+    return;
+  }
+  for (crm = ch->head_sent;
+        NULL != crm;
+       crm = crm->next)
+    if (ack->mid.mid == crm->data_message.mid.mid)
+      break;
+  if (NULL == crm)
+  {
+    /* ACK for message we already dropped, might have been a
+       duplicate ACK? Ignore. */
+    GNUNET_STATISTICS_update (stats,
+                              "# duplicate CHANNEL_DATA_ACKs",
+                              1,
+                              GNUNET_NO);
+    return;
+  }
+  GNUNET_CONTAINER_DLL_remove (ch->head_sent,
+                               ch->tail_sent,
+                               crm);
+  ch->pending_messages--;
+  GNUNET_free (crm);
+  GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
+  send_ack_to_client (ch,
+                      (NULL == ch->owner) ? ch->dest : ch->owner);
 }
 
 
@@ -1026,6 +1203,8 @@ send_client_buffered_data (struct CadetChannel *ch)
   GNUNET_CONTAINER_DLL_remove (ch->head_recv,
                                ch->tail_recv,
                                com);
+  /* FIXME: if unreliable, this is not aggressive
+     enough, as it would be OK to have lost some! */
   ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
   ch->mid_futures >>= 1; /* equivalent to division by 2 */
   GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
index e1f6ac4c37db39414a05e29c1149c4bf1d82f574..c476f6ac269d33a58cc75b87de9cb8213143f03c 100644 (file)
@@ -586,7 +586,7 @@ handle_data (void *cls, struct GNUNET_SERVER_Client *client,
     return;
   }
 
-  chid = msg->id;
+  chid = msg->channel_id;
   LOG (GNUNET_ERROR_TYPE_DEBUG, "  %u bytes (%u payload) by client %u\n",
        payload_size, payload_claimed_size, c->id);
 
@@ -1531,7 +1531,7 @@ GML_send_data (struct CadetClient *c,
   GNUNET_memcpy (&copy[1], &msg[1], size);
   copy->header.size = htons (sizeof (struct GNUNET_CADET_LocalData) + size);
   copy->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
-  copy->id = id;
+  copy->channel_id = id;
   GNUNET_SERVER_notification_context_unicast (nc, c->handle,
                                               &copy->header, GNUNET_NO);
 }