Merge branch 'master' of git+ssh://gnunet.org/gnunet
authorBart Polot <bart.polot+voyager@gmail.com>
Wed, 8 Feb 2017 01:28:25 +0000 (02:28 +0100)
committerBart Polot <bart.polot+voyager@gmail.com>
Wed, 8 Feb 2017 01:28:25 +0000 (02:28 +0100)
src/cadet/cadet_api.c
src/include/gnunet_cadet_service.h

index 7640a924a1560204e22cfcaccfdfe15654cad5cd..c07b10f5be7e86b36bed03076c2a9c5fcd1f4558 100644 (file)
@@ -290,6 +290,16 @@ struct GNUNET_CADET_Channel
    */
   struct GNUNET_MQ_Handle *mq;
 
+  /**
+   * Task to allow mq to send more traffic.
+   */
+  struct GNUNET_SCHEDULER_Task *mq_cont;
+
+  /**
+   * Pending envelope in case we don't have an ACK from the service.
+   */
+  struct GNUNET_MQ_Envelope *pending_env;
+
   /**
    * Window change handler.
    */
@@ -547,7 +557,11 @@ destroy_channel (struct GNUNET_CADET_Channel *ch)
   GNUNET_CONTAINER_DLL_remove (h->channels_head,
                                h->channels_tail,
                                ch);
-
+  if (NULL != ch->mq_cont)
+  {
+    GNUNET_SCHEDULER_cancel (ch->mq_cont);
+    ch->mq_cont = NULL;
+  }
   /* signal channel destruction */
   if (0 != ch->peer)
   {
@@ -630,10 +644,37 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
 }
 
 
+/**
+ * Notify the application about a change in the window size (if needed).
+ *
+ * @param ch Channel to notify about.
+ */
+static void
+notify_window_size (struct GNUNET_CADET_Channel *ch)
+{
+  if (NULL != ch->window_changes)
+  {
+    ch->window_changes (ch->ctx, ch, ch->allow_send);
+  }
+}
+
 /******************************************************************************/
 /***********************      MQ API CALLBACKS     ****************************/
 /******************************************************************************/
 
+/**
+ * Allow the MQ implementation to send the next message.
+ *
+ * @param cls Closure (channel whose mq to activate).
+ */
+static void
+cadet_mq_send_continue (void *cls)
+{
+  struct GNUNET_CADET_Channel *ch = cls;
+
+  ch->mq_cont = NULL;
+  GNUNET_MQ_impl_send_continue (ch->mq);
+}
 
 /**
  * Implement sending functionality of a message queue for
@@ -679,8 +720,21 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
                                  msg);
   cadet_msg->ccn = ch->ccn;
-  GNUNET_MQ_send (h->mq, env);
-  GNUNET_MQ_impl_send_continue (mq);
+
+  if (0 < ch->allow_send)
+  {
+    /* Service has allowed this message, just send it and continue accepting */
+    GNUNET_MQ_send (h->mq, env);
+    ch->allow_send--;
+    ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
+    // notify_window_size (ch); /* FIXME add "verbose" setting? */
+  }
+  else
+  {
+    /* Service has NOT allowed this message, queue it and wait for an ACK */
+    GNUNET_assert (NULL != ch->pending_env);
+    ch->pending_env = env;
+  }
 }
 
 
@@ -725,6 +779,7 @@ cadet_mq_error_handler (void *cls,
  * @param mq message queue
  * @param impl_state state specific to the implementation
  */
+
 static void
 cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
                      void *impl_state)
@@ -1012,8 +1067,6 @@ handle_local_data (void *cls,
  *
  * @param h Cadet handle.
  * @param message Message itself.
- *
- * FIXME either delete or port to MQ
  */
 static void
 handle_local_ack (void *cls,
@@ -1034,10 +1087,31 @@ handle_local_ack (void *cls,
     return;
   }
   ch->allow_send++;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Got an ACK on channel %X, allow send now %u!\n",
-       ntohl (ch->ccn.channel_of_client),
-       ch->allow_send);
+  if (NULL != ch->mq)
+  {
+    if (NULL == ch->pending_env)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Got an ACK on mq channel %X, allow send now %u!\n",
+           ntohl (ch->ccn.channel_of_client),
+           ch->allow_send);
+      notify_window_size (ch);
+    }
+    else
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Got an ACK on channel %X, sending pending message!\n",
+           ntohl (ch->ccn.channel_of_client));
+      GNUNET_MQ_send (h->mq, ch->pending_env);
+      ch->allow_send--;
+      ch->pending_env = NULL;
+      ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
+    }
+    return;
+  }
+
+  /** @deprecated */
+  /* Old style API */
   for (th = h->th_head; NULL != th; th = th->next)
   {
     if ( (th->channel == ch) &&
index 1434180f49dc8b5bbc930a0ace82a71b96070b81..1b3aac7c9315ac4c6f6a6cd73d9b2a193b6cf7d6 100644 (file)
@@ -702,9 +702,6 @@ GC_u2h (uint32_t port);
  * @param cls Closure from #GNUNET_CADET_open_porT.
  * @param channel New handle to the channel.
  * @param source Peer that started this channel.
- * FIXME: Add port that this channel is created for, or is cls enough?
- *        Port cannot be closed yet, #handle_channel_create would have
- *        rejected it.
  * @return Closure for the incoming @a channel. It's given to:
  *         - The #GNUNET_CADET_DisconnectEventHandler (given to
  *           #GNUNET_CADET_open_porT) when the channel dies.