From: Christian Grothoff Date: Wed, 13 Jul 2011 11:07:19 +0000 (+0000) Subject: revised fragmentation API for blocking writes X-Git-Tag: initial-import-from-subversion-38251~17894 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=c94be01c832866f9c0169963c658e36dd3329cc2;p=oweals%2Fgnunet.git revised fragmentation API for blocking writes --- diff --git a/src/fragmentation/defragmentation_new.c b/src/fragmentation/defragmentation_new.c index 379ec57d4..cc42d3e75 100644 --- a/src/fragmentation/defragmentation_new.c +++ b/src/fragmentation/defragmentation_new.c @@ -161,7 +161,7 @@ struct GNUNET_DEFRAGMENT_Context /** * Function to call with acknowledgements. */ - GNUNET_FRAGMENT_MessageProcessor ackp; + GNUNET_DEFRAGMENT_AckProcessor ackp; /** * Running average of the latency (delay between messages) for this @@ -207,7 +207,7 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, unsigned int num_msgs, void *cls, GNUNET_FRAGMENT_MessageProcessor proc, - GNUNET_FRAGMENT_MessageProcessor ackp) + GNUNET_DEFRAGMENT_AckProcessor ackp) { struct GNUNET_DEFRAGMENT_Context *dc; @@ -270,7 +270,7 @@ send_ack (void *cls, fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK); fa.fragment_id = htonl (mc->fragment_id); fa.bits = GNUNET_htonll (mc->bits); - dc->ackp (dc->cls, &fa.header); + dc->ackp (dc->cls, mc->fragment_id, &fa.header); } diff --git a/src/fragmentation/fragmentation_new.c b/src/fragmentation/fragmentation_new.c index dbbaa859b..db66f5a5b 100644 --- a/src/fragmentation/fragmentation_new.c +++ b/src/fragmentation/fragmentation_new.c @@ -88,10 +88,15 @@ struct GNUNET_FRAGMENT_Context */ unsigned int next_transmission; + /** + * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done' + */ + int8_t proc_busy; + /** * GNUNET_YES if we are waiting for an ACK. */ - int wack; + int8_t wack; /** * Target fragment size. @@ -122,6 +127,7 @@ transmit_next (void *cls, int wrap; fc->task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (GNUNET_NO == fc->proc_busy); if (0 == fc->acks) return; /* all done */ @@ -194,9 +200,7 @@ transmit_next (void *cls, fc->last_round = GNUNET_TIME_absolute_get (); fc->wack = GNUNET_YES; } - fc->task = GNUNET_SCHEDULER_add_delayed (delay, - &transmit_next, - fc); + fc->proc_busy = GNUNET_YES; fc->proc (fc->proc_cls, &fh->header); } @@ -264,6 +268,24 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, } +/** + * Continuation to call from the 'proc' function after the fragment + * has been transmitted (and hence the next fragment can now be + * given to proc). + * + * @param fc fragmentation context + */ +void +GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc) +{ + GNUNET_assert (fc->proc_busy == GNUNET_YES); + fc->proc_busy = GNUNET_NO; + GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK); + fc->task = GNUNET_SCHEDULER_add_now (&transmit_next, + fc); +} + + /** * Process an acknowledgement message we got from the other * side (to control re-transmits). diff --git a/src/fragmentation/test_fragmentation.c b/src/fragmentation/test_fragmentation.c index 8a9af04e7..1a231bbb4 100644 --- a/src/fragmentation/test_fragmentation.c +++ b/src/fragmentation/test_fragmentation.c @@ -42,7 +42,7 @@ /** * Simulate dropping of 1 out of how many messages? (must be > 1) */ -#define DROPRATE 2 +#define DROPRATE 10 static int ret = 1; @@ -102,6 +102,7 @@ proc_msgs (void *cls, */ static void proc_acks (void *cls, + uint32_t msg_id, const struct GNUNET_MessageHeader *hdr) { unsigned int i; @@ -150,8 +151,10 @@ static void proc_frac (void *cls, const struct GNUNET_MessageHeader *hdr) { + struct GNUNET_FRAGMENT_Context **fc = cls; int ret; + GNUNET_FRAGMENT_context_transmission_done (*fc); if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, DROPRATE)) { frag_drops++; @@ -211,7 +214,7 @@ run (void *cls, GNUNET_TIME_UNIT_SECONDS, msg, &proc_frac, - NULL); + &frags[i]); } } diff --git a/src/include/gnunet_fragmentation_lib.h b/src/include/gnunet_fragmentation_lib.h index bb29a5fba..e91e74c6f 100644 --- a/src/include/gnunet_fragmentation_lib.h +++ b/src/include/gnunet_fragmentation_lib.h @@ -49,8 +49,10 @@ struct GNUNET_FRAGMENT_Context; /** - * Function that is called with messages - * created by the fragmentation module. + * Function that is called with messages created by the fragmentation + * module. In the case of the 'proc' callback of the + * GNUNET_FRAGMENT_context_create function, this function must + * eventually call 'GNUNET_FRAGMENT_context_transmission_done'. * * @param cls closure * @param msg the message that was created @@ -87,6 +89,17 @@ GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, void *proc_cls); +/** + * Continuation to call from the 'proc' function after the fragment + * has been transmitted (and hence the next fragment can now be + * given to proc). + * + * @param fc fragmentation context + */ +void +GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc); + + /** * Process an acknowledgement message we got from the other * side (to control re-transmits). @@ -120,6 +133,21 @@ GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc); struct GNUNET_DEFRAGMENT_Context; +/** + * Function that is called with acknowledgement messages created by + * the fragmentation module. Acknowledgements are cummulative, + * so it is OK to only transmit the 'latest' ack message for the same + * message ID. + * + * @param cls closure + * @param id unique message ID (modulo collisions) + * @param msg the message that was created + */ +typedef void (*GNUNET_DEFRAGMENT_AckProcessor) (void *cls, + uint32_t id, + const struct GNUNET_MessageHeader *msg); + + /** * Create a defragmentation context. * @@ -139,7 +167,7 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, unsigned int num_msgs, void *cls, GNUNET_FRAGMENT_MessageProcessor proc, - GNUNET_FRAGMENT_MessageProcessor ackp); + GNUNET_DEFRAGMENT_AckProcessor ackp); /**