#include "gnunet_signatures.h"
#include "transport.h"
+/**
+ * Maximum number of messages we acknowledge together in one
+ * cummulative ACK. Larger values may save a bit of bandwidth.
+ */
+#define MAX_CUMMULATIVE_ACKS 64
/**
* What is the size we assume for a read operation in the
#define MAX_VALIDATION_CHALLENGE_FREQ \
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
+/**
+ * How long until we forget about historic accumulators and thus
+ * reset the ACK counter? Should exceed the maximum time an
+ * active connection experiences without an ACK.
+ */
+#define ACK_CUMMULATOR_TIMEOUT \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
+
/**
* What is the non-randomized base frequency at which we
* would initiate DV learn messages?
* Unique value, generated by incrementing the
* `message_uuid_ctr` of `struct Neighbour`.
*/
- uint32_t uuid GNUNET_PACKED;
+ uint64_t uuid GNUNET_PACKED;
+};
+
+/**
+ * Unique identifier to map an acknowledgement to a transmission.
+ */
+struct AcknowledgementUUIDP
+{
/**
- * UUID of the queue that was used to transmit this message.
- * Used to map acknowledgements back to the respective queue.
+ * The UUID value. Not actually a hash, but a random value.
*/
- uint32_t queue_uuid GNUNET_PACKED;
+ struct GNUNET_ShortHashCode value;
};
* messages sent over possibly unreliable channels. Should
* be a random.
*/
- struct MessageUUIDP msg_uuid;
+ struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayloadP
+{
+ /**
+ * How long was the ACK delayed for generating cummulative ACKs?
+ * Used to calculate the correct network RTT by taking the receipt
+ * time of the ack minus the transmission time of the sender minus
+ * this value.
+ */
+ struct GNUNET_TIME_RelativeNBO ack_delay;
+
+ /**
+ * UUID of a message being acknowledged.
+ */
+ struct AcknowledgementUUIDP ack_uuid;
};
struct GNUNET_MessageHeader header;
/**
- * Reserved. Zero.
+ * Counter of ACKs transmitted by the sender to us. Incremented
+ * by one for each ACK, used to detect how many ACKs were lost.
*/
- uint32_t reserved GNUNET_PACKED;
+ uint32_t ack_counter GNUNET_PACKED;
- /**
- * How long was the ACK delayed relative to the average time of
- * receipt of the messages being acknowledged? Used to calculate
- * the average RTT by taking the receipt time of the ack minus the
- * average transmission time of the sender minus this value.
- */
- struct GNUNET_TIME_RelativeNBO avg_ack_delay;
-
- /* followed by any number of `struct MessageUUIDP`
+ /* followed by any number of `struct TransportCummulativeAckPayloadP`
messages providing ACKs */
};
/**
* Unique ID of this fragment (and fragment transmission!). Will
* change even if a fragement is retransmitted to make each
- * transmission attempt unique! Should be incremented by one for
- * each fragment transmission. If a client receives a duplicate
- * fragment (same @e frag_off), it must send
- * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately.
+ * transmission attempt unique! If a client receives a duplicate
+ * fragment (same @e frag_off for same @a msg_uuid, it must send
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK immediately.
*/
- struct FragmentUUIDP frag_uuid;
+ struct AcknowledgementUUIDP ack_uuid;
/**
- * Original message ID for of the message that all the
- * fragments belong to. Must be the same for all fragments.
+ * Original message ID for of the message that all the fragments
+ * belong to. Must be the same for all fragments.
*/
struct MessageUUIDP msg_uuid;
};
-/**
- * Outer layer of an fragmented application message sent over a queue
- * with finite MTU. When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is
- * received, the receiver has two RTTs or 64 further fragments with
- * the same basic message time to send an acknowledgement, possibly
- * acknowledging up to 65 fragments in one ACK. ACKs must also be
- * sent immediately once all fragments were sent.
- */
-struct TransportFragmentAckMessage
-{
- /**
- * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Unique ID of the lowest fragment UUID being acknowledged.
- */
- struct FragmentUUIDP frag_uuid;
-
- /**
- * Bitfield of up to 64 additional fragments following the
- * @e msg_uuid being acknowledged by this message.
- */
- uint64_t extra_acks GNUNET_PACKED;
-
- /**
- * Original message ID for of the message that all the
- * fragments belong to.
- */
- struct MessageUUIDP msg_uuid;
-
- /**
- * How long was the ACK delayed relative to the average time of
- * receipt of the fragments being acknowledged? Used to calculate
- * the average RTT by taking the receipt time of the ack minus the
- * average transmission time of the sender minus this value.
- */
- struct GNUNET_TIME_RelativeNBO avg_ack_delay;
-
- /**
- * How long until the receiver will stop trying reassembly
- * of this message?
- */
- struct GNUNET_TIME_RelativeNBO reassembly_timeout;
-};
-
-
/**
* Content signed by the initator during DV learning.
*
*/
struct TransportClient;
-
/**
* A neighbour that at least one communicator is connected to.
*/
struct Neighbour;
-
/**
* Entry in our #dv_routes table, representing a (set of) distance
* vector routes to a particular peer.
*/
struct DistanceVector;
+/**
+ * A queue is a message queue provided by a communicator
+ * via which we can reach a particular neighbour.
+ */
+struct Queue;
+
+/**
+ * Message awaiting transmission. See detailed comments below.
+ */
+struct PendingMessage;
+
+/**
+ * One possible hop towards a DV target.
+ */
+struct DistanceVectorHop;
+
+
+/**
+ * Data structure kept when we are waiting for an acknowledgement.
+ */
+struct PendingAcknowledgement
+{
+
+ /**
+ * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to its pending message.
+ */
+ struct PendingAcknowledgement *next_pm;
+
+ /**
+ * If @e pm is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to its pending message.
+ */
+ struct PendingAcknowledgement *prev_pm;
+
+ /**
+ * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to the queue that was used to transmit the
+ * @a pm.
+ */
+ struct PendingAcknowledgement *next_queue;
+
+ /**
+ * If @e queue is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to the queue that was used to transmit the
+ * @a pm.
+ */
+ struct PendingAcknowledgement *prev_queue;
+
+ /**
+ * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to the DVH that was used to transmit the
+ * @a pm.
+ */
+ struct PendingAcknowledgement *next_dvh;
+
+ /**
+ * If @e dvh is non-NULL, this is the DLL in which this acknowledgement
+ * is kept in relation to the DVH that was used to transmit the
+ * @a pm.
+ */
+ struct PendingAcknowledgement *prev_dvh;
+
+ /**
+ * Pointers for the DLL of all pending acknowledgements.
+ * This list is sorted by @e transmission time. If the list gets too
+ * long, the oldest entries are discarded.
+ */
+ struct PendingAcknowledgement *next_pa;
+
+ /**
+ * Pointers for the DLL of all pending acknowledgements.
+ * This list is sorted by @e transmission time. If the list gets too
+ * long, the oldest entries are discarded.
+ */
+ struct PendingAcknowledgement *prev_pa;
+
+ /**
+ * Unique identifier for this transmission operation.
+ */
+ struct AcknowledgementUUIDP ack_uuid;
+
+ /**
+ * Message that was transmitted, may be NULL if the message was ACKed
+ * via another channel.
+ */
+ struct PendingMessage *pm;
+
+ /**
+ * Distance vector path chosen for this transmission, NULL if transmission
+ * was to a direct neighbour OR if the path was forgotten in the meantime.
+ */
+ struct DistanceVectorHop *dvh;
+
+ /**
+ * Queue used for transmission, NULL if the queue has been destroyed
+ * (which may happen before we get an acknowledgement).
+ */
+ struct Queue *queue;
+
+ /**
+ * Time of the transmission, for RTT calculation.
+ */
+ struct GNUNET_TIME_Absolute transmission_time;
+
+ /**
+ * Number of bytes of the original message (to calculate bandwidth).
+ */
+ uint16_t message_size;
+};
+
+
/**
* One possible hop towards a DV target.
*/
*/
struct DistanceVectorHop *prev_neighbour;
+ /**
+ * Head of DLL of PAs that used our @a path.
+ */
+ struct PendingAcknowledgement *pa_head;
+
+ /**
+ * Tail of DLL of PAs that used our @a path.
+ */
+ struct PendingAcknowledgement *pa_tail;
+
/**
* What would be the next hop to @e target?
*/
};
-/**
- * A queue is a message queue provided by a communicator
- * via which we can reach a particular neighbour.
- */
-struct Queue;
-
-/**
- * Message awaiting transmission. See detailed comments below.
- */
-struct PendingMessage;
-
/**
* Entry identifying transmission in one of our `struct
* Queue` which still awaits an ACK. This is used to
*/
struct Queue *next_client;
+ /**
+ * Head of DLL of PAs that used this queue.
+ */
+ struct PendingAcknowledgement *pa_head;
+
+ /**
+ * Tail of DLL of PAs that used this queue.
+ */
+ struct PendingAcknowledgement *pa_tail;
+
/**
* Head of DLL of unacked transmission requests.
*/
*/
uint32_t qid;
- /**
- * UUID used to map acknowledgements back to the queue that
- * was used for transmission. Note that @e queue_uuid-s are
- * only unique per neighbour (generated via `queue_uuid_gen`
- * of `struct Neighbour`).
- */
- uint32_t queue_uuid;
-
/**
* Maximum transmission unit supported by this queue.
*/
{
/**
- * Original message ID for of the message that all the
- * fragments belong to.
+ * Original message ID for of the message that all the fragments
+ * belong to.
*/
struct MessageUUIDP msg_uuid;
*/
struct GNUNET_TIME_Absolute reassembly_timeout;
- /**
- * Average delay of all acks in @e extra_acks and @e frag_uuid.
- * Should be reset to zero when @e num_acks is set to 0.
- */
- struct GNUNET_TIME_Relative avg_ack_delay;
-
/**
* Time we received the last fragment. @e avg_ack_delay must be
* incremented by now - @e last_frag multiplied by @e num_acks.
*/
struct GNUNET_TIME_Absolute last_frag;
- /**
- * Bitfield of up to 64 additional fragments following @e frag_uuid
- * to be acknowledged in the next cummulative ACK.
- */
- uint64_t extra_acks;
-
- /**
- * Unique ID of the lowest fragment UUID to be acknowledged in the
- * next cummulative ACK. Only valid if @e num_acks > 0.
- */
- uint32_t frag_uuid;
-
- /**
- * Number of ACKs we have accumulated so far. Reset to 0
- * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK.
- */
- unsigned int num_acks;
-
/**
* How big is the message we are reassembling in total?
*/
*/
struct GNUNET_TIME_Absolute earliest_timeout;
- /**
- * Incremented by one for each queue to generate
- * unique queue identifiers. Initially set to a random value.
- *
- * FIXME: Deal with wrap around (might be triggered by very
- * persistent adversary).
- */
- uint32_t queue_uuid_gen;
-
- /**
- * Incremented by one for each message sent to this neighbour, to
- * uniquely identify that message in replies (note that fragments
- * use another additional counter). Initially set to a random value.
- *
- * It should be safe to assume that by the time this value may wrap
- * around, the original message is long "gone" and no longer
- * relevant.
- */
- uint32_t message_uuid_ctr;
-
/**
* Do we have a confirmed working queue and are thus visible to
* CORE?
*/
struct PendingMessage *prev_frag;
+ /**
+ * Head of DLL of PAs for this pending message.
+ */
+ struct PendingAcknowledgement *pa_head;
+
+ /**
+ * Tail of DLL of PAs for this pending message.
+ */
+ struct PendingAcknowledgement *pa_tail;
+
/**
* This message, reliability boxed. Only possibly available if @e pmt is
* #PMT_CORE.
*/
struct MessageUUIDP msg_uuid;
- /**
- * Counter incremented per generated fragment.
- */
- uint32_t frag_uuidgen;
-
/**
* Type of the pending message.
*/
};
+/**
+ * Acknowledgement payload.
+ */
+struct TransportCummulativeAckPayload
+{
+ /**
+ * When did we receive the message we are ACKing? Used to calculate
+ * the delay we introduced by cummulating ACKs.
+ */
+ struct GNUNET_TIME_Absolute receive_time;
+
+ /**
+ * UUID of a message being acknowledged.
+ */
+ struct AcknowledgementUUIDP ack_uuid;
+};
+
+
+/**
+ * Data structure in which we track acknowledgements still to
+ * be sent to the
+ */
+struct AcknowledgementCummulator
+{
+ /**
+ * Target peer for which we are accumulating ACKs here.
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * ACK data being accumulated. Only @e num_acks slots are valid.
+ */
+ struct TransportCummulativeAckPayload ack_uuids[MAX_CUMMULATIVE_ACKS];
+
+ /**
+ * Task scheduled either to transmit the cummulative ACK message,
+ * or to clean up this data structure after extended periods of
+ * inactivity (if @e num_acks is zero).
+ */
+ struct GNUNET_SCHEDULER_Task *task;
+
+ /**
+ * When is @e task run (only used if @e num_acks is non-zero)?
+ */
+ struct GNUNET_TIME_Absolute min_transmission_time;
+
+ /**
+ * Counter to produce the `ack_counter` in the `struct
+ * TransportReliabilityAckMessage`. Allows the receiver to detect
+ * lost ACK messages. Incremented by @e num_acks upon transmission.
+ */
+ uint32_t ack_counter;
+
+ /**
+ * Number of entries used in @e ack_uuids. Reset to 0 upon transmission.
+ */
+ unsigned int num_acks;
+};
+
+
/**
* One of the addresses of this peer.
*/
*/
static struct GNUNET_CONTAINER_MultiPeerMap *backtalkers;
+/**
+ * Map from PIDs to `struct AcknowledgementCummulator`s.
+ * Here we track the cummulative ACKs for transmission.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *ack_cummulators;
+
+/**
+ * Map of pending acknowledgements, mapping `struct AcknowledgementUUID` to
+ * a `struct PendingAcknowledgement`.
+ */
+static struct GNUNET_CONTAINER_MultiShortmap *pending_acks;
+
/**
* Map from PIDs to `struct DistanceVector` entries describing
* known paths to the peer.
*/
static struct GNUNET_SCHEDULER_Task *validation_task;
+/**
+ * The most recent PA we have created, head of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_head;
+
+/**
+ * The oldest PA we have created, tail of DLL.
+ * The length of the DLL is kept in #pa_count.
+ */
+static struct PendingAcknowledgement *pa_tail;
+
+/**
+ * Number of entries in the #pa_head/#pa_tail DLL. Used to
+ * limit the size of the data structure.
+ */
+static unsigned int pa_count;
+
+
+/**
+ * Release @a pa data structure.
+ *
+ * @param pa data structure to release
+ */
+static void
+free_pending_acknowledgement (struct PendingAcknowledgement *pa)
+{
+ struct Queue *q = pa->queue;
+ struct PendingMessage *pm = pa->pm;
+ struct DistanceVectorHop *dvh = pa->dvh;
+
+ GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
+ pa_count--;
+ if (NULL != q)
+ {
+ GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
+ pa->queue = NULL;
+ }
+ if (NULL != pm)
+ {
+ GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+ pa->pm = NULL;
+ }
+ if (NULL != dvh)
+ {
+ GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+ pa->queue = NULL;
+ }
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multishortmap_remove (pending_acks,
+ &pa->ack_uuid.value,
+ pa));
+ GNUNET_free (pa);
+}
+
/**
* Free cached ephemeral key.
{
struct Neighbour *n = dvh->next_hop;
struct DistanceVector *dv = dvh->dv;
+ struct PendingAcknowledgement *pa;
+ while (NULL != (pa = dvh->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
+ pa->dvh = NULL;
+ }
GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh);
GNUNET_free (dvh);
.rtt = GNUNET_TIME_UNIT_FOREVER_REL};
struct QueueEntry *qe;
int maxxed;
+ struct PendingAcknowledgement *pa;
if (NULL != queue->transmit_task)
{
GNUNET_SCHEDULER_cancel (queue->visibility_task);
queue->visibility_task = NULL;
}
+ while (NULL != (pa = queue->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
+ pa->queue = NULL;
+ }
+
GNUNET_CONTAINER_MDLL_remove (neighbour,
neighbour->queue_head,
neighbour->queue_tail,
/**
* Free fragment tree below @e root, excluding @e root itself.
+ * FIXME: this does NOT seem to have the intended semantics
+ * based on how this is called. Seems we generally DO expect
+ * @a root to be free'ed itself as well!
*
* @param root root of the tree to free
*/
while (NULL != (frag = root->head_frag))
{
+ struct PendingAcknowledgement *pa;
+
free_fragment_tree (frag);
+ while (NULL != (pa = frag->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
+ pa->pm = NULL;
+ }
GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
GNUNET_free (frag);
}
{
struct TransportClient *tc = pm->client;
struct Neighbour *target = pm->target;
+ struct PendingAcknowledgement *pa;
if (NULL != tc)
{
target->pending_msg_head,
target->pending_msg_tail,
pm);
+ while (NULL != (pa = pm->pa_head))
+ {
+ GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
+ pa->pm = NULL;
+ }
+
free_fragment_tree (pm);
if (NULL != pm->qe)
{
/**
- * Generate a fragment acknowledgement for an @a rc.
+ * Clean up an idle cummulative acknowledgement data structure.
*
- * @param rc context to generate ACK for, @a rc ACK state is reset
+ * @param cls a `struct AcknowledgementCummulator *`
*/
static void
-send_fragment_ack (struct ReassemblyContext *rc)
+destroy_ack_cummulator (void *cls)
{
- struct TransportFragmentAckMessage *ack;
+ struct AcknowledgementCummulator *ac = cls;
- ack = GNUNET_new (struct TransportFragmentAckMessage);
- ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
- ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
- ack->frag_uuid.uuid = htonl (rc->frag_uuid);
- ack->extra_acks = GNUNET_htonll (rc->extra_acks);
- ack->msg_uuid = rc->msg_uuid;
- ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
- if (0 == rc->msg_missing)
- ack->reassembly_timeout = GNUNET_TIME_relative_hton (
- GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
+ ac->task = NULL;
+ GNUNET_assert (0 == ac->num_acks);
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (ack_cummulators, &ac->target, ac));
+ GNUNET_free (ac);
+}
+
+
+/**
+ * Do the transmission of a cummulative acknowledgement now.
+ *
+ * @param cls a `struct AcknowledgementCummulator *`
+ */
+static void
+transmit_cummulative_ack_cb (void *cls)
+{
+ struct AcknowledgementCummulator *ac = cls;
+ struct TransportReliabilityAckMessage *ack;
+ struct TransportCummulativeAckPayloadP *ap;
+
+ ac->task = NULL;
+ GNUNET_assert (0 < ac->ack_counter);
+ ack = GNUNET_malloc (sizeof (*ack) +
+ ac->ack_counter *
+ sizeof (struct TransportCummulativeAckPayloadP));
+ ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
+ ack->header.size =
+ htons (sizeof (*ack) +
+ ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
+ ack->ack_counter = htonl (ac->ack_counter++);
+ ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
+ for (unsigned int i = 0; i < ac->ack_counter; i++)
+ {
+ ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
+ ap[i].ack_delay = GNUNET_TIME_relative_hton (
+ GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
+ }
+ route_message (&ac->target, &ack->header, RMO_DV_ALLOWED);
+ ac->num_acks = 0;
+ ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
+ &destroy_ack_cummulator,
+ ac);
+}
+
+
+/**
+ * Transmit an acknowledgement for @a ack_uuid to @a pid delaying
+ * transmission by at most @a ack_delay.
+ *
+ * @param pid target peer
+ * @param ack_uuid UUID to ack
+ * @param max_delay how long can the ACK wait
+ */
+static void
+cummulative_ack (const struct GNUNET_PeerIdentity *pid,
+ const struct AcknowledgementUUIDP *ack_uuid,
+ struct GNUNET_TIME_Absolute max_delay)
+{
+ struct AcknowledgementCummulator *ac;
+
+ ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
+ if (NULL == ac)
+ {
+ ac = GNUNET_new (struct AcknowledgementCummulator);
+ ac->target = *pid;
+ ac->min_transmission_time = max_delay;
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ ack_cummulators,
+ &ac->target,
+ ac,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ }
else
- ack->reassembly_timeout = GNUNET_TIME_relative_hton (
- GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
- route_message (&rc->neighbour->pid, &ack->header, RMO_DV_ALLOWED);
- rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
- rc->num_acks = 0;
- rc->extra_acks = 0LLU;
+ {
+ if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
+ {
+ /* must run immediately, ack buffer full! */
+ GNUNET_SCHEDULER_cancel (ac->task);
+ transmit_cummulative_ack_cb (ac);
+ }
+ GNUNET_SCHEDULER_cancel (ac->task);
+ ac->min_transmission_time =
+ GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
+ }
+ GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
+ ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
+ ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
+ ac->num_acks++;
+ ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
+ &transmit_cummulative_ack_cb,
+ ac);
}
uint16_t msize;
uint16_t fsize;
uint16_t frag_off;
- uint32_t frag_uuid;
char *target;
struct GNUNET_TIME_Relative cdelay;
- int ack_now;
struct FindByMessageUuidContext fc;
n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
/* reassemble */
fsize = ntohs (fb->header.size) - sizeof (*fb);
+ if (0 == fsize)
+ {
+ GNUNET_break (0);
+ finish_cmc_handling (cmc);
+ return;
+ }
frag_off = ntohs (fb->frag_off);
memcpy (&target[frag_off], &fb[1], fsize);
/* update bitfield and msg_missing */
}
/* Compute cummulative ACK */
- frag_uuid = ntohl (fb->frag_uuid.uuid);
cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
- cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->num_acks);
+ cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
+ if (0 == rc->msg_missing)
+ cdelay = GNUNET_TIME_UNIT_ZERO;
+ cummulative_ack (&cmc->im.sender,
+ &fb->ack_uuid,
+ GNUNET_TIME_relative_to_absolute (cdelay));
rc->last_frag = GNUNET_TIME_absolute_get ();
- rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, cdelay);
- ack_now = GNUNET_NO;
- if (0 == rc->num_acks)
- {
- /* case one: first ack */
- rc->frag_uuid = frag_uuid;
- rc->extra_acks = 0LLU;
- rc->num_acks = 1;
- }
- else if ((frag_uuid >= rc->frag_uuid) && (frag_uuid <= rc->frag_uuid + 64))
- {
- /* case two: ack fits after existing min UUID */
- if ((frag_uuid == rc->frag_uuid) ||
- (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))))
- {
- /* duplicate fragment, ack now! */
- ack_now = GNUNET_YES;
- }
- else
- {
- rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
- rc->num_acks++;
- }
- }
- else if ((rc->frag_uuid > frag_uuid) &&
- (((rc->frag_uuid == frag_uuid + 64) && (0 == rc->extra_acks)) ||
- ((rc->frag_uuid < frag_uuid + 64) &&
- (rc->extra_acks ==
- (rc->extra_acks &
- ~((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))))))
- {
- /* can fit ack by shifting extra acks and starting at
- frag_uid, test above esured that the bits we will
- shift 'extra_acks' by are all zero. */
- rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
- rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
- rc->frag_uuid = frag_uuid;
- rc->num_acks++;
- }
- if (65 == rc->num_acks) /* OPTIMIZE-FIXME: maybe use smaller threshold? This
- is very aggressive. */
- ack_now = GNUNET_YES; /* maximum acks received */
- // FIXME: possibly also ACK based on RTT (but for that we'd need to
- // determine the queue used for the ACK first!)
-
/* is reassembly complete? */
if (0 != rc->msg_missing)
{
- if (ack_now)
- send_fragment_ack (rc);
finish_cmc_handling (cmc);
return;
}
return;
}
/* successful reassembly */
- send_fragment_ack (rc);
demultiplex_with_cmc (cmc, msg);
/* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
en-route and we forget that we finished this reassembly immediately!
/**
- * Check the @a fa against the fragments associated with @a pm.
- * If it matches, remove the matching fragments from the transmission
- * list.
+ * Communicator gave us a reliability box. Check the message.
*
- * @param pm pending message to check against the ack
- * @param fa the ack that was received
- * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
*/
static int
-check_ack_against_pm (struct PendingMessage *pm,
- const struct TransportFragmentAckMessage *fa)
-{
- int match;
- struct PendingMessage *nxt;
- uint32_t fs = ntohl (fa->frag_uuid.uuid);
- uint64_t xtra = GNUNET_ntohll (fa->extra_acks);
-
- match = GNUNET_NO;
- for (struct PendingMessage *frag = pm->head_frag; NULL != frag; frag = nxt)
- {
- const struct TransportFragmentBoxMessage *tfb =
- (const struct TransportFragmentBoxMessage *) &pm[1];
- uint32_t fu = ntohl (tfb->frag_uuid.uuid);
-
- GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt);
- nxt = frag->next_frag;
- /* Check for exact match or match in the 'xtra' bitmask */
- if ((fu == fs) ||
- ((fu > fs) && (fu <= fs + 64) && (0 != (1LLU << (fu - fs - 1) & xtra))))
- {
- match = GNUNET_YES;
- free_fragment_tree (frag);
- }
- }
- return match;
+check_reliability_box (void *cls,
+ const struct TransportReliabilityBoxMessage *rb)
+{
+ GNUNET_MQ_check_boxed_message (rb);
+ return GNUNET_YES;
}
/**
- * Communicator gave us a fragment acknowledgement. Process the request.
+ * Communicator gave us a reliability box. Process the request.
*
* @param cls a `struct CommunicatorMessageContext` (must call
* #finish_cmc_handling() when done)
- * @param fa the message that was received
+ * @param rb the message that was received
*/
static void
-handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa)
+handle_reliability_box (void *cls,
+ const struct TransportReliabilityBoxMessage *rb)
{
struct CommunicatorMessageContext *cmc = cls;
- struct Neighbour *n;
- int matched;
+ const struct GNUNET_MessageHeader *inbox =
+ (const struct GNUNET_MessageHeader *) &rb[1];
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
- if (NULL == n)
- {
- struct GNUNET_SERVICE_Client *client = cmc->tc->client;
+ // FIXME: call cummulative_ack(), have ack_countdown influence max_delay!
+ (void) (0 == ntohl (rb->ack_countdown));
+ /* continue with inner message */
+ demultiplex_with_cmc (cmc, inbox);
+}
- GNUNET_break (0);
- finish_cmc_handling (cmc);
- GNUNET_SERVICE_client_drop (client);
- return;
- }
- /* FIXME-OPTIMIZE: maybe use another hash map here? */
- matched = GNUNET_NO;
- for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm;
- pm = pm->prev_neighbour)
- {
- if (0 != GNUNET_memcmp (&fa->msg_uuid, &pm->msg_uuid))
- continue;
- matched = GNUNET_YES;
- if (GNUNET_YES == check_ack_against_pm (pm, fa))
- {
- struct GNUNET_TIME_Relative avg_ack_delay =
- GNUNET_TIME_relative_ntoh (fa->avg_ack_delay);
- // FIXME: update RTT and other reliability data!
- // ISSUE: we don't know which of n's queues the message(s)
- // took (and in fact the different messages might have gone
- // over different queues and possibly over multiple).
- // => track queues with PendingMessages, and update RTT only if
- // the queue used is unique?
- // -> how can we get loss rates?
- // -> or, add extra state to Box and ACK to identify queue?
- // IDEA: generate MULTIPLE frag-uuids per fragment and track
- // the queue with the fragment! (-> this logic must
- // be moved into check_ack_against_pm!)
- (void) avg_ack_delay;
- }
- else
- {
- GNUNET_STATISTICS_update (GST_stats,
- "# FRAGMENT_ACKS dropped, no matching fragment",
- 1,
- GNUNET_NO);
- }
- if (NULL == pm->head_frag)
- {
- // if entire message is ACKed, handle that as well.
- // => clean up PM, any post actions?
- free_pending_message (pm);
- }
- else
- {
- struct GNUNET_TIME_Relative reassembly_timeout =
- GNUNET_TIME_relative_ntoh (fa->reassembly_timeout);
- // OPTIMIZE-FIXME: adjust retransmission strategy based on
- // reassembly_timeout!
- (void) reassembly_timeout;
- }
- break;
- }
- if (GNUNET_NO == matched)
- {
- GNUNET_STATISTICS_update (GST_stats,
- "# FRAGMENT_ACKS dropped, no matching pending message",
- 1,
- GNUNET_NO);
- }
- finish_cmc_handling (cmc);
+
+/**
+ * We have successfully transmitted data via @a q, update metrics.
+ *
+ * @param q queue to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
+ */
+static void
+update_queue_performance (struct Queue *q,
+ struct GNUNET_TIME_Relative rtt,
+ uint16_t bytes_transmitted_ok)
+{
+ // FIXME: implement!
}
/**
- * Communicator gave us a reliability box. Check the message.
+ * We have successfully transmitted data via @a dvh, update metrics.
*
- * @param cls a `struct CommunicatorMessageContext`
- * @param rb the send message that was sent
- * @return #GNUNET_YES if message is well-formed
+ * @param dvh distance vector path data to update
+ * @param rtt round trip time observed
+ * @param bytes_transmitted_ok number of bytes successfully transmitted
*/
-static int
-check_reliability_box (void *cls,
- const struct TransportReliabilityBoxMessage *rb)
+static void
+update_dvh_performance (struct DistanceVectorHop *dvh,
+ struct GNUNET_TIME_Relative rtt,
+ uint16_t bytes_transmitted_ok)
{
- GNUNET_MQ_check_boxed_message (rb);
- return GNUNET_YES;
+ // FIXME: implement!
}
/**
- * Communicator gave us a reliability box. Process the request.
+ * The @a pa was acknowledged, process the acknowledgement.
*
- * @param cls a `struct CommunicatorMessageContext` (must call
- * #finish_cmc_handling() when done)
- * @param rb the message that was received
+ * @param pa the pending acknowledgement that was satisfied
+ * @param ack_delay artificial delay from cummulative acks created by the other
+ * peer
*/
static void
-handle_reliability_box (void *cls,
- const struct TransportReliabilityBoxMessage *rb)
+handle_acknowledged (struct PendingAcknowledgement *pa,
+ struct GNUNET_TIME_Relative ack_delay)
{
- struct CommunicatorMessageContext *cmc = cls;
- const struct GNUNET_MessageHeader *inbox =
- (const struct GNUNET_MessageHeader *) &rb[1];
+ struct PendingMessage *pm = pa->pm;
+ struct GNUNET_TIME_Relative delay;
- if (0 == ntohl (rb->ack_countdown))
- {
- struct TransportReliabilityAckMessage *ack;
+ delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time);
+ if (delay.rel_value_us > ack_delay.rel_value_us)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ else
+ delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
+ if (NULL != pa->queue)
+ update_queue_performance (pa->queue, delay, pa->message_size);
+ if (NULL != pa->dvh)
+ update_dvh_performance (pa->dvh, delay, pa->message_size);
+ if (NULL != pm)
+ {
+ if (NULL != pm->frag_parent)
+ {
+ pm = pm->frag_parent;
+ free_fragment_tree (pa->pm);
+ }
+ while ((NULL != pm->frag_parent) && (NULL == pm->head_frag))
+ {
+ struct PendingMessage *parent = pm->frag_parent;
- /* FIXME-OPTIMIZE: implement cummulative ACKs and ack_countdown,
- then setting the avg_ack_delay field below: */
- ack = GNUNET_malloc (sizeof (*ack) + sizeof (struct MessageUUIDP));
- ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK);
- ack->header.size = htons (sizeof (*ack) + sizeof (struct MessageUUIDP));
- memcpy (&ack[1], &rb->msg_uuid, sizeof (struct MessageUUIDP));
- route_message (&cmc->im.sender, &ack->header, RMO_DV_ALLOWED);
+ free_fragment_tree (pm);
+ pm = parent;
+ }
+ if (NULL != pm->head_frag)
+ pm = NULL; /* we are done, otherwise free 'pm' below */
}
- /* continue with inner message */
- demultiplex_with_cmc (cmc, inbox);
+ if (NULL != pm)
+ free_pending_message (pm);
+ free_pending_acknowledgement (pa);
+}
+
+
+/**
+ * Communicator gave us a reliability ack. Check it is well-formed.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (unused)
+ * @param ra the message that was received
+ * @return #GNUNET_Ok if @a ra is well-formed
+ */
+static int
+check_reliability_ack (void *cls,
+ const struct TransportReliabilityAckMessage *ra)
+{
+ unsigned int n_acks;
+
+ (void) cls;
+ n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+ sizeof (struct TransportCummulativeAckPayloadP);
+ if (0 == n_acks)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if ((ntohs (ra->header.size) - sizeof (*ra)) !=
+ n_acks * sizeof (struct TransportCummulativeAckPayloadP))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
}
const struct TransportReliabilityAckMessage *ra)
{
struct CommunicatorMessageContext *cmc = cls;
- struct Neighbour *n;
+ const struct TransportCummulativeAckPayloadP *ack;
+ struct PendingAcknowledgement *pa;
unsigned int n_acks;
- const struct MessageUUIDP *msg_uuids;
- struct PendingMessage *nxt;
- int matched;
+ uint32_t ack_counter;
- n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender);
- if (NULL == n)
+ n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
+ sizeof (struct TransportCummulativeAckPayloadP);
+ ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
+ for (unsigned int i = 0; i < n_acks; i++)
{
- struct GNUNET_SERVICE_Client *client = cmc->tc->client;
-
- GNUNET_break (0);
- finish_cmc_handling (cmc);
- GNUNET_SERVICE_client_drop (client);
- return;
- }
- n_acks =
- (ntohs (ra->header.size) - sizeof (*ra)) / sizeof (struct MessageUUIDP);
- msg_uuids = (const struct MessageUUIDP *) &ra[1];
-
- /* FIXME-OPTIMIZE: maybe use another hash map here? */
- matched = GNUNET_NO;
- for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; pm = nxt)
- {
- int in_list;
-
- nxt = pm->next_neighbour;
- in_list = GNUNET_NO;
- for (unsigned int i = 0; i < n_acks; i++)
+ pa =
+ GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value);
+ if (NULL == pa)
{
- if (0 != GNUNET_memcmp (&msg_uuids[i], &pm->msg_uuid))
- continue;
- in_list = GNUNET_YES;
- break;
- }
- if (GNUNET_NO == in_list)
+ GNUNET_STATISTICS_update (
+ GST_stats,
+ "# FRAGMENT_ACKS dropped, no matching pending message",
+ 1,
+ GNUNET_NO);
continue;
-
- /* this pm was acked! */
- matched = GNUNET_YES;
- free_pending_message (pm);
-
- {
- struct GNUNET_TIME_Relative avg_ack_delay =
- GNUNET_TIME_relative_ntoh (ra->avg_ack_delay);
- // FIXME: update RTT and other reliability data!
- // ISSUE: we don't know which of n's queues the message(s)
- // took (and in fact the different messages might have gone
- // over different queues and possibly over multiple).
- // => track queues with PendingMessages, and update RTT only if
- // the queue used is unique?
- // -> how can we get loss rates?
- // -> or, add extra state to MSG and ACKs to identify queue?
- // -> if we do this, might just do the same for the avg_ack_delay!
- (void) avg_ack_delay;
}
+ handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
}
- if (GNUNET_NO == matched)
- {
- GNUNET_STATISTICS_update (GST_stats,
- "# FRAGMENT_ACKS dropped, no matching pending message",
- 1,
- GNUNET_NO);
- }
+
+ ack_counter = htonl (ra->ack_counter);
+ // FIXME: track ACK losses based on ack_counter somewhere!
+ // (DV and/or Neighbour?)
finish_cmc_handling (cmc);
}
1,
GNUNET_NO);
b->monotonic_time = mt;
- /* Setting body_size to 0 prevents call to #forward_backchannel_payload() */
+ /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
+ */
b->body_size = 0;
return;
}
* non-first hop is in our neighbour list (returning #GNUNET_SYSERR).
*
* @param path the path we learned, path[0] should be us,
- * and then path contains a valid path from us to `path[path_len-1]`
- * path[1] should be a direct neighbour (we should check!)
+ * and then path contains a valid path from us to
+ * `path[path_len-1]` path[1] should be a direct neighbour (we should check!)
* @param path_len number of entries on the @a path, at least three!
* @param network_latency how long does the message take from us to
* `path[path_len-1]`? set to "forever" if unknown
- * @param path_valid_until how long is this path considered validated? Maybe be
- * zero.
+ * @param path_valid_until how long is this path considered validated? Maybe
+ * be zero.
* @return #GNUNET_YES on success,
* #GNUNET_NO if we have better path(s) to the target
* #GNUNET_SYSERR if the path is useless and/or invalid
finish_cmc_handling (cmc);
/* OPTIMIZE-FIXME: Technically, we only need to bother checking
- the initiator signature if we send the message back to the initiator... */
+ the initiator signature if we send the message back to the initiator...
+ */
if (GNUNET_OK != validate_dv_initiator_signature (&dvl->initiator,
&dvl->challenge,
&dvl->init_sig))
GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
struct TransportFragmentBoxMessage,
&cmc),
- GNUNET_MQ_hd_fixed_size (fragment_ack,
- GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
- struct TransportFragmentAckMessage,
- &cmc),
GNUNET_MQ_hd_var_size (reliability_box,
GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
struct TransportReliabilityBoxMessage,
&cmc),
- GNUNET_MQ_hd_fixed_size (reliability_ack,
- GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
- struct TransportReliabilityAckMessage,
- &cmc),
+ GNUNET_MQ_hd_var_size (reliability_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+ struct TransportReliabilityAckMessage,
+ &cmc),
GNUNET_MQ_hd_var_size (backchannel_encapsulation,
GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
struct TransportBackchannelEncapsulationMessage,
}
+/**
+ * Setup data structure waiting for acknowledgements.
+ *
+ * @param queue queue the @a pm will be sent over
+ * @param dvh path the message will take, may be NULL
+ * @param pm the pending message for transmission
+ * @return corresponding fresh pending acknowledgement
+ */
+static struct PendingAcknowledgement *
+prepare_pending_acknowledgement (struct Queue *queue,
+ struct DistanceVectorHop *dvh,
+ struct PendingMessage *pm)
+{
+ struct PendingAcknowledgement *pa;
+
+ pa = GNUNET_new (struct PendingAcknowledgement);
+ pa->queue = queue;
+ pa->dvh = dvh;
+ pa->pm = pm;
+ do
+ {
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &pa->ack_uuid,
+ sizeof (pa->ack_uuid));
+ } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put (
+ pending_acks,
+ &pa->ack_uuid.value,
+ pa,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa);
+ GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa);
+ if (NULL != dvh)
+ GNUNET_CONTAINER_MDLL_insert (dvh, dvh->pa_head, dvh->pa_tail, pa);
+ pa->transmission_time = GNUNET_TIME_absolute_get ();
+ pa->message_size = pm->bytes_msg;
+ return pa;
+}
+
+
/**
* Fragment the given @a pm to the given @a mtu. Adds
* additional fragments to the neighbour as well. If the
* @a mtu is too small, generates and error for the @a pm
* and returns NULL.
*
+ * @param queue which queue to fragment for
+ * @param dvh path the message will take, or NULL
* @param pm pending message to fragment for transmission
- * @param mtu MTU to apply
* @return new message to transmit
*/
static struct PendingMessage *
-fragment_message (struct PendingMessage *pm, uint16_t mtu)
+fragment_message (struct Queue *queue,
+ struct DistanceVectorHop *dvh,
+ struct PendingMessage *pm)
{
+ struct PendingAcknowledgement *pa;
struct PendingMessage *ff;
+ uint16_t mtu;
+ pa = prepare_pending_acknowledgement (queue, dvh, pm);
+ mtu = (0 == queue->mtu)
+ ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
+ : queue->mtu;
set_pending_message_uuid (pm);
/* This invariant is established in #handle_add_queue_message() */
GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage));
/* select fragment for transmission, descending the tree if it has
- been expanded until we are at a leaf or at a fragment that is small enough
+ been expanded until we are at a leaf or at a fragment that is small
+ enough
*/
ff = pm;
while (((ff->bytes_msg > mtu) || (pm == ff)) &&
tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
tfb.header.size =
htons (sizeof (struct TransportFragmentBoxMessage) + fragsize);
- tfb.frag_uuid.uuid = htonl (pm->frag_uuidgen++);
+ tfb.ack_uuid = pa->ack_uuid;
tfb.msg_uuid = pm->msg_uuid;
tfb.frag_off = htons (ff->frag_off + xoff);
tfb.msg_size = htons (pm->bytes_msg);
* @a pm). If the @a pm is already fragmented or reliability boxed,
* or itself an ACK, this function simply returns @a pm.
*
+ * @param queue which queue to prepare transmission for
+ * @param dvh path the message will take, or NULL
* @param pm pending message to box for transmission over unreliabile queue
* @return new message to transmit
*/
static struct PendingMessage *
-reliability_box_message (struct PendingMessage *pm)
+reliability_box_message (struct Queue *queue,
+ struct DistanceVectorHop *dvh,
+ struct PendingMessage *pm)
{
struct TransportReliabilityBoxMessage rbox;
+ struct PendingAcknowledgement *pa;
struct PendingMessage *bpm;
char *msg;
client_send_response (pm, GNUNET_NO, 0);
return NULL;
}
+ pa = prepare_pending_acknowledgement (queue, dvh, pm);
+
bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) +
pm->bytes_msg);
bpm->target = pm->target;
rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
- rbox.msg_uuid = pm->msg_uuid;
+
+ rbox.ack_uuid = pa->ack_uuid;
msg = (char *) &bpm[1];
memcpy (msg, &rbox, sizeof (rbox));
memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg);
(NULL != pm->head_frag /* fragments already exist, should
respect that even if MTU is 0 for
this queue */) )
- s = fragment_message (s,
- (0 == queue->mtu)
- ? UINT16_MAX -
- sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
- : queue->mtu);
+ s = fragment_message (queue, NULL /*FIXME! */, s);
if (NULL == s)
{
/* Fragmentation failed, try next message... */
return;
}
if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
- s = reliability_box_message (s);
+ s = reliability_box_message (queue, NULL /* FIXME! */, s);
if (NULL == s)
{
/* Reliability boxing failed, try next message... */
/**
- * Communicator tells us that our request to create a queue failed. This usually
- * indicates that the provided address is simply invalid or that the
+ * Communicator tells us that our request to create a queue failed. This
+ * usually indicates that the provided address is simply invalid or that the
* communicator's resources are exhausted.
*
* @param cls the `struct TransportClient`
(void) cls;
// OPTIMIZE-FIXME: checking that we know this address already should
// be done BEFORE checking the signature => HELLO API change!
- // OPTIMIZE-FIXME: pre-check: rate-limit signature verification / validation?!
+ // OPTIMIZE-FIXME: pre-check: rate-limit signature verification /
+ // validation?!
address =
GNUNET_HELLO_extract_address (&hdr[1],
ntohs (hdr->header.size) - sizeof (*hdr),
}
+/**
+ * Free pending acknowledgement.
+ *
+ * @param cls NULL
+ * @param key unused
+ * @param value a `struct PendingAcknowledgement`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_pending_ack_cb (void *cls,
+ const struct GNUNET_ShortHashCode *key,
+ void *value)
+{
+ struct PendingAcknowledgement *pa = value;
+
+ (void) cls;
+ (void) key;
+ free_pending_acknowledgement (pa);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Free acknowledgement cummulator.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct AcknowledgementCummulator`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_ack_cummulator_cb (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct AcknowledgementCummulator *ac = value;
+
+ (void) cls;
+ (void) pid;
+ GNUNET_free (ac);
+ return GNUNET_OK;
+}
+
+
/**
* Function called when the service shuts down. Unloads our plugins
* and cancels pending validations.
GNUNET_free (GST_my_private_key);
GST_my_private_key = NULL;
}
+ GNUNET_CONTAINER_multipeermap_iterate (ack_cummulators,
+ &free_ack_cummulator_cb,
+ NULL);
+ GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators);
+ ack_cummulators = NULL;
+ GNUNET_CONTAINER_multishortmap_iterate (pending_acks,
+ &free_pending_ack_cb,
+ NULL);
+ GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
+ pending_acks = NULL;
GNUNET_CONTAINER_multipeermap_destroy (neighbours);
neighbours = NULL;
GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
/* setup globals */
GST_cfg = c;
backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+ pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
+ ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);