add basic handling of fragment acks
authorChristian Grothoff <christian@grothoff.org>
Mon, 15 Apr 2019 20:42:26 +0000 (22:42 +0200)
committerChristian Grothoff <christian@grothoff.org>
Mon, 15 Apr 2019 20:42:26 +0000 (22:42 +0200)
src/transport/gnunet-service-tng.c

index 568e5b1d7449429116d0fcafee265b9a6dab6485..6a8a3fc4d574b1fceaeb69291c8a305ef85d4343 100644 (file)
@@ -34,8 +34,6 @@
  *
  * Implement next:
  * - DV data structures:
- *   + initiation of DV learn (incl. RTT measurement logic!)
- *     - security considerations? add signatures to routes? initiator signature?
  *   + using DV routes!
  *     - handling of DV-boxed messages that need to be forwarded
  *     - route_message implementation, including using DV data structures
 #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
 
 /**
- * We only consider queues as "quality" connections when      
- * suppressing the generation of DV initiation messages if 
+ * We only consider queues as "quality" connections when
+ * suppressing the generation of DV initiation messages if
  * the latency of the queue is below this threshold.
  */
 #define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
  * do we need to have to suppress initiating DV learn messages?
  */
 #define DV_LEARN_QUALITY_THRESHOLD 100
-  
+
 /**
  * When do we forget an invalid address for sure?
  */
@@ -817,7 +815,7 @@ enum ClientType
 
 
 /**
- * When did we launch this DV learning activity? 
+ * When did we launch this DV learning activity?
  */
 struct LearnLaunchEntry
 {
@@ -3737,6 +3735,49 @@ handle_fragment_box (void *cls,
 }
 
 
+/**
+ * Check the @a fa against the fragments associated with @a pm.
+ * If it matches, remove the matching fragments from the transmission
+ * list.
+ *
+ * @param pm pending message to check against the ack
+ * @param fa the ack that was received
+ * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
+ */
+static int
+check_ack_against_pm (struct PendingMessage *pm,
+                      const struct TransportFragmentAckMessage *fa)
+{
+  int match;
+  struct PendingMessage *nxt;
+  uint32_t fs = ntohl (fa->frag_uuid);
+  uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
+
+  match = GNUNET_NO;
+  for (struct PendingMessage *frag = pm->head_frag;
+       NULL != frag;
+       frag = nxt)
+  {
+    const struct TransportFragmentBox *tfb
+      = (const struct TransportFragmentBox *) &pm[1];
+    uint32_t fu = ntohl (tfb->frag_uuid);
+
+    GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
+    nxt = frag->next_frag;
+    /* Check for exact match or match in the 'xtra' bitmask */
+    if ( (fu == fs) ||
+         ( (fu > fs) &&
+           (fu <= fs + 64) &&
+           (0 != (1LLU << (fu - fs - 1) & xtra)) ) )
+    {
+      match = GNUNET_YES;
+      free_fragment_tree (frag);
+    }
+  }
+  return match;
+}
+
+
 /**
  * Communicator gave us a fragment acknowledgement.  Process the request.
  *
@@ -3748,11 +3789,76 @@ handle_fragment_ack (void *cls,
                      const struct TransportFragmentAckMessage *fa)
 {
   struct CommunicatorMessageContext *cmc = cls;
+  struct Neighbour *n;
+  int matched;
 
-  // FIXME: do work: identify original message; then identify fragments being acked;
-  // remove those from the tree to prevent retransmission;
-  // compute RTT
-  // if entire message is ACKed, handle that as well.
+  n = GNUNET_CONTAINER_multipeermap_get (neighbours,
+                                         &cmc->im.sender);
+  if (NULL == n)
+  {
+    struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+
+    GNUNET_break (0);
+    finish_cmc_handling (cmc);
+    GNUNET_SERVICE_client_drop (client);
+    return;
+  }
+  /* FIXME-OPTIMIZE: maybe use another hash map here? */
+  matched = GNUNET_NO;
+  for (struct PendingMessage *pm = n->pending_msg_head;
+       NULL != pm;
+       pm = pm->prev_neighbour)
+  {
+    if (0 !=
+        GNUNET_memcmp (&fa->msg_uuid,
+                       &pm->msg_uuid))
+      continue;
+    matched = GNUNET_YES;
+    if (GNUNET_YES ==
+        check_ack_against_pm (pm,
+                              fa))
+    {
+      struct GNUNET_TIME_Relative avg_ack_delay
+        = GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
+      // FIXME: update RTT and other reliability data!
+      // ISSUE: we don't know which of n's queues the message(s)
+      // took (and in fact the different messages might have gone
+      // over different queues and possibly over multiple).
+      // => track queues with PendingMessages, and update RTT only if
+      //    the queue used is unique?
+      //    -> how can we get loss rates?
+      //    -> or, add extra state to Box and ACK to identify queue?
+      (void) avg_ack_delay;
+    }
+    else
+    {
+      GNUNET_STATISTICS_update (GST_stats,
+                                "# FRAGMENT_ACKS dropped, no matching fragment",
+                                1,
+                                GNUNET_NO);
+    }
+    if (NULL == pm->head_frag)
+    {
+      // if entire message is ACKed, handle that as well.
+      // => clean up PM, any post actions?
+      free_pending_message (pm);
+    }
+    else
+    {
+      struct GNUNET_TIME_Relative reassembly_timeout
+        = GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
+      // OPTIMIZE-FIXME: adjust retransmission strategy based on reassembly_timeout!
+      (void) reassembly_timeout;
+    }
+    break;
+  }
+  if (GNUNET_NO == matched)
+  {
+    GNUNET_STATISTICS_update (GST_stats,
+                              "# FRAGMENT_ACKS dropped, no matching pending message",
+                              1,
+                              GNUNET_NO);
+  }
   finish_cmc_handling (cmc);
 }
 
