/**
* The time the fragment was received.
*/
- GNUNET_TIME_Absolute time;
+ struct GNUNET_TIME_Absolute time;
/**
* Number of the bit for the fragment (in [0,..,63]).
/**
- * Information we keep for one message that is being assembled.
+ * Information we keep for one message that is being assembled. Note
+ * that we keep the context around even after the assembly is done to
+ * handle 'stray' messages that are received 'late'. A message
+ * context is ONLY discarded when the queue gets too big.
*/
struct MessageContext
{
*/
struct MessageContext *prev;
+ /**
+ * Associated defragmentation context.
+ */
+ struct GNUNET_DEFRAGMENT_Context *dc;
+
/**
* Pointer to the assembled message, allocated at the
* end of this struct.
* Task scheduled for transmitting the next ACK to the
* other peer.
*/
- struct GNUNET_SCHEDULER_TaskIdentifier ack_task;
+ GNUNET_SCHEDULER_TaskIdentifier ack_task;
/**
* When did we receive which fragment? Used to calculate
*/
struct GNUNET_STATISTICS_Handle *stats;
- /**
- * Closure for 'proc' and 'ackp'.
- */
- void *cls;
-
/**
* Head of list of messages we're defragmenting.
*/
*/
struct MessageContext *tail;
+ /**
+ * Closure for 'proc' and 'ackp'.
+ */
+ void *cls;
+
/**
* Function to call with defragmented messages.
*/
*/
unsigned int list_size;
-
+ /**
+ * Maximum message size for each fragment.
+ */
+ uint16_t mtu;
};
* Create a defragmentation context.
*
* @param stats statistics context
+ * @param mtu the maximum message size for each fragment
* @param num_msgs how many fragmented messages
* to we defragment at most at the same time?
* @param cls closure for proc and ackp
*/
struct GNUNET_DEFRAGMENT_Context *
GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+ uint16_t mtu,
unsigned int num_msgs,
void *cls,
GNUNET_FRAGMENT_MessageProcessor proc,
dc->proc = proc;
dc->ackp = ackp;
dc->num_msgs = num_msgs;
+ dc->mtu = mtu;
dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
return dc;
}
void
GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
{
+ struct MessageContext *mc;
+
+ while (NULL != (mc = dc->head))
+ {
+ GNUNET_CONTAINER_DLL_remove (dc->head,
+ dc->tail,
+ mc);
+ dc->list_size--;
+ if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+ {
+ GNUNET_SCHEDULER_cancel (mc->ack_task);
+ mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_free (mc);
+ }
+ GNUNET_assert (0 == dc->list_size);
GNUNET_free (dc);
}
+/**
+ * Send acknowledgement to the other peer now.
+ *
+ * @param cls the message context
+ * @param tc the scheduler context
+ */
+static void
+send_ack (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct MessageContext *mc = cls;
+ struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
+ struct FragmentAcknowledgement fa;
+
+ mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+ fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
+ fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
+ fa.fragment_id = htonl (mc->fragment_id);
+ fa.bits = GNUNET_htonll (mc->bits);
+ dc->ackp (dc->cls, &fa.header);
+}
+
+
/**
* We have received a fragment. Process it.
*
GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
const struct GNUNET_MessageHeader *msg)
{
+ struct MessageContext *mc;
+ const struct FragmentHeader *fh;
+ uint16_t msize;
+ uint16_t foff;
+ uint32_t fid;
+ char *mbuf;
+ unsigned int bit;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative delay;
+
+ if (ntohs(msg->size) < sizeof (struct FragmentHeader))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohs (msg->size) > dc->mtu)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ fh = (const struct FragmentHeader*) msg;
+ msize = ntohs (fh->total_size);
+ fid = ntohl (fh->fragment_id);
+ foff = ntohl (fh->offset);
+ if (foff >= msize)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Fragments received"),
+ 1,
+ GNUNET_NO);
+ mc = dc->head;
+ while ( (NULL != mc) &&
+ (fid != mc->fragment_id) )
+ mc = mc->next;
+ bit = foff / dc->mtu;
+ if (bit * dc->mtu + ntohs (msg->size)
+ - sizeof (struct FragmentHeader) > msize)
+ {
+ /* payload extends past total message size */
+ GNUNET_break_op (0);
+ return;
+ }
+ if ( (NULL != mc) && (msize != mc->total_size) )
+ {
+ /* inconsistent message size */
+ GNUNET_break_op (0);
+ return;
+ }
+ now = GNUNET_TIME_absolute_get ();
+ if (NULL == mc)
+ {
+ mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
+ mc->msg = (const struct GNUNET_MessageHeader*) &mc[1];
+ mc->dc = dc;
+ mc->total_size = msize;
+ mc->fragment_id = fid;
+ mc->last_update = now;
+ mc->bits = (msize + dc->mtu - 1) / (dc->mtu - sizeof (struct FragmentHeader));
+ GNUNET_CONTAINER_DLL_insert (dc->head,
+ dc->tail,
+ mc);
+ dc->list_size++;
+ if (dc->list_size > dc->num_msgs)
+ {
+ /* FIXME: discard oldest entry... */
+ }
+ }
+
+ /* copy data to 'mc' */
+ if (0 != (mc->bits & (1 << bit)))
+ {
+ mc->bits -= 1 << bit;
+ mbuf = (char* )&mc[1];
+ memcpy (&mbuf[bit * dc->mtu],
+ &fh[1],
+ ntohs (msg->size) - sizeof (struct FragmentHeader));
+ mc->last_update = now;
+ mc->frag_times[mc->frag_times_write_offset].time = now;
+ mc->frag_times[mc->frag_times_write_offset].bit = bit;
+ mc->frag_times_write_offset++;
+ if (0 == mc->bits)
+ {
+ /* message complete, notify! */
+ dc->proc (dc->cls,
+ mc->msg);
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Messages defragmented"),
+ 1,
+ GNUNET_NO);
+ }
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (dc->stats,
+ _("Duplicate fragments received"),
+ 1,
+ GNUNET_NO);
+ }
+
+ /* FIXME: update ACK timer (if 0==mc->bits, always ACK now!) */
+ delay = GNUNET_TIME_UNIT_SECONDS; /* FIXME: bad! */
+ if (mc->frag_times_write_offset == 1)
+ {
+ /* FIXME: use number-of-fragments * dc->delay */
+ }
+ else
+ {
+ /* FIXME: use best-fit regression */
+ }
+ /* FIXME: update dc->latency! */
+
+ if (0 == mc->bits)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+ GNUNET_SCHEDULER_cancel (mc->ack_task);
+ mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &send_ack,
+ mc);
}
/* end of defragmentation_new.c */