/**
* Function to call with acknowledgements.
*/
- GNUNET_FRAGMENT_MessageProcessor ackp;
+ GNUNET_DEFRAGMENT_AckProcessor ackp;
/**
* Running average of the latency (delay between messages) for this
unsigned int num_msgs,
void *cls,
GNUNET_FRAGMENT_MessageProcessor proc,
- GNUNET_FRAGMENT_MessageProcessor ackp)
+ GNUNET_DEFRAGMENT_AckProcessor ackp)
{
struct GNUNET_DEFRAGMENT_Context *dc;
fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
fa.fragment_id = htonl (mc->fragment_id);
fa.bits = GNUNET_htonll (mc->bits);
- dc->ackp (dc->cls, &fa.header);
+ dc->ackp (dc->cls, mc->fragment_id, &fa.header);
}
*/
unsigned int next_transmission;
+ /**
+ * GNUNET_YES if we called 'proc' and are now waiting for 'GNUNET_FRAGMENT_transmission_done'
+ */
+ int8_t proc_busy;
+
/**
* GNUNET_YES if we are waiting for an ACK.
*/
- int wack;
+ int8_t wack;
/**
* Target fragment size.
int wrap;
fc->task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (GNUNET_NO == fc->proc_busy);
if (0 == fc->acks)
return; /* all done */
fc->last_round = GNUNET_TIME_absolute_get ();
fc->wack = GNUNET_YES;
}
- fc->task = GNUNET_SCHEDULER_add_delayed (delay,
- &transmit_next,
- fc);
+ fc->proc_busy = GNUNET_YES;
fc->proc (fc->proc_cls, &fh->header);
}
}
+/**
+ * Continuation to call from the 'proc' function after the fragment
+ * has been transmitted (and hence the next fragment can now be
+ * given to proc).
+ *
+ * @param fc fragmentation context
+ */
+void
+GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc)
+{
+ GNUNET_assert (fc->proc_busy == GNUNET_YES);
+ fc->proc_busy = GNUNET_NO;
+ GNUNET_assert (fc->task == GNUNET_SCHEDULER_NO_TASK);
+ fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
+ fc);
+}
+
+
/**
* Process an acknowledgement message we got from the other
* side (to control re-transmits).
/**
* Simulate dropping of 1 out of how many messages? (must be > 1)
*/
-#define DROPRATE 2
+#define DROPRATE 10
static int ret = 1;
*/
static void
proc_acks (void *cls,
+ uint32_t msg_id,
const struct GNUNET_MessageHeader *hdr)
{
unsigned int i;
proc_frac (void *cls,
const struct GNUNET_MessageHeader *hdr)
{
+ struct GNUNET_FRAGMENT_Context **fc = cls;
int ret;
+ GNUNET_FRAGMENT_context_transmission_done (*fc);
if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, DROPRATE))
{
frag_drops++;
GNUNET_TIME_UNIT_SECONDS,
msg,
&proc_frac,
- NULL);
+ &frags[i]);
}
}
/**
- * Function that is called with messages
- * created by the fragmentation module.
+ * Function that is called with messages created by the fragmentation
+ * module. In the case of the 'proc' callback of the
+ * GNUNET_FRAGMENT_context_create function, this function must
+ * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
*
* @param cls closure
* @param msg the message that was created
void *proc_cls);
+/**
+ * Continuation to call from the 'proc' function after the fragment
+ * has been transmitted (and hence the next fragment can now be
+ * given to proc).
+ *
+ * @param fc fragmentation context
+ */
+void
+GNUNET_FRAGMENT_context_transmission_done (struct GNUNET_FRAGMENT_Context *fc);
+
+
/**
* Process an acknowledgement message we got from the other
* side (to control re-transmits).
struct GNUNET_DEFRAGMENT_Context;
+/**
+ * Function that is called with acknowledgement messages created by
+ * the fragmentation module. Acknowledgements are cummulative,
+ * so it is OK to only transmit the 'latest' ack message for the same
+ * message ID.
+ *
+ * @param cls closure
+ * @param id unique message ID (modulo collisions)
+ * @param msg the message that was created
+ */
+typedef void (*GNUNET_DEFRAGMENT_AckProcessor) (void *cls,
+ uint32_t id,
+ const struct GNUNET_MessageHeader *msg);
+
+
/**
* Create a defragmentation context.
*
unsigned int num_msgs,
void *cls,
GNUNET_FRAGMENT_MessageProcessor proc,
- GNUNET_FRAGMENT_MessageProcessor ackp);
+ GNUNET_DEFRAGMENT_AckProcessor ackp);
/**