Only call window_size on incrementing size, allow ACK+1 messages to be taken
authorBart Polot <bart.polot+voyager@gmail.com>
Thu, 2 Feb 2017 17:13:00 +0000 (18:13 +0100)
committerBart Polot <bart.polot+voyager@gmail.com>
Thu, 2 Feb 2017 17:13:00 +0000 (18:13 +0100)
src/cadet/cadet_api.c

index d3bb1abd6f10dce703526a19f373407400978ddb..c07b10f5be7e86b36bed03076c2a9c5fcd1f4558 100644 (file)
@@ -295,6 +295,11 @@ struct GNUNET_CADET_Channel
    */
   struct GNUNET_SCHEDULER_Task *mq_cont;
 
+  /**
+   * Pending envelope in case we don't have an ACK from the service.
+   */
+  struct GNUNET_MQ_Envelope *pending_env;
+
   /**
    * Window change handler.
    */
@@ -552,7 +557,11 @@ destroy_channel (struct GNUNET_CADET_Channel *ch)
   GNUNET_CONTAINER_DLL_remove (h->channels_head,
                                h->channels_tail,
                                ch);
-
+  if (NULL != ch->mq_cont)
+  {
+    GNUNET_SCHEDULER_cancel (ch->mq_cont);
+    ch->mq_cont = NULL;
+  }
   /* signal channel destruction */
   if (0 != ch->peer)
   {
@@ -663,6 +672,7 @@ cadet_mq_send_continue (void *cls)
 {
   struct GNUNET_CADET_Channel *ch = cls;
 
+  ch->mq_cont = NULL;
   GNUNET_MQ_impl_send_continue (ch->mq);
 }
 
@@ -710,15 +720,21 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
                                  GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
                                  msg);
   cadet_msg->ccn = ch->ccn;
-  GNUNET_MQ_send (h->mq, env);
 
-  GNUNET_assert (0 < ch->allow_send);
-  ch->allow_send--;
-  notify_window_size (ch);
   if (0 < ch->allow_send)
   {
+    /* Service has allowed this message, just send it and continue accepting */
+    GNUNET_MQ_send (h->mq, env);
+    ch->allow_send--;
     ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
-  } /* Otherwise it will be called upon ACK receipt. */
+    // notify_window_size (ch); /* FIXME add "verbose" setting? */
+  }
+  else
+  {
+    /* Service has NOT allowed this message, queue it and wait for an ACK */
+    GNUNET_assert (NULL != ch->pending_env);
+    ch->pending_env = env;
+  }
 }
 
 
@@ -763,6 +779,7 @@ cadet_mq_error_handler (void *cls,
  * @param mq message queue
  * @param impl_state state specific to the implementation
  */
+
 static void
 cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
                      void *impl_state)
@@ -1070,15 +1087,24 @@ handle_local_ack (void *cls,
     return;
   }
   ch->allow_send++;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Got an ACK on channel %X, allow send now %u!\n",
-       ntohl (ch->ccn.channel_of_client),
-       ch->allow_send);
   if (NULL != ch->mq)
   {
-    notify_window_size (ch);
-    if (1 == ch->allow_send)
+    if (NULL == ch->pending_env)
     {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Got an ACK on mq channel %X, allow send now %u!\n",
+           ntohl (ch->ccn.channel_of_client),
+           ch->allow_send);
+      notify_window_size (ch);
+    }
+    else
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Got an ACK on channel %X, sending pending message!\n",
+           ntohl (ch->ccn.channel_of_client));
+      GNUNET_MQ_send (h->mq, ch->pending_env);
+      ch->allow_send--;
+      ch->pending_env = NULL;
       ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch);
     }
     return;