*
* Implement next:
* - DV data structures:
- * + initiation of DV learn (incl. RTT measurement logic!)
- * - security considerations? add signatures to routes? initiator signature?
* + using DV routes!
* - handling of DV-boxed messages that need to be forwarded
* - route_message implementation, including using DV data structures
#define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
/**
- * We only consider queues as "quality" connections when
- * suppressing the generation of DV initiation messages if
+ * We only consider queues as "quality" connections when
+ * suppressing the generation of DV initiation messages if
* the latency of the queue is below this threshold.
*/
#define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
* do we need to have to suppress initiating DV learn messages?
*/
#define DV_LEARN_QUALITY_THRESHOLD 100
-
+
/**
* When do we forget an invalid address for sure?
*/
/**
- * When did we launch this DV learning activity?
+ * When did we launch this DV learning activity?
*/
struct LearnLaunchEntry
{
}
+/**
+ * Check the @a fa against the fragments associated with @a pm.
+ * If it matches, remove the matching fragments from the transmission
+ * list.
+ *
+ * @param pm pending message to check against the ack
+ * @param fa the ack that was received
+ * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
+ */
+static int
+check_ack_against_pm (struct PendingMessage *pm,
+ const struct TransportFragmentAckMessage *fa)
+{
+ int match;
+ struct PendingMessage *nxt;
+ uint32_t fs = ntohl (fa->frag_uuid);
+ uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
+
+ match = GNUNET_NO;
+ for (struct PendingMessage *frag = pm->head_frag;
+ NULL != frag;
+ frag = nxt)
+ {
+ const struct TransportFragmentBox *tfb
+ = (const struct TransportFragmentBox *) &pm[1];
+ uint32_t fu = ntohl (tfb->frag_uuid);
+
+ GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
+ nxt = frag->next_frag;
+ /* Check for exact match or match in the 'xtra' bitmask */
+ if ( (fu == fs) ||
+ ( (fu > fs) &&
+ (fu <= fs + 64) &&
+ (0 != (1LLU << (fu - fs - 1) & xtra)) ) )
+ {
+ match = GNUNET_YES;
+ free_fragment_tree (frag);
+ }
+ }
+ return match;
+}
+
+
/**
* Communicator gave us a fragment acknowledgement. Process the request.
*
const struct TransportFragmentAckMessage *fa)
{
struct CommunicatorMessageContext *cmc = cls;
+ struct Neighbour *n;
+ int matched;
- // FIXME: do work: identify original message; then identify fragments being acked;
- // remove those from the tree to prevent retransmission;
- // compute RTT
- // if entire message is ACKed, handle that as well.
+ n = GNUNET_CONTAINER_multipeermap_get (neighbours,
+ &cmc->im.sender);
+ if (NULL == n)
+ {
+ struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+
+ GNUNET_break (0);
+ finish_cmc_handling (cmc);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
+ /* FIXME-OPTIMIZE: maybe use another hash map here? */
+ matched = GNUNET_NO;
+ for (struct PendingMessage *pm = n->pending_msg_head;
+ NULL != pm;
+ pm = pm->prev_neighbour)
+ {
+ if (0 !=
+ GNUNET_memcmp (&fa->msg_uuid,
+ &pm->msg_uuid))
+ continue;
+ matched = GNUNET_YES;
+ if (GNUNET_YES ==
+ check_ack_against_pm (pm,
+ fa))
+ {
+ struct GNUNET_TIME_Relative avg_ack_delay
+ = GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
+ // FIXME: update RTT and other reliability data!
+ // ISSUE: we don't know which of n's queues the message(s)
+ // took (and in fact the different messages might have gone
+ // over different queues and possibly over multiple).
+ // => track queues with PendingMessages, and update RTT only if
+ // the queue used is unique?
+ // -> how can we get loss rates?
+ // -> or, add extra state to Box and ACK to identify queue?
+ (void) avg_ack_delay;
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# FRAGMENT_ACKS dropped, no matching fragment",
+ 1,
+ GNUNET_NO);
+ }
+ if (NULL == pm->head_frag)
+ {
+ // if entire message is ACKed, handle that as well.
+ // => clean up PM, any post actions?
+ free_pending_message (pm);
+ }
+ else
+ {
+ struct GNUNET_TIME_Relative reassembly_timeout
+ = GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
+ // OPTIMIZE-FIXME: adjust retransmission strategy based on reassembly_timeout!
+ (void) reassembly_timeout;
+ }
+ break;
+ }
+ if (GNUNET_NO == matched)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# FRAGMENT_ACKS dropped, no matching pending message",
+ 1,
+ GNUNET_NO);
+ }
finish_cmc_handling (cmc);
}
if (pm->msg_uuid_set)
return;
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
- &pm->msg_uuid,
- sizeof (pm->msg_uuid));
+ &pm->msg_uuid,
+ sizeof (pm->msg_uuid));
pm->msg_uuid_set = GNUNET_YES;
}
*/
static struct PendingMessage *
fragment_message (struct PendingMessage *pm,
- uint16_t mtu)
+ uint16_t mtu)
{
struct PendingMessage *ff;
been expanded until we are at a leaf or at a fragment that is small enough */
ff = pm;
while ( ( (ff->bytes_msg > mtu) ||
- (pm == ff) ) &&
- (ff->frag_off == ff->bytes_msg) &&
- (NULL != ff->head_frag) )
+ (pm == ff) ) &&
+ (ff->frag_off == ff->bytes_msg) &&
+ (NULL != ff->head_frag) )
{
ff = ff->head_frag; /* descent into fragmented fragments */
}
if ( ( (ff->bytes_msg > mtu) ||
- (pm == ff) ) &&
+ (pm == ff) ) &&
(pm->frag_off < pm->bytes_msg) )
{
/* Did not yet calculate all fragments, calculate next fragment */
}
fragmax = mtu - sizeof (struct TransportFragmentBox);
fragsize = GNUNET_MIN (msize - ff->frag_off,
- fragmax);
+ fragmax);
frag = GNUNET_malloc (sizeof (struct PendingMessage) +
- sizeof (struct TransportFragmentBox) +
- fragsize);
+ sizeof (struct TransportFragmentBox) +
+ fragsize);
frag->target = pm->target;
frag->frag_parent = ff;
frag->timeout = pm->timeout;
msg = (char *) &frag[1];
tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
- fragsize);
+ fragsize);
tfb.frag_uuid = htonl (pm->frag_uuidgen++);
tfb.msg_uuid = pm->msg_uuid;
tfb.frag_off = htons (ff->frag_off + xoff);
tfb.msg_size = htons (pm->bytes_msg);
memcpy (msg,
- &tfb,
- sizeof (tfb));
+ &tfb,
+ sizeof (tfb));
memcpy (&msg[sizeof (tfb)],
- &orig[ff->frag_off],
- fragsize);
+ &orig[ff->frag_off],
+ fragsize);
GNUNET_CONTAINER_MDLL_insert (frag,
- ff->head_frag,
- ff->tail_frag,
- frag);
+ ff->head_frag,
+ ff->tail_frag,
+ frag);
ff->frag_off += fragsize;
ff = frag;
}
tracker_excess_out_cb (void *cls)
{
(void) cls;
-
+
/* FIXME: trigger excess bandwidth report to core? Right now,
this is done internally within transport_api2_core already,
but we probably want to change the logic and trigger it
{
/**
* Set to the @e k'th queue encountered.
- */
+ */
struct Queue *q;
/**
/**
* Set to the total number of queues encountered.
- */
+ */
unsigned int num_queues;
/**
/**
- * Task run when we CONSIDER initiating a DV learn
+ * Task run when we CONSIDER initiating a DV learn
* process. We first check that sending out a message is
* even possible (queues exist), then that it is desirable
* (if not, reschedule the task for later), and finally
&check_connection_quality,
&qqc);
GNUNET_assert (NULL != qqc.q);
-
+
/* Do this as close to transmission time as possible! */
- lle->launch_time = GNUNET_TIME_absolute_get ();
+ lle->launch_time = GNUNET_TIME_absolute_get ();
// FIXME: not so easy, need to BOX this message
// in a transmission request! (mistake also done elsewhere!)
GNUNET_MQ_send (qqc.q->tc->mq,
env);
- /* reschedule this job, randomizing the time it runs (but no
+ /* reschedule this job, randomizing the time it runs (but no
actual backoff!) */
dvlearn_task
= GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY),