From: Bart Polot Date: Thu, 2 Feb 2017 16:16:20 +0000 (+0100) Subject: Implement data ack in CADET MQ API X-Git-Tag: taler-0.2.1~199 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=17047b7bcbe3f1756028058a9887416c6afab5d8;p=oweals%2Fgnunet.git Implement data ack in CADET MQ API --- diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index 7640a924a..d3bb1abd6 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c @@ -290,6 +290,11 @@ struct GNUNET_CADET_Channel */ struct GNUNET_MQ_Handle *mq; + /** + * Task to allow mq to send more traffic. + */ + struct GNUNET_SCHEDULER_Task *mq_cont; + /** * Window change handler. */ @@ -630,10 +635,36 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th) } +/** + * Notify the application about a change in the window size (if needed). + * + * @param ch Channel to notify about. + */ +static void +notify_window_size (struct GNUNET_CADET_Channel *ch) +{ + if (NULL != ch->window_changes) + { + ch->window_changes (ch->ctx, ch, ch->allow_send); + } +} + /******************************************************************************/ /*********************** MQ API CALLBACKS ****************************/ /******************************************************************************/ +/** + * Allow the MQ implementation to send the next message. + * + * @param cls Closure (channel whose mq to activate). + */ +static void +cadet_mq_send_continue (void *cls) +{ + struct GNUNET_CADET_Channel *ch = cls; + + GNUNET_MQ_impl_send_continue (ch->mq); +} /** * Implement sending functionality of a message queue for @@ -680,7 +711,14 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq, msg); cadet_msg->ccn = ch->ccn; GNUNET_MQ_send (h->mq, env); - GNUNET_MQ_impl_send_continue (mq); + + GNUNET_assert (0 < ch->allow_send); + ch->allow_send--; + notify_window_size (ch); + if (0 < ch->allow_send) + { + ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch); + } /* Otherwise it will be called upon ACK receipt. */ } @@ -1012,8 +1050,6 @@ handle_local_data (void *cls, * * @param h Cadet handle. * @param message Message itself. - * - * FIXME either delete or port to MQ */ static void handle_local_ack (void *cls, @@ -1038,6 +1074,18 @@ handle_local_ack (void *cls, "Got an ACK on channel %X, allow send now %u!\n", ntohl (ch->ccn.channel_of_client), ch->allow_send); + if (NULL != ch->mq) + { + notify_window_size (ch); + if (1 == ch->allow_send) + { + ch->mq_cont = GNUNET_SCHEDULER_add_now (&cadet_mq_send_continue, ch); + } + return; + } + + /** @deprecated */ + /* Old style API */ for (th = h->th_head; NULL != th; th = th->next) { if ( (th->channel == ch) && diff --git a/src/include/gnunet_cadet_service.h b/src/include/gnunet_cadet_service.h index 1434180f4..1b3aac7c9 100644 --- a/src/include/gnunet_cadet_service.h +++ b/src/include/gnunet_cadet_service.h @@ -702,9 +702,6 @@ GC_u2h (uint32_t port); * @param cls Closure from #GNUNET_CADET_open_porT. * @param channel New handle to the channel. * @param source Peer that started this channel. - * FIXME: Add port that this channel is created for, or is cls enough? - * Port cannot be closed yet, #handle_channel_create would have - * rejected it. * @return Closure for the incoming @a channel. It's given to: * - The #GNUNET_CADET_DisconnectEventHandler (given to * #GNUNET_CADET_open_porT) when the channel dies.