Implement data ack in CADET MQ API
authorBart Polot <bart.polot+voyager@gmail.com>
Thu, 2 Feb 2017 16:16:20 +0000 (17:16 +0100)
committerBart Polot <bart.polot+voyager@gmail.com>
Thu, 2 Feb 2017 16:16:20 +0000 (17:16 +0100)
src/cadet/cadet_api.c
src/include/gnunet_cadet_service.h

index 7640a924a1560204e22cfcaccfdfe15654cad5cd..d3bb1abd6f10dce703526a19f373407400978ddb 100644 (file)
@@ -290,6 +290,11 @@ struct GNUNET_CADET_Channel
    */
   struct GNUNET_MQ_Handle *mq;
 
+  /**
+   * Task to allow mq to send more traffic.
+   */
+  struct GNUNET_SCHEDULER_Task *mq_cont;
+
   /**
    * Window change handler.
    */
@@ -630,10 +635,36 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
 }
 
 
+/**
+ * Notify the application about a change in the window size (if needed).
+ *
+ * @param ch Channel to notify about.
+ */
+static void
+notify_window_size (struct GNUNET_CADET_Channel *ch)
+{
+  if (NULL != ch->window_changes)
+  {
+    ch->window_changes (ch->ctx, ch, ch->allow_send);
+  }
+}
+
 /******************************************************************************/
 /***********************      MQ API CALLBACKS     ****************************/
 /******************************************************************************/
 
+/**
+ * Allow the MQ implementation to send the next message.
+ *
+ * @param cls Closure (channel whose mq to activate).
+ */
+static void
+cadet_mq_send_continue (void *cls)
+{
+  struct GNUNET_CADET_Channel *ch = cls;
+
+  GNUNET_MQ_impl_send_continue (ch->mq);
+}
 
 /**
  * Implement sending functionality of a message queue for
@@ -680,7 +711,14 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
                                  msg);
   cadet_msg->ccn = ch->ccn;
   GNUNET_MQ_send (h->mq, env);
-  GNUNET_MQ_impl_send_continue (mq);
+
+  GNUNET_assert (0 < ch->allow_send);
+  ch->allow_send--;
+  notify_window_size (ch);
+  if (0 < ch->allow_send)
+  {
+    ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
+  } /* Otherwise it will be called upon ACK receipt. */
 }
 
 
@@ -1012,8 +1050,6 @@ handle_local_data (void *cls,
  *
  * @param h Cadet handle.
  * @param message Message itself.
- *
- * FIXME either delete or port to MQ
  */
 static void
 handle_local_ack (void *cls,
@@ -1038,6 +1074,18 @@ handle_local_ack (void *cls,
        "Got an ACK on channel %X, allow send now %u!\n",
        ntohl (ch->ccn.channel_of_client),
        ch->allow_send);
+  if (NULL != ch->mq)
+  {
+    notify_window_size (ch);
+    if (1 == ch->allow_send)
+    {
+      ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
+    }
+    return;
+  }
+
+  /** @deprecated */
+  /* Old style API */
   for (th = h->th_head; NULL != th; th = th->next)
   {
     if ( (th->channel == ch) &&
index 1434180f49dc8b5bbc930a0ace82a71b96070b81..1b3aac7c9315ac4c6f6a6cd73d9b2a193b6cf7d6 100644 (file)
@@ -702,9 +702,6 @@ GC_u2h (uint32_t port);
  * @param cls Closure from #GNUNET_CADET_open_porT.
  * @param channel New handle to the channel.
  * @param source Peer that started this channel.
- * FIXME: Add port that this channel is created for, or is cls enough?
- *        Port cannot be closed yet, #handle_channel_create would have
- *        rejected it.
  * @return Closure for the incoming @a channel. It's given to:
  *         - The #GNUNET_CADET_DisconnectEventHandler (given to
  *           #GNUNET_CADET_open_porT) when the channel dies.