@@ -4921,8 +5027,8 @@ set_pending_message_uuid (struct PendingMessage *pm)
   if (pm->msg_uuid_set)
     return;
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
-                             &pm->msg_uuid,
-                             sizeof (pm->msg_uuid));
+                              &pm->msg_uuid,
+                              sizeof (pm->msg_uuid));
   pm->msg_uuid_set = GNUNET_YES;
 }
 
@@ -4939,7 +5045,7 @@ set_pending_message_uuid (struct PendingMessage *pm)
  */
 static struct PendingMessage *
 fragment_message (struct PendingMessage *pm,
-                 uint16_t mtu)
+                  uint16_t mtu)
 {
   struct PendingMessage *ff;
 
@@ -4952,15 +5058,15 @@ fragment_message (struct PendingMessage *pm,
      been expanded until we are at a leaf or at a fragment that is small enough */
   ff = pm;
   while ( ( (ff->bytes_msg > mtu) ||
-           (pm == ff) ) &&
-         (ff->frag_off == ff->bytes_msg) &&
-         (NULL != ff->head_frag) )
+            (pm == ff) ) &&
+          (ff->frag_off == ff->bytes_msg) &&
+          (NULL != ff->head_frag) )
   {
     ff = ff->head_frag; /* descent into fragmented fragments */
   }
 
   if ( ( (ff->bytes_msg > mtu) ||
-        (pm == ff) ) &&
+         (pm == ff) ) &&
        (pm->frag_off < pm->bytes_msg) )
   {
     /* Did not yet calculate all fragments, calculate next fragment */
@@ -4986,10 +5092,10 @@ fragment_message (struct PendingMessage *pm,
     }
     fragmax = mtu - sizeof (struct TransportFragmentBox);
     fragsize = GNUNET_MIN (msize - ff->frag_off,
-                          fragmax);
+                           fragmax);
     frag = GNUNET_malloc (sizeof (struct PendingMessage) +
-                         sizeof (struct TransportFragmentBox) +
-                         fragsize);
+                          sizeof (struct TransportFragmentBox) +
+                          fragsize);
     frag->target = pm->target;
     frag->frag_parent = ff;
     frag->timeout = pm->timeout;
@@ -4998,21 +5104,21 @@ fragment_message (struct PendingMessage *pm,
     msg = (char *) &frag[1];
     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
     tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
-                            fragsize);
+                             fragsize);
     tfb.frag_uuid = htonl (pm->frag_uuidgen++);
     tfb.msg_uuid = pm->msg_uuid;
     tfb.frag_off = htons (ff->frag_off + xoff);
     tfb.msg_size = htons (pm->bytes_msg);
     memcpy (msg,
-           &tfb,
-           sizeof (tfb));
+            &tfb,
+            sizeof (tfb));
     memcpy (&msg[sizeof (tfb)],
-           &orig[ff->frag_off],
-           fragsize);
+            &orig[ff->frag_off],
+            fragsize);
     GNUNET_CONTAINER_MDLL_insert (frag,
-                                 ff->head_frag,
-                                 ff->tail_frag,
-                                 frag);
+                                  ff->head_frag,
+                                  ff->tail_frag,
+                                  frag);
     ff->frag_off += fragsize;
     ff = frag;
   }
@@ -5322,7 +5428,7 @@ static void
 tracker_excess_out_cb (void *cls)
 {
   (void) cls;
-    
+
   /* FIXME: trigger excess bandwidth report to core? Right now,
      this is done internally within transport_api2_core already,
      but we probably want to change the logic and trigger it
@@ -5719,7 +5825,7 @@ struct QueueQualityContext
 {
   /**
    * Set to the @e k'th queue encountered.
-   */ 
+   */
   struct Queue *q;
 
   /**
@@ -5729,7 +5835,7 @@ struct QueueQualityContext
 
   /**
    * Set to the total number of queues encountered.
-   */ 
+   */
   unsigned int num_queues;
 
   /**
@@ -5784,7 +5890,7 @@ check_connection_quality (void *cls,
 
 
 /**
- * Task run when we CONSIDER initiating a DV learn 
+ * Task run when we CONSIDER initiating a DV learn
  * process. We first check that sending out a message is
  * even possible (queues exist), then that it is desirable
  * (if not, reschedule the task for later), and finally
@@ -5882,15 +5988,15 @@ start_dv_learn (void *cls)
                                          &check_connection_quality,
                                          &qqc);
   GNUNET_assert (NULL != qqc.q);
-  
+
   /* Do this as close to transmission time as possible! */
-  lle->launch_time = GNUNET_TIME_absolute_get (); 
+  lle->launch_time = GNUNET_TIME_absolute_get ();
   // FIXME: not so easy, need to BOX this message
   // in a transmission request! (mistake also done elsewhere!)
   GNUNET_MQ_send (qqc.q->tc->mq,
                   env);
 
-  /* reschedule this job, randomizing the time it runs (but no 
+  /* reschedule this job, randomizing the time it runs (but no
      actual backoff!) */
   dvlearn_task
     = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY),