From: Christian Grothoff Date: Sat, 9 Jul 2011 17:25:43 +0000 (+0000) Subject: defrag X-Git-Tag: initial-import-from-subversion-38251~17917 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=1e463296e3635a4a82584a3c4e205e096ce53ad1;p=oweals%2Fgnunet.git defrag --- diff --git a/src/fragmentation/defragmentation_new.c b/src/fragmentation/defragmentation_new.c index 523a2cede..775885158 100644 --- a/src/fragmentation/defragmentation_new.c +++ b/src/fragmentation/defragmentation_new.c @@ -35,7 +35,7 @@ struct FragTimes /** * The time the fragment was received. */ - GNUNET_TIME_Absolute time; + struct GNUNET_TIME_Absolute time; /** * Number of the bit for the fragment (in [0,..,63]). @@ -45,7 +45,10 @@ struct FragTimes /** - * Information we keep for one message that is being assembled. + * Information we keep for one message that is being assembled. Note + * that we keep the context around even after the assembly is done to + * handle 'stray' messages that are received 'late'. A message + * context is ONLY discarded when the queue gets too big. */ struct MessageContext { @@ -59,6 +62,11 @@ struct MessageContext */ struct MessageContext *prev; + /** + * Associated defragmentation context. + */ + struct GNUNET_DEFRAGMENT_Context *dc; + /** * Pointer to the assembled message, allocated at the * end of this struct. @@ -76,7 +84,7 @@ struct MessageContext * Task scheduled for transmitting the next ACK to the * other peer. */ - struct GNUNET_SCHEDULER_TaskIdentifier ack_task; + GNUNET_SCHEDULER_TaskIdentifier ack_task; /** * When did we receive which fragment? Used to calculate @@ -126,11 +134,6 @@ struct GNUNET_DEFRAGMENT_Context */ struct GNUNET_STATISTICS_Handle *stats; - /** - * Closure for 'proc' and 'ackp'. - */ - void *cls; - /** * Head of list of messages we're defragmenting. */ @@ -141,6 +144,11 @@ struct GNUNET_DEFRAGMENT_Context */ struct MessageContext *tail; + /** + * Closure for 'proc' and 'ackp'. + */ + void *cls; + /** * Function to call with defragmented messages. */ @@ -169,7 +177,10 @@ struct GNUNET_DEFRAGMENT_Context */ unsigned int list_size; - + /** + * Maximum message size for each fragment. + */ + uint16_t mtu; }; @@ -177,6 +188,7 @@ struct GNUNET_DEFRAGMENT_Context * Create a defragmentation context. * * @param stats statistics context + * @param mtu the maximum message size for each fragment * @param num_msgs how many fragmented messages * to we defragment at most at the same time? * @param cls closure for proc and ackp @@ -187,6 +199,7 @@ struct GNUNET_DEFRAGMENT_Context */ struct GNUNET_DEFRAGMENT_Context * GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, + uint16_t mtu, unsigned int num_msgs, void *cls, GNUNET_FRAGMENT_MessageProcessor proc, @@ -200,6 +213,7 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, dc->proc = proc; dc->ackp = ackp; dc->num_msgs = num_msgs; + dc->mtu = mtu; dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */ return dc; } @@ -213,10 +227,49 @@ GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, void GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc) { + struct MessageContext *mc; + + while (NULL != (mc = dc->head)) + { + GNUNET_CONTAINER_DLL_remove (dc->head, + dc->tail, + mc); + dc->list_size--; + if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) + { + GNUNET_SCHEDULER_cancel (mc->ack_task); + mc->ack_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_free (mc); + } + GNUNET_assert (0 == dc->list_size); GNUNET_free (dc); } +/** + * Send acknowledgement to the other peer now. + * + * @param cls the message context + * @param tc the scheduler context + */ +static void +send_ack (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct MessageContext *mc = cls; + struct GNUNET_DEFRAGMENT_Context *dc = mc->dc; + struct FragmentAcknowledgement fa; + + mc->ack_task = GNUNET_SCHEDULER_NO_TASK; + fa.header.size = htons (sizeof (struct FragmentAcknowledgement)); + fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK); + fa.fragment_id = htonl (mc->fragment_id); + fa.bits = GNUNET_htonll (mc->bits); + dc->ackp (dc->cls, &fa.header); +} + + /** * We have received a fragment. Process it. * @@ -227,6 +280,127 @@ void GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc, const struct GNUNET_MessageHeader *msg) { + struct MessageContext *mc; + const struct FragmentHeader *fh; + uint16_t msize; + uint16_t foff; + uint32_t fid; + char *mbuf; + unsigned int bit; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Relative delay; + + if (ntohs(msg->size) < sizeof (struct FragmentHeader)) + { + GNUNET_break_op (0); + return; + } + if (ntohs (msg->size) > dc->mtu) + { + GNUNET_break_op (0); + return; + } + fh = (const struct FragmentHeader*) msg; + msize = ntohs (fh->total_size); + fid = ntohl (fh->fragment_id); + foff = ntohl (fh->offset); + if (foff >= msize) + { + GNUNET_break_op (0); + return; + } + GNUNET_STATISTICS_update (dc->stats, + _("Fragments received"), + 1, + GNUNET_NO); + mc = dc->head; + while ( (NULL != mc) && + (fid != mc->fragment_id) ) + mc = mc->next; + bit = foff / dc->mtu; + if (bit * dc->mtu + ntohs (msg->size) + - sizeof (struct FragmentHeader) > msize) + { + /* payload extends past total message size */ + GNUNET_break_op (0); + return; + } + if ( (NULL != mc) && (msize != mc->total_size) ) + { + /* inconsistent message size */ + GNUNET_break_op (0); + return; + } + now = GNUNET_TIME_absolute_get (); + if (NULL == mc) + { + mc = GNUNET_malloc (sizeof (struct MessageContext) + msize); + mc->msg = (const struct GNUNET_MessageHeader*) &mc[1]; + mc->dc = dc; + mc->total_size = msize; + mc->fragment_id = fid; + mc->last_update = now; + mc->bits = (msize + dc->mtu - 1) / (dc->mtu - sizeof (struct FragmentHeader)); + GNUNET_CONTAINER_DLL_insert (dc->head, + dc->tail, + mc); + dc->list_size++; + if (dc->list_size > dc->num_msgs) + { + /* FIXME: discard oldest entry... */ + } + } + + /* copy data to 'mc' */ + if (0 != (mc->bits & (1 << bit))) + { + mc->bits -= 1 << bit; + mbuf = (char* )&mc[1]; + memcpy (&mbuf[bit * dc->mtu], + &fh[1], + ntohs (msg->size) - sizeof (struct FragmentHeader)); + mc->last_update = now; + mc->frag_times[mc->frag_times_write_offset].time = now; + mc->frag_times[mc->frag_times_write_offset].bit = bit; + mc->frag_times_write_offset++; + if (0 == mc->bits) + { + /* message complete, notify! */ + dc->proc (dc->cls, + mc->msg); + GNUNET_STATISTICS_update (dc->stats, + _("Messages defragmented"), + 1, + GNUNET_NO); + } + } + else + { + GNUNET_STATISTICS_update (dc->stats, + _("Duplicate fragments received"), + 1, + GNUNET_NO); + } + + /* FIXME: update ACK timer (if 0==mc->bits, always ACK now!) */ + delay = GNUNET_TIME_UNIT_SECONDS; /* FIXME: bad! */ + if (mc->frag_times_write_offset == 1) + { + /* FIXME: use number-of-fragments * dc->delay */ + } + else + { + /* FIXME: use best-fit regression */ + } + /* FIXME: update dc->latency! */ + + if (0 == mc->bits) + delay = GNUNET_TIME_UNIT_ZERO; + if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task) + GNUNET_SCHEDULER_cancel (mc->ack_task); + mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay, + &send_ack, + mc); } /* end of defragmentation_new.c */ diff --git a/src/include/gnunet_fragmentation_lib.h b/src/include/gnunet_fragmentation_lib.h index b2ba064f8..5fbf415f3 100644 --- a/src/include/gnunet_fragmentation_lib.h +++ b/src/include/gnunet_fragmentation_lib.h @@ -112,7 +112,7 @@ GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc); /** - * Defragmentation context. + * Defragmentation context (one per connection). */ struct GNUNET_DEFRAGMENT_Context; @@ -121,6 +121,9 @@ struct GNUNET_DEFRAGMENT_Context; * Create a defragmentation context. * * @param stats statistics context + * @param mtu the maximum message size for each fragment + * @param num_msgs how many fragmented messages + * to we defragment at most at the same time? * @param cls closure for proc and ackp * @param proc function to call with defragmented messages * @param ackp function to call with acknowledgements (to send @@ -129,6 +132,8 @@ struct GNUNET_DEFRAGMENT_Context; */ struct GNUNET_DEFRAGMENT_Context * GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats, + uint16_t mtu, + unsigned int num_msgs, void *cls, GNUNET_FRAGMENT_MessageProcessor proc, GNUNET_FRAGMENT_MessageProcessor ackp); diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 960ec60ab..2800e5b92 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -102,6 +102,12 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_FRAGMENT 18 +/** + * Acknowledgement of a FRAGMENT of a larger message. + * Managed by libgnunetfragment. + */ +#define GNUNET_MESSAGE_TYPE_FRAGMENT_ACK 19 + /** * Message from the core saying that the transport * server should start giving it messages. This