lib_LTLIBRARIES = libgnunetfragmentation.la
libgnunetfragmentation_la_SOURCES = \
- fragmentation_new.c \
- defragmentation_new.c
+ fragmentation.c \
+ defragmentation.c
libgnunetfragmentation_la_LIBADD = \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
$(top_builddir)/src/util/libgnunetutil.la
--- /dev/null
+/*
+ This file is part of GNUnet
+ (C) 2009, 2011 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+/**
+ * @file src/fragmentation/defragmentation_new.c
+ * @brief library to help defragment messages
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_fragmentation_lib.h"
+#include "fragmentation.h"
+
+/**
+ * Timestamps for fragments.
+ */
+struct FragTimes
+{
+ /**
+ * The time the fragment was received.
+ */
+ struct GNUNET_TIME_Absolute time;
+
+ /**
+ * Number of the bit for the fragment (in [0,..,63]).
+ */
+ unsigned int bit;
+};
+
+
+/**
+ * Information we keep for one message that is being assembled. Note
+ * that we keep the context around even after the assembly is done to
+ * handle 'stray' messages that are received 'late'. A message
+ * context is ONLY discarded when the queue gets too big.
+ */
+struct MessageContext
+{
+ /**
+ * This is a DLL.
+ */
+ struct MessageContext *next;
+
+ /**
+ * This is a DLL.
+ */
+ struct MessageContext *prev;
+
+ /**
+ * Associated defragmentation context.
+ */
+ struct GNUNET_DEFRAGMENT_Context *dc;
+
+ /**
+ * Pointer to the assembled message, allocated at the
+ * end of this struct.
+ */
+ const struct GNUNET_MessageHeader *msg;
+
+ /**
+ * Last time we received any update for this message
+ * (least-recently updated message will be discarded
+ * if we hit the queue size).
+ */
+ struct GNUNET_TIME_Absolute last_update;
+
+ /**
+ * Task scheduled for transmitting the next ACK to the
+ * other peer.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier ack_task;
+
+ /**
+ * When did we receive which fragment? Used to calculate
+ * the time we should send the ACK.
+ */
+ struct FragTimes frag_times[64];
+
+ /**
+ * Which fragments have we gotten yet? bits that are 1
+ * indicate missing fragments.
+ */
+ uint64_t bits;
+
+ /**
+ * Unique ID for this message.
+ */
+ uint32_t fragment_id;
+
+ /**
+ * Which 'bit' did the last fragment we received correspond to?
+ */
+ unsigned int last_bit;
+
+ /**
+ * For the current ACK round, which is the first relevant
+ * offset in 'frag_times'?
+ */
+ unsigned int frag_times_start_offset;
+
+ /**
+ * Which offset whould we write the next frag value into
+ * in the 'frag_times' array? All smaller entries are valid.
+ */
+ unsigned int frag_times_write_offset;
+
+ /**
+ * Total size of the message that we are assembling.
+ */
+ uint16_t total_size;
+
+};
+
+
+/**
+ * Defragmentation context (one per connection).
+ */
+struct GNUNET_DEFRAGMENT_Context
+{
+
+ /**
+ * For statistics.
+ */
+ struct GNUNET_STATISTICS_Handle *stats;
+
+ /**
+ * Head of list of messages we're defragmenting.
+ */
+ struct MessageContext *head;
+
+ /**
+ * Tail of list of messages we're defragmenting.
+ */
+ struct MessageContext *tail;
+
+ /**
+ * Closure for 'proc' and 'ackp'.
+ */
+ void *cls;
+
+ /**
+ * Function to call with defragmented messages.
+ */
+ GNUNET_FRAGMENT_MessageProcessor proc;
+
+ /**
+ * Function to call with acknowledgements.
+ */
+ GNUNET_DEFRAGMENT_AckProcessor ackp;
+
+ /**
+ * Running average of the latency (delay between messages) for this
+ * connection.
+ */
+ struct GNUNET_TIME_Relative latency;
+
+ /**
+ * num_msgs how many fragmented messages
+ * to we defragment at most at the same time?
+ */
+ unsigned int num_msgs;
+
+ /**
+ * Current number of messages in the 'struct MessageContext'
+ * DLL (smaller or equal to 'num_msgs').
+ */
+ unsigned int list_size;
+
+ /**
+ * Maximum message size for each fragment.
+ */
+ uint16_t mtu;
+};
+
+
+/**
+ * Create a defragmentation context.
+ *
+ * @param stats statistics context
+ * @param mtu the maximum message size for each fragment
+ * @param num_msgs how many fragmented messages
+ * to we defragment at most at the same time?
+ * @param cls closure for proc and ackp
+ * @param proc function to call with defragmented messages
+ * @param ackp function to call with acknowledgements (to send
+ * back to the other side)
+ * @return the defragmentation context
+ */
+struct GNUNET_DEFRAGMENT_Context *
+GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+ uint16_t mtu,
+ unsigned int num_msgs,
+ void *cls,
+ GNUNET_FRAGMENT_MessageProcessor proc,
+ GNUNET_DEFRAGMENT_AckProcessor ackp)
+{
+ struct GNUNET_DEFRAGMENT_Context *dc;
+
+ dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context));
+ dc->stats = stats;
+ dc->cls = cls;
+ dc->proc = proc;
+ dc->ackp = ackp;
+ dc->num_msgs = num_msgs;
+ dc->mtu = mtu;
+ dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
+ return dc;
+}
+
+
+/**
+ * Destroy the given defragmentation context.
+ *
+ * @param dc defragmentation context
+ */
+void
+GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
+{
+ struct MessageContext *mc;
+
+ while (NULL != (mc = dc->head))
+ {
+ GNUNET_CONTAINER_DLL_remove (dc->head,
+ dc->tail,
+ mc);
+ dc->list_size--;
+ if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+ {
+ GNUNET_SCHEDULER_cancel (mc->ack_task);
+ mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_free (mc);
+ }
+ GNUNET_assert (0 == dc->list_size);
+ GNUNET_free (dc);
+}
+
+
+/**
+ * Send acknowledgement to the other peer now.
+ *
+ * @param cls the message context
+ * @param tc the scheduler context
+ */
+static void
+send_ack (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct MessageContext *mc = cls;
+ struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
+ struct FragmentAcknowledgement fa;
+
+ mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+ fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
+ fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
+ fa.fragment_id = htonl (mc->fragment_id);
+ fa.bits = GNUNET_htonll (mc->bits);
+ dc->ackp (dc->cls, mc->fragment_id, &fa.header);
+}
+
+
+/**
+ * This function is from the GNU Scientific Library, linear/fit.c,
+ * (C) 2000 Brian Gough
+ */
+static void
+gsl_fit_mul (const double *x, const size_t xstride,
+ const double *y, const size_t ystride,
+ const size_t n,
+ double *c1, double *cov_11, double *sumsq)
+{
+ double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
+
+ size_t i;
+
+ for (i = 0; i < n; i++)
+ {
+ m_x += (x[i * xstride] - m_x) / (i + 1.0);
+ m_y += (y[i * ystride] - m_y) / (i + 1.0);
+ }
+
+ for (i = 0; i < n; i++)
+ {
+ const double dx = x[i * xstride] - m_x;
+ const double dy = y[i * ystride] - m_y;
+
+ m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
+ m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
+ }
+
+ /* In terms of y = b x */
+
+ {
+ double s2 = 0, d2 = 0;
+ double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
+
+ *c1 = b;
+
+ /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
+
+ for (i = 0; i < n; i++)
+ {
+ const double dx = x[i * xstride] - m_x;
+ const double dy = y[i * ystride] - m_y;
+ const double d = (m_y - b * m_x) + dy - b * dx;
+ d2 += d * d;
+ }
+
+ s2 = d2 / (n - 1.0); /* chisq per degree of freedom */
+
+ *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
+
+ *sumsq = d2;
+ }
+}
+
+
+/**
+ * Estimate the latency between messages based on the most recent
+ * message time stamps.
+ *
+ * @param mc context with time stamps
+ * @return average delay between time stamps (based on least-squares fit)
+ */
+static struct GNUNET_TIME_Relative
+estimate_latency (struct MessageContext *mc)
+{
+ struct FragTimes *first;
+ size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
+ double x[total];
+ double y[total];
+ size_t i;
+ double c1;
+ double cov11;
+ double sumsq;
+ struct GNUNET_TIME_Relative ret;
+
+ first = &mc->frag_times[mc->frag_times_start_offset];
+ GNUNET_assert (total > 1);
+ for (i=0;i<total;i++)
+ {
+ x[i] = (double) i;
+ y[i] = (double) (first[i].time.abs_value - first[0].time.abs_value);
+ }
+ gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
+ ret.rel_value = (uint64_t) c1;
+ return ret;
+};
+
+
+/**
+ * Discard the message context that was inactive for the longest time.
+ *
+ * @param dc defragmentation context
+ */
+static void
+discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
+{
+ struct MessageContext *old;
+ struct MessageContext *pos;
+
+ old = NULL;
+ pos = dc->head;
+ while (NULL != pos)
+ {
+ if ( (old == NULL) ||
+ (old->last_update.abs_value > pos->last_update.abs_value) )
+ old = pos;
+ pos = pos->next;
+ }
+ GNUNET_assert (NULL != old);
+ GNUNET_CONTAINER_DLL_remove (dc->head,
+ dc->tail,
+ old);
+ dc->list_size--;
+ if (GNUNET_SCHEDULER_NO_TASK != old->ack_task)
+ GNUNET_SCHEDULER_cancel (old->ack_task);
+ GNUNET_free (old);
+ fprintf (stderr, "D");
+}
+
+
+/**
+ * We have received a fragment. Process it.
+ *
+ * @param dc the context
+ * @param msg the message that was received
+ * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error
+ */
+int
+GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct MessageContext *mc;
+ const struct FragmentHeader *fh;
+ uint16_t msize;
+ uint16_t foff;
+ uint32_t fid;
+ char *mbuf;
+ unsigned int bit;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative delay;
+ unsigned int bc;
+ unsigned int b;
+ unsigned int n;
+ int duplicate;
+
+ if (ntohs(msg->size) < sizeof (struct FragmentHeader))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (ntohs (msg->size) > dc->mtu)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ fh = (const struct FragmentHeader*) msg;
+ msize = ntohs (fh->total_size);
+ fid = ntohl (fh->fragment_id);
+ foff = ntohs (fh->offset);
+ if (foff >= msize)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Fragments received"),
+ 1,
+ GNUNET_NO);
+ mc = dc->head;
+ while ( (NULL != mc) &&
+ (fid != mc->fragment_id) )
+ mc = mc->next;
+ bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
+ if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size)
+ - sizeof (struct FragmentHeader) > msize)
+ {
+ /* payload extends past total message size */
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if ( (NULL != mc) && (msize != mc->total_size) )
+ {
+ /* inconsistent message size */
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ now = GNUNET_TIME_absolute_get ();
+ if (NULL == mc)
+ {
+ mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
+ mc->msg = (const struct GNUNET_MessageHeader*) &mc[1];
+ mc->dc = dc;
+ mc->total_size = msize;
+ mc->fragment_id = fid;
+ mc->last_update = now;
+ n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - sizeof (struct FragmentHeader));
+ if (n == 64)
+ mc->bits = UINT64_MAX; /* set all 64 bit */
+ else
+ mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */
+ GNUNET_CONTAINER_DLL_insert (dc->head,
+ dc->tail,
+ mc);
+ dc->list_size++;
+ if (dc->list_size > dc->num_msgs)
+ discard_oldest_mc (dc);
+ }
+
+ /* copy data to 'mc' */
+ if (0 != (mc->bits & (1LL << bit)))
+ {
+ mc->bits -= 1LL << bit;
+ mbuf = (char* )&mc[1];
+ memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))],
+ &fh[1],
+ ntohs (msg->size) - sizeof (struct FragmentHeader));
+ mc->last_update = now;
+ if (bit < mc->last_bit)
+ mc->frag_times_start_offset = mc->frag_times_write_offset;
+ mc->last_bit = bit;
+ mc->frag_times[mc->frag_times_write_offset].time = now;
+ mc->frag_times[mc->frag_times_write_offset].bit = bit;
+ mc->frag_times_write_offset++;
+ duplicate = GNUNET_NO;
+ }
+ else
+ {
+ duplicate = GNUNET_YES;
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Duplicate fragments received"),
+ 1,
+ GNUNET_NO);
+ }
+
+ /* count number of missing fragments */
+ bc = 0;
+ for (b=0;b<64;b++)
+ if (0 != (mc->bits & (1LL << b))) bc++;
+ if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
+ dc->latency = estimate_latency (mc);
+ delay = GNUNET_TIME_relative_multiply (dc->latency,
+ bc + 1);
+ if ( (0 == mc->bits) || (GNUNET_YES == duplicate) ) /* message complete or duplicate, ACK now! */
+ delay = GNUNET_TIME_UNIT_ZERO;
+ if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+ GNUNET_SCHEDULER_cancel (mc->ack_task);
+ mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &send_ack,
+ mc);
+ if ( (duplicate == GNUNET_NO) &&
+ (0 == mc->bits) )
+ {
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Messages defragmented"),
+ 1,
+ GNUNET_NO);
+ /* message complete, notify! */
+ dc->proc (dc->cls,
+ mc->msg);
+ }
+ if (duplicate == GNUNET_YES)
+ return GNUNET_NO;
+ return GNUNET_YES;
+}
+
+/* end of defragmentation_new.c */
+
+++ /dev/null
-/*
- This file is part of GNUnet
- (C) 2009, 2011 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-/**
- * @file src/fragmentation/defragmentation_new.c
- * @brief library to help defragment messages
- * @author Christian Grothoff
- */
-#include "platform.h"
-#include "gnunet_fragmentation_lib.h"
-#include "fragmentation.h"
-
-/**
- * Timestamps for fragments.
- */
-struct FragTimes
-{
- /**
- * The time the fragment was received.
- */
- struct GNUNET_TIME_Absolute time;
-
- /**
- * Number of the bit for the fragment (in [0,..,63]).
- */
- unsigned int bit;
-};
-
-
-/**
- * Information we keep for one message that is being assembled. Note
- * that we keep the context around even after the assembly is done to
- * handle 'stray' messages that are received 'late'. A message
- * context is ONLY discarded when the queue gets too big.
- */
-struct MessageContext
-{
- /**
- * This is a DLL.
- */
- struct MessageContext *next;
-
- /**
- * This is a DLL.
- */
- struct MessageContext *prev;
-
- /**
- * Associated defragmentation context.
- */
- struct GNUNET_DEFRAGMENT_Context *dc;
-
- /**
- * Pointer to the assembled message, allocated at the
- * end of this struct.
- */
- const struct GNUNET_MessageHeader *msg;
-
- /**
- * Last time we received any update for this message
- * (least-recently updated message will be discarded
- * if we hit the queue size).
- */
- struct GNUNET_TIME_Absolute last_update;
-
- /**
- * Task scheduled for transmitting the next ACK to the
- * other peer.
- */
- GNUNET_SCHEDULER_TaskIdentifier ack_task;
-
- /**
- * When did we receive which fragment? Used to calculate
- * the time we should send the ACK.
- */
- struct FragTimes frag_times[64];
-
- /**
- * Which fragments have we gotten yet? bits that are 1
- * indicate missing fragments.
- */
- uint64_t bits;
-
- /**
- * Unique ID for this message.
- */
- uint32_t fragment_id;
-
- /**
- * Which 'bit' did the last fragment we received correspond to?
- */
- unsigned int last_bit;
-
- /**
- * For the current ACK round, which is the first relevant
- * offset in 'frag_times'?
- */
- unsigned int frag_times_start_offset;
-
- /**
- * Which offset whould we write the next frag value into
- * in the 'frag_times' array? All smaller entries are valid.
- */
- unsigned int frag_times_write_offset;
-
- /**
- * Total size of the message that we are assembling.
- */
- uint16_t total_size;
-
-};
-
-
-/**
- * Defragmentation context (one per connection).
- */
-struct GNUNET_DEFRAGMENT_Context
-{
-
- /**
- * For statistics.
- */
- struct GNUNET_STATISTICS_Handle *stats;
-
- /**
- * Head of list of messages we're defragmenting.
- */
- struct MessageContext *head;
-
- /**
- * Tail of list of messages we're defragmenting.
- */
- struct MessageContext *tail;
-
- /**
- * Closure for 'proc' and 'ackp'.
- */
- void *cls;
-
- /**
- * Function to call with defragmented messages.
- */
- GNUNET_FRAGMENT_MessageProcessor proc;
-
- /**
- * Function to call with acknowledgements.
- */
- GNUNET_DEFRAGMENT_AckProcessor ackp;
-
- /**
- * Running average of the latency (delay between messages) for this
- * connection.
- */
- struct GNUNET_TIME_Relative latency;
-
- /**
- * num_msgs how many fragmented messages
- * to we defragment at most at the same time?
- */
- unsigned int num_msgs;
-
- /**
- * Current number of messages in the 'struct MessageContext'
- * DLL (smaller or equal to 'num_msgs').
- */
- unsigned int list_size;
-
- /**
- * Maximum message size for each fragment.
- */
- uint16_t mtu;
-};
-
-
-/**
- * Create a defragmentation context.
- *
- * @param stats statistics context
- * @param mtu the maximum message size for each fragment
- * @param num_msgs how many fragmented messages
- * to we defragment at most at the same time?
- * @param cls closure for proc and ackp
- * @param proc function to call with defragmented messages
- * @param ackp function to call with acknowledgements (to send
- * back to the other side)
- * @return the defragmentation context
- */
-struct GNUNET_DEFRAGMENT_Context *
-GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
- uint16_t mtu,
- unsigned int num_msgs,
- void *cls,
- GNUNET_FRAGMENT_MessageProcessor proc,
- GNUNET_DEFRAGMENT_AckProcessor ackp)
-{
- struct GNUNET_DEFRAGMENT_Context *dc;
-
- dc = GNUNET_malloc (sizeof (struct GNUNET_DEFRAGMENT_Context));
- dc->stats = stats;
- dc->cls = cls;
- dc->proc = proc;
- dc->ackp = ackp;
- dc->num_msgs = num_msgs;
- dc->mtu = mtu;
- dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
- return dc;
-}
-
-
-/**
- * Destroy the given defragmentation context.
- *
- * @param dc defragmentation context
- */
-void
-GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
-{
- struct MessageContext *mc;
-
- while (NULL != (mc = dc->head))
- {
- GNUNET_CONTAINER_DLL_remove (dc->head,
- dc->tail,
- mc);
- dc->list_size--;
- if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
- {
- GNUNET_SCHEDULER_cancel (mc->ack_task);
- mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
- }
- GNUNET_free (mc);
- }
- GNUNET_assert (0 == dc->list_size);
- GNUNET_free (dc);
-}
-
-
-/**
- * Send acknowledgement to the other peer now.
- *
- * @param cls the message context
- * @param tc the scheduler context
- */
-static void
-send_ack (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct MessageContext *mc = cls;
- struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
- struct FragmentAcknowledgement fa;
-
- mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
- fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
- fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
- fa.fragment_id = htonl (mc->fragment_id);
- fa.bits = GNUNET_htonll (mc->bits);
- dc->ackp (dc->cls, mc->fragment_id, &fa.header);
-}
-
-
-/**
- * This function is from the GNU Scientific Library, linear/fit.c,
- * (C) 2000 Brian Gough
- */
-static void
-gsl_fit_mul (const double *x, const size_t xstride,
- const double *y, const size_t ystride,
- const size_t n,
- double *c1, double *cov_11, double *sumsq)
-{
- double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
-
- size_t i;
-
- for (i = 0; i < n; i++)
- {
- m_x += (x[i * xstride] - m_x) / (i + 1.0);
- m_y += (y[i * ystride] - m_y) / (i + 1.0);
- }
-
- for (i = 0; i < n; i++)
- {
- const double dx = x[i * xstride] - m_x;
- const double dy = y[i * ystride] - m_y;
-
- m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
- m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
- }
-
- /* In terms of y = b x */
-
- {
- double s2 = 0, d2 = 0;
- double b = (m_x * m_y + m_dxdy) / (m_x * m_x + m_dx2);
-
- *c1 = b;
-
- /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
-
- for (i = 0; i < n; i++)
- {
- const double dx = x[i * xstride] - m_x;
- const double dy = y[i * ystride] - m_y;
- const double d = (m_y - b * m_x) + dy - b * dx;
- d2 += d * d;
- }
-
- s2 = d2 / (n - 1.0); /* chisq per degree of freedom */
-
- *cov_11 = s2 * 1.0 / (n * (m_x * m_x + m_dx2));
-
- *sumsq = d2;
- }
-}
-
-
-/**
- * Estimate the latency between messages based on the most recent
- * message time stamps.
- *
- * @param mc context with time stamps
- * @return average delay between time stamps (based on least-squares fit)
- */
-static struct GNUNET_TIME_Relative
-estimate_latency (struct MessageContext *mc)
-{
- struct FragTimes *first;
- size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
- double x[total];
- double y[total];
- size_t i;
- double c1;
- double cov11;
- double sumsq;
- struct GNUNET_TIME_Relative ret;
-
- first = &mc->frag_times[mc->frag_times_start_offset];
- GNUNET_assert (total > 1);
- for (i=0;i<total;i++)
- {
- x[i] = (double) i;
- y[i] = (double) (first[i].time.abs_value - first[0].time.abs_value);
- }
- gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
- ret.rel_value = (uint64_t) c1;
- return ret;
-};
-
-
-/**
- * Discard the message context that was inactive for the longest time.
- *
- * @param dc defragmentation context
- */
-static void
-discard_oldest_mc (struct GNUNET_DEFRAGMENT_Context *dc)
-{
- struct MessageContext *old;
- struct MessageContext *pos;
-
- old = NULL;
- pos = dc->head;
- while (NULL != pos)
- {
- if ( (old == NULL) ||
- (old->last_update.abs_value > pos->last_update.abs_value) )
- old = pos;
- pos = pos->next;
- }
- GNUNET_assert (NULL != old);
- GNUNET_CONTAINER_DLL_remove (dc->head,
- dc->tail,
- old);
- dc->list_size--;
- if (GNUNET_SCHEDULER_NO_TASK != old->ack_task)
- GNUNET_SCHEDULER_cancel (old->ack_task);
- GNUNET_free (old);
- fprintf (stderr, "D");
-}
-
-
-/**
- * We have received a fragment. Process it.
- *
- * @param dc the context
- * @param msg the message that was received
- * @return GNUNET_OK on success, GNUNET_NO if this was a duplicate, GNUNET_SYSERR on error
- */
-int
-GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
- const struct GNUNET_MessageHeader *msg)
-{
- struct MessageContext *mc;
- const struct FragmentHeader *fh;
- uint16_t msize;
- uint16_t foff;
- uint32_t fid;
- char *mbuf;
- unsigned int bit;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Relative delay;
- unsigned int bc;
- unsigned int b;
- unsigned int n;
- int duplicate;
-
- if (ntohs(msg->size) < sizeof (struct FragmentHeader))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (ntohs (msg->size) > dc->mtu)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- fh = (const struct FragmentHeader*) msg;
- msize = ntohs (fh->total_size);
- fid = ntohl (fh->fragment_id);
- foff = ntohs (fh->offset);
- if (foff >= msize)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if (0 != (foff % (dc->mtu - sizeof (struct FragmentHeader))))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- GNUNET_STATISTICS_update (dc->stats,
- _("Fragments received"),
- 1,
- GNUNET_NO);
- mc = dc->head;
- while ( (NULL != mc) &&
- (fid != mc->fragment_id) )
- mc = mc->next;
- bit = foff / (dc->mtu - sizeof (struct FragmentHeader));
- if (bit * (dc->mtu - sizeof (struct FragmentHeader)) + ntohs (msg->size)
- - sizeof (struct FragmentHeader) > msize)
- {
- /* payload extends past total message size */
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- if ( (NULL != mc) && (msize != mc->total_size) )
- {
- /* inconsistent message size */
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- now = GNUNET_TIME_absolute_get ();
- if (NULL == mc)
- {
- mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
- mc->msg = (const struct GNUNET_MessageHeader*) &mc[1];
- mc->dc = dc;
- mc->total_size = msize;
- mc->fragment_id = fid;
- mc->last_update = now;
- n = (msize + dc->mtu - sizeof (struct FragmentHeader) - 1) / (dc->mtu - sizeof (struct FragmentHeader));
- if (n == 64)
- mc->bits = UINT64_MAX; /* set all 64 bit */
- else
- mc->bits = (1LL << n) - 1; /* set lowest 'bits' bit */
- GNUNET_CONTAINER_DLL_insert (dc->head,
- dc->tail,
- mc);
- dc->list_size++;
- if (dc->list_size > dc->num_msgs)
- discard_oldest_mc (dc);
- }
-
- /* copy data to 'mc' */
- if (0 != (mc->bits & (1LL << bit)))
- {
- mc->bits -= 1LL << bit;
- mbuf = (char* )&mc[1];
- memcpy (&mbuf[bit * (dc->mtu - sizeof (struct FragmentHeader))],
- &fh[1],
- ntohs (msg->size) - sizeof (struct FragmentHeader));
- mc->last_update = now;
- if (bit < mc->last_bit)
- mc->frag_times_start_offset = mc->frag_times_write_offset;
- mc->last_bit = bit;
- mc->frag_times[mc->frag_times_write_offset].time = now;
- mc->frag_times[mc->frag_times_write_offset].bit = bit;
- mc->frag_times_write_offset++;
- duplicate = GNUNET_NO;
- }
- else
- {
- duplicate = GNUNET_YES;
- GNUNET_STATISTICS_update (dc->stats,
- _("Duplicate fragments received"),
- 1,
- GNUNET_NO);
- }
-
- /* count number of missing fragments */
- bc = 0;
- for (b=0;b<64;b++)
- if (0 != (mc->bits & (1LL << b))) bc++;
- if (mc->frag_times_write_offset - mc->frag_times_start_offset > 1)
- dc->latency = estimate_latency (mc);
- delay = GNUNET_TIME_relative_multiply (dc->latency,
- bc + 1);
- if ( (0 == mc->bits) || (GNUNET_YES == duplicate) ) /* message complete or duplicate, ACK now! */
- delay = GNUNET_TIME_UNIT_ZERO;
- if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
- GNUNET_SCHEDULER_cancel (mc->ack_task);
- mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
- &send_ack,
- mc);
- if ( (duplicate == GNUNET_NO) &&
- (0 == mc->bits) )
- {
- GNUNET_STATISTICS_update (dc->stats,
- _("Messages defragmented"),
- 1,
- GNUNET_NO);
- /* message complete, notify! */
- dc->proc (dc->cls,
- mc->msg);
- }
- if (duplicate == GNUNET_YES)
- return GNUNET_NO;
- return GNUNET_YES;
-}
-
-/* end of defragmentation_new.c */
-
--- /dev/null
+/*
+ This file is part of GNUnet
+ (C) 2009, 2011 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+/**
+ * @file src/fragmentation/fragmentation_new.c
+ * @brief library to help fragment messages
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_fragmentation_lib.h"
+#include "gnunet_protocols.h"
+#include "fragmentation.h"
+
+
+/**
+ * Fragmentation context.
+ */
+struct GNUNET_FRAGMENT_Context
+{
+ /**
+ * Statistics to use.
+ */
+ struct GNUNET_STATISTICS_Handle *stats;
+
+ /**
+ * Tracker for flow control.
+ */
+ struct GNUNET_BANDWIDTH_Tracker *tracker;
+
+ /**
+ * Current expected delay for ACKs.
+ */
+ struct GNUNET_TIME_Relative delay;
+
+ /**
+ * Time we transmitted the last message of the last round.
+ */
+ struct GNUNET_TIME_Absolute last_round;
+
+ /**
+ * Message to fragment (allocated at the end of this struct).
+ */
+ const struct GNUNET_MessageHeader *msg;
+
+ /**
+ * Function to call for transmissions.
+ */
+ GNUNET_FRAGMENT_MessageProcessor proc;
+
+ /**
+ * Closure for 'proc'.
+ */
+ void *proc_cls;
+
+ /**
+ * Bitfield, set to 1 for each unacknowledged fragment.
+ */
+ uint64_t acks;
+
+ /**
+ * Task performing work for the fragmenter.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier task;
+
+ /**
+ * Our fragmentation ID. (chosen at random)
+ */
+ uint32_t fragment_id;
+
+ /**
+ * Round-robin selector for the next transmission.
+ */
+ unsigned int next_transmission;
+
+ /**
+ * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
+ */
+ int8_t proc_busy;
+
+ /**
+ * GNUNET_YES if we are waiting for an ACK.
+ */
+ int8_t wack;
+
+ /**
+ * Target fragment size.
+ */
+ uint16_t mtu;
+
+};
+
+
+/**
+ * Transmit the next fragment to the other peer.
+ *
+ * @param cls the 'struct GNUNET_FRAGMENT_Context'
+ * @param tc scheduler context
+ */
+static void
+transmit_next (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_FRAGMENT_Context *fc = cls;
+ char msg[fc->mtu];
+ const char *mbuf;
+ struct FragmentHeader *fh;
+ struct GNUNET_TIME_Relative delay;
+ unsigned int bit;
+ size_t size;
+ size_t fsize;
+ int wrap;
+
+ fc->task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (GNUNET_NO == fc->proc_busy);
+ if (0 == fc->acks)
+ return; /* all done */
+
+ /* calculate delay */
+ wrap = 0;
+ while (0 == (fc->acks & (1LL << fc->next_transmission)))
+ {
+ fc->next_transmission = (fc->next_transmission + 1) % 64;
+ wrap |= (fc->next_transmission == 0);
+ }
+ bit = fc->next_transmission;
+ size = ntohs (fc->msg->size);
+ if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+ fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader);
+ else
+ fsize = fc->mtu;
+ if (fc->tracker != NULL)
+ delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+ fsize);
+ else
+ delay = GNUNET_TIME_UNIT_ZERO;
+ if (delay.rel_value > 0)
+ {
+ fc->task = GNUNET_SCHEDULER_add_delayed (delay,
+ &transmit_next,
+ fc);
+ return;
+ }
+ fc->next_transmission = (fc->next_transmission + 1) % 64;
+ wrap |= (fc->next_transmission == 0);
+
+ /* assemble fragmentation message */
+ mbuf = (const char*) &fc[1];
+ fh = (struct FragmentHeader*) msg;
+ fh->header.size = htons (fsize);
+ fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
+ fh->fragment_id = htonl (fc->fragment_id);
+ fh->total_size = fc->msg->size; /* already in big-endian */
+ fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
+ memcpy (&fh[1],
+ &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
+ fsize - sizeof (struct FragmentHeader));
+ if (NULL != fc->tracker)
+ GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
+ GNUNET_STATISTICS_update (fc->stats,
+ _("Fragments transmitted"),
+ 1, GNUNET_NO);
+ if (0 != fc->last_round.abs_value)
+ GNUNET_STATISTICS_update (fc->stats,
+ _("Fragments retransmitted"),
+ 1, GNUNET_NO);
+
+ /* select next message to calculate delay */
+ bit = fc->next_transmission;
+ size = ntohs (fc->msg->size);
+ if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+ fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
+ else
+ fsize = fc->mtu;
+ if (NULL != fc->tracker)
+ delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+ fsize);
+ else
+ delay = GNUNET_TIME_UNIT_ZERO;
+ if (wrap)
+ {
+ /* full round transmitted wait 2x delay for ACK before going again */
+ delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
+ fc->delay);
+ fc->last_round = GNUNET_TIME_absolute_get ();
+ fc->wack = GNUNET_YES;
+ }
+ fc->proc_busy = GNUNET_YES;
+ fc->proc (fc->proc_cls, &fh->header);
+}
+
+
+/**
+ * Create a fragmentation context for the given message.
+ * Fragments the message into fragments of size "mtu" or
+ * less. Calls 'proc' on each un-acknowledged fragment,
+ * using both the expected 'delay' between messages and
+ * acknowledgements and the given 'tracker' to guide the
+ * frequency of calls to 'proc'.
+ *
+ * @param stats statistics context
+ * @param mtu the maximum message size for each fragment
+ * @param tracker bandwidth tracker to use for flow control (can be NULL)
+ * @param delay expected delay between fragment transmission
+ * and ACK based on previous messages
+ * @param msg the message to fragment
+ * @param proc function to call for each fragment to transmit
+ * @param proc_cls closure for proc
+ * @return the fragmentation context
+ */
+struct GNUNET_FRAGMENT_Context *
+GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+ uint16_t mtu,
+ struct GNUNET_BANDWIDTH_Tracker *tracker,
+ struct GNUNET_TIME_Relative delay,
+ const struct GNUNET_MessageHeader *msg,
+ GNUNET_FRAGMENT_MessageProcessor proc,
+ void *proc_cls)
+{
+ struct GNUNET_FRAGMENT_Context *fc;
+ size_t size;
+ uint64_t bits;
+
+ GNUNET_STATISTICS_update (stats,
+ _("Messages fragmented"),
+ 1, GNUNET_NO);
+ GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
+ size = ntohs (msg->size);
+ GNUNET_STATISTICS_update (stats,
+ _("Total size of fragmented messages"),
+ size, GNUNET_NO);
+ GNUNET_assert (size > mtu);
+ fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
+ fc->stats = stats;
+ fc->mtu = mtu;
+ fc->tracker = tracker;
+ fc->delay = delay;
+ fc->msg = (const struct GNUNET_MessageHeader*)&fc[1];
+ fc->proc = proc;
+ fc->proc_cls = proc_cls;
+ fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
+ memcpy (&fc[1], msg, size);
+ bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader));
+ GNUNET_assert (bits <= 64);
+ if (bits == 64)
+ fc->acks = UINT64_MAX; /* set all 64 bit */
+ else
+ fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */
+ fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
+ fc);
+ return fc;
+}
+
+
+/**
+ * Continuation to call from the 'proc' function after the fragment
+ * has been transmitted (and hence the next fragment can now be
+ * given to proc).
+ *
+ * @param fc fragmentation context
+ */
+void
+GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
+{
+ GNUNET_assert (fc->proc_busy == GNUNET_YES);
+ fc->proc_busy = GNUNET_NO;
+ GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
+ fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
+ fc);
+}
+
+
+/**
+ * Process an acknowledgement message we got from the other
+ * side (to control re-transmits).
+ *
+ * @param fc fragmentation context
+ * @param msg acknowledgement message we received
+ * @return GNUNET_OK if this ack completes the work of the 'fc'
+ * (all fragments have been received);
+ * GNUNET_NO if more messages are pending
+ * GNUNET_SYSERR if this ack is not valid for this fc
+ */
+int
+GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct FragmentAcknowledgement *fa;
+ uint64_t abits;
+ struct GNUNET_TIME_Relative ndelay;
+
+ if (sizeof (struct FragmentAcknowledgement) !=
+ ntohs (msg->size))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ fa = (const struct FragmentAcknowledgement *) msg;
+ if (ntohl (fa->fragment_id) != fc->fragment_id)
+ return GNUNET_SYSERR; /* not our ACK */
+ abits = GNUNET_ntohll (fa->bits);
+ if (GNUNET_YES == fc->wack)
+ {
+ /* normal ACK, can update running average of delay... */
+ fc->wack = GNUNET_NO;
+ ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
+ fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
+ }
+ if (abits != (fc->acks & abits))
+ {
+ /* ID collission or message reordering, count! This should be rare! */
+ GNUNET_STATISTICS_update (fc->stats,
+ _("Bits removed from ACK"),
+ 1, GNUNET_NO);
+ }
+ fc->acks = abits;
+ if (0 != fc->acks)
+ {
+ /* more to transmit, do so right now (if tracker permits...) */
+ GNUNET_SCHEDULER_cancel (fc->task);
+ fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
+ fc);
+ return GNUNET_NO;
+ }
+
+ /* all done */
+ if (fc->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (fc->task);
+ fc->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Destroy the given fragmentation context (stop calling 'proc', free
+ * resources).
+ *
+ * @param fc fragmentation context
+ * @return average delay between transmission and ACK for the
+ * last message, FOREVER if the message was not fully transmitted
+ */
+struct GNUNET_TIME_Relative
+GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
+{
+ struct GNUNET_TIME_Relative ret;
+
+ if (fc->task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (fc->task);
+ ret = fc->delay;
+ GNUNET_free (fc);
+ return ret;
+}
+
+
+/* end of fragmentation_new.c */
+
+++ /dev/null
-/*
- This file is part of GNUnet
- (C) 2009, 2011 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-/**
- * @file src/fragmentation/fragmentation_new.c
- * @brief library to help fragment messages
- * @author Christian Grothoff
- */
-#include "platform.h"
-#include "gnunet_fragmentation_lib.h"
-#include "gnunet_protocols.h"
-#include "fragmentation.h"
-
-
-/**
- * Fragmentation context.
- */
-struct GNUNET_FRAGMENT_Context
-{
- /**
- * Statistics to use.
- */
- struct GNUNET_STATISTICS_Handle *stats;
-
- /**
- * Tracker for flow control.
- */
- struct GNUNET_BANDWIDTH_Tracker *tracker;
-
- /**
- * Current expected delay for ACKs.
- */
- struct GNUNET_TIME_Relative delay;
-
- /**
- * Time we transmitted the last message of the last round.
- */
- struct GNUNET_TIME_Absolute last_round;
-
- /**
- * Message to fragment (allocated at the end of this struct).
- */
- const struct GNUNET_MessageHeader *msg;
-
- /**
- * Function to call for transmissions.
- */
- GNUNET_FRAGMENT_MessageProcessor proc;
-
- /**
- * Closure for 'proc'.
- */
- void *proc_cls;
-
- /**
- * Bitfield, set to 1 for each unacknowledged fragment.
- */
- uint64_t acks;
-
- /**
- * Task performing work for the fragmenter.
- */
- GNUNET_SCHEDULER_TaskIdentifier task;
-
- /**
- * Our fragmentation ID. (chosen at random)
- */
- uint32_t fragment_id;
-
- /**
- * Round-robin selector for the next transmission.
- */
- unsigned int next_transmission;
-
- /**
- * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
- */
- int8_t proc_busy;
-
- /**
- * GNUNET_YES if we are waiting for an ACK.
- */
- int8_t wack;
-
- /**
- * Target fragment size.
- */
- uint16_t mtu;
-
-};
-
-
-/**
- * Transmit the next fragment to the other peer.
- *
- * @param cls the 'struct GNUNET_FRAGMENT_Context'
- * @param tc scheduler context
- */
-static void
-transmit_next (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct GNUNET_FRAGMENT_Context *fc = cls;
- char msg[fc->mtu];
- const char *mbuf;
- struct FragmentHeader *fh;
- struct GNUNET_TIME_Relative delay;
- unsigned int bit;
- size_t size;
- size_t fsize;
- int wrap;
-
- fc->task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_assert (GNUNET_NO == fc->proc_busy);
- if (0 == fc->acks)
- return; /* all done */
-
- /* calculate delay */
- wrap = 0;
- while (0 == (fc->acks & (1LL << fc->next_transmission)))
- {
- fc->next_transmission = (fc->next_transmission + 1) % 64;
- wrap |= (fc->next_transmission == 0);
- }
- bit = fc->next_transmission;
- size = ntohs (fc->msg->size);
- if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
- fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct FragmentHeader);
- else
- fsize = fc->mtu;
- if (fc->tracker != NULL)
- delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
- fsize);
- else
- delay = GNUNET_TIME_UNIT_ZERO;
- if (delay.rel_value > 0)
- {
- fc->task = GNUNET_SCHEDULER_add_delayed (delay,
- &transmit_next,
- fc);
- return;
- }
- fc->next_transmission = (fc->next_transmission + 1) % 64;
- wrap |= (fc->next_transmission == 0);
-
- /* assemble fragmentation message */
- mbuf = (const char*) &fc[1];
- fh = (struct FragmentHeader*) msg;
- fh->header.size = htons (fsize);
- fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
- fh->fragment_id = htonl (fc->fragment_id);
- fh->total_size = fc->msg->size; /* already in big-endian */
- fh->offset = htons ((fc->mtu - sizeof (struct FragmentHeader)) * bit);
- memcpy (&fh[1],
- &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))],
- fsize - sizeof (struct FragmentHeader));
- if (NULL != fc->tracker)
- GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
- GNUNET_STATISTICS_update (fc->stats,
- _("Fragments transmitted"),
- 1, GNUNET_NO);
- if (0 != fc->last_round.abs_value)
- GNUNET_STATISTICS_update (fc->stats,
- _("Fragments retransmitted"),
- 1, GNUNET_NO);
-
- /* select next message to calculate delay */
- bit = fc->next_transmission;
- size = ntohs (fc->msg->size);
- if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
- fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
- else
- fsize = fc->mtu;
- if (NULL != fc->tracker)
- delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
- fsize);
- else
- delay = GNUNET_TIME_UNIT_ZERO;
- if (wrap)
- {
- /* full round transmitted wait 2x delay for ACK before going again */
- delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 2),
- fc->delay);
- fc->last_round = GNUNET_TIME_absolute_get ();
- fc->wack = GNUNET_YES;
- }
- fc->proc_busy = GNUNET_YES;
- fc->proc (fc->proc_cls, &fh->header);
-}
-
-
-/**
- * Create a fragmentation context for the given message.
- * Fragments the message into fragments of size "mtu" or
- * less. Calls 'proc' on each un-acknowledged fragment,
- * using both the expected 'delay' between messages and
- * acknowledgements and the given 'tracker' to guide the
- * frequency of calls to 'proc'.
- *
- * @param stats statistics context
- * @param mtu the maximum message size for each fragment
- * @param tracker bandwidth tracker to use for flow control (can be NULL)
- * @param delay expected delay between fragment transmission
- * and ACK based on previous messages
- * @param msg the message to fragment
- * @param proc function to call for each fragment to transmit
- * @param proc_cls closure for proc
- * @return the fragmentation context
- */
-struct GNUNET_FRAGMENT_Context *
-GNUNET_FRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
- uint16_t mtu,
- struct GNUNET_BANDWIDTH_Tracker *tracker,
- struct GNUNET_TIME_Relative delay,
- const struct GNUNET_MessageHeader *msg,
- GNUNET_FRAGMENT_MessageProcessor proc,
- void *proc_cls)
-{
- struct GNUNET_FRAGMENT_Context *fc;
- size_t size;
- uint64_t bits;
-
- GNUNET_STATISTICS_update (stats,
- _("Messages fragmented"),
- 1, GNUNET_NO);
- GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
- size = ntohs (msg->size);
- GNUNET_STATISTICS_update (stats,
- _("Total size of fragmented messages"),
- size, GNUNET_NO);
- GNUNET_assert (size > mtu);
- fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
- fc->stats = stats;
- fc->mtu = mtu;
- fc->tracker = tracker;
- fc->delay = delay;
- fc->msg = (const struct GNUNET_MessageHeader*)&fc[1];
- fc->proc = proc;
- fc->proc_cls = proc_cls;
- fc->fragment_id = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- UINT32_MAX);
- memcpy (&fc[1], msg, size);
- bits = (size + mtu - sizeof (struct FragmentHeader) - 1) / (mtu - sizeof (struct FragmentHeader));
- GNUNET_assert (bits <= 64);
- if (bits == 64)
- fc->acks = UINT64_MAX; /* set all 64 bit */
- else
- fc->acks = (1LL << bits) - 1; /* set lowest 'bits' bit */
- fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
- fc);
- return fc;
-}
-
-
-/**
- * Continuation to call from the 'proc' function after the fragment
- * has been transmitted (and hence the next fragment can now be
- * given to proc).
- *
- * @param fc fragmentation context
- */
-void
-GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
-{
- GNUNET_assert (fc->proc_busy == GNUNET_YES);
- fc->proc_busy = GNUNET_NO;
- GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
- fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
- fc);
-}
-
-
-/**
- * Process an acknowledgement message we got from the other
- * side (to control re-transmits).
- *
- * @param fc fragmentation context
- * @param msg acknowledgement message we received
- * @return GNUNET_OK if this ack completes the work of the 'fc'
- * (all fragments have been received);
- * GNUNET_NO if more messages are pending
- * GNUNET_SYSERR if this ack is not valid for this fc
- */
-int
-GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
- const struct GNUNET_MessageHeader *msg)
-{
- const struct FragmentAcknowledgement *fa;
- uint64_t abits;
- struct GNUNET_TIME_Relative ndelay;
-
- if (sizeof (struct FragmentAcknowledgement) !=
- ntohs (msg->size))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- fa = (const struct FragmentAcknowledgement *) msg;
- if (ntohl (fa->fragment_id) != fc->fragment_id)
- return GNUNET_SYSERR; /* not our ACK */
- abits = GNUNET_ntohll (fa->bits);
- if (GNUNET_YES == fc->wack)
- {
- /* normal ACK, can update running average of delay... */
- fc->wack = GNUNET_NO;
- ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
- fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
- }
- if (abits != (fc->acks & abits))
- {
- /* ID collission or message reordering, count! This should be rare! */
- GNUNET_STATISTICS_update (fc->stats,
- _("Bits removed from ACK"),
- 1, GNUNET_NO);
- }
- fc->acks = abits;
- if (0 != fc->acks)
- {
- /* more to transmit, do so right now (if tracker permits...) */
- GNUNET_SCHEDULER_cancel (fc->task);
- fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
- fc);
- return GNUNET_NO;
- }
-
- /* all done */
- if (fc->task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (fc->task);
- fc->task = GNUNET_SCHEDULER_NO_TASK;
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Destroy the given fragmentation context (stop calling 'proc', free
- * resources).
- *
- * @param fc fragmentation context
- * @return average delay between transmission and ACK for the
- * last message, FOREVER if the message was not fully transmitted
- */
-struct GNUNET_TIME_Relative
-GNUNET_FRAGMENT_context_destroy (struct GNUNET_FRAGMENT_Context *fc)
-{
- struct GNUNET_TIME_Relative ret;
-
- if (fc->task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (fc->task);
- ret = fc->delay;
- GNUNET_free (fc);
- return ret;
-}
-
-
-/* end of fragmentation_new.c */
-