From f1834a5f1c8c2b06f5c140c4aaefe27e474d16d6 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 22 Apr 2019 16:51:36 +0200 Subject: [PATCH] massive refactoring to intruce data structure --- src/include/gnunet_protocols.h | 7 +- src/transport/gnunet-service-tng.c | 1162 +++++++++++++++++----------- 2 files changed, 730 insertions(+), 439 deletions(-) diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index e402460c0..27a7034b0 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2121,7 +2121,7 @@ extern "C" { /** M<->S<->C: PSYC message which contains a header and one or more message * parts. */ -#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER \ +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_HEADER \ 692 // FIXME: start using this where appropriate /** Message part: method */ @@ -3107,11 +3107,6 @@ extern "C" { */ #define GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT 1214 -/** - * Acknowledgement generated for a fragment. - */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK 1215 - /** * Wrapper around non-fragmented CORE message used to measure RTT * and ensure reliability. diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index a35357d9b..5c51ed59a 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -117,6 +117,11 @@ #include "gnunet_signatures.h" #include "transport.h" +/** + * Maximum number of messages we acknowledge together in one + * cummulative ACK. Larger values may save a bit of bandwidth. + */ +#define MAX_CUMMULATIVE_ACKS 64 /** * What is the size we assume for a read operation in the @@ -211,6 +216,14 @@ #define MAX_VALIDATION_CHALLENGE_FREQ \ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1) +/** + * How long until we forget about historic accumulators and thus + * reset the ACK counter? Should exceed the maximum time an + * active connection experiences without an ACK. + */ +#define ACK_CUMMULATOR_TIMEOUT \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4) + /** * What is the non-randomized base frequency at which we * would initiate DV learn messages? @@ -278,13 +291,19 @@ struct MessageUUIDP * Unique value, generated by incrementing the * `message_uuid_ctr` of `struct Neighbour`. */ - uint32_t uuid GNUNET_PACKED; + uint64_t uuid GNUNET_PACKED; +}; + +/** + * Unique identifier to map an acknowledgement to a transmission. + */ +struct AcknowledgementUUIDP +{ /** - * UUID of the queue that was used to transmit this message. - * Used to map acknowledgements back to the respective queue. + * The UUID value. Not actually a hash, but a random value. */ - uint32_t queue_uuid GNUNET_PACKED; + struct GNUNET_ShortHashCode value; }; @@ -474,7 +493,27 @@ struct TransportReliabilityBoxMessage * messages sent over possibly unreliable channels. Should * be a random. */ - struct MessageUUIDP msg_uuid; + struct AcknowledgementUUIDP ack_uuid; +}; + + +/** + * Acknowledgement payload. + */ +struct TransportCummulativeAckPayloadP +{ + /** + * How long was the ACK delayed for generating cummulative ACKs? + * Used to calculate the correct network RTT by taking the receipt + * time of the ack minus the transmission time of the sender minus + * this value. + */ + struct GNUNET_TIME_RelativeNBO ack_delay; + + /** + * UUID of a message being acknowledged. + */ + struct AcknowledgementUUIDP ack_uuid; }; @@ -493,19 +532,12 @@ struct TransportReliabilityAckMessage struct GNUNET_MessageHeader header; /** - * Reserved. Zero. + * Counter of ACKs transmitted by the sender to us. Incremented + * by one for each ACK, used to detect how many ACKs were lost. */ - uint32_t reserved GNUNET_PACKED; + uint32_t ack_counter GNUNET_PACKED; - /** - * How long was the ACK delayed relative to the average time of - * receipt of the messages being acknowledged? Used to calculate - * the average RTT by taking the receipt time of the ack minus the - * average transmission time of the sender minus this value. - */ - struct GNUNET_TIME_RelativeNBO avg_ack_delay; - - /* followed by any number of `struct MessageUUIDP` + /* followed by any number of `struct TransportCummulativeAckPayloadP` messages providing ACKs */ }; @@ -523,16 +555,15 @@ struct TransportFragmentBoxMessage /** * Unique ID of this fragment (and fragment transmission!). Will * change even if a fragement is retransmitted to make each - * transmission attempt unique! Should be incremented by one for - * each fragment transmission. If a client receives a duplicate - * fragment (same @e frag_off), it must send - * #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK immediately. + * transmission attempt unique! If a client receives a duplicate + * fragment (same @e frag_off for same @a msg_uuid, it must send + * #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK immediately. */ - struct FragmentUUIDP frag_uuid; + struct AcknowledgementUUIDP ack_uuid; /** - * Original message ID for of the message that all the - * fragments belong to. Must be the same for all fragments. + * Original message ID for of the message that all the fragments + * belong to. Must be the same for all fragments. */ struct MessageUUIDP msg_uuid; @@ -548,54 +579,6 @@ struct TransportFragmentBoxMessage }; -/** - * Outer layer of an fragmented application message sent over a queue - * with finite MTU. When a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT is - * received, the receiver has two RTTs or 64 further fragments with - * the same basic message time to send an acknowledgement, possibly - * acknowledging up to 65 fragments in one ACK. ACKs must also be - * sent immediately once all fragments were sent. - */ -struct TransportFragmentAckMessage -{ - /** - * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK - */ - struct GNUNET_MessageHeader header; - - /** - * Unique ID of the lowest fragment UUID being acknowledged. - */ - struct FragmentUUIDP frag_uuid; - - /** - * Bitfield of up to 64 additional fragments following the - * @e msg_uuid being acknowledged by this message. - */ - uint64_t extra_acks GNUNET_PACKED; - - /** - * Original message ID for of the message that all the - * fragments belong to. - */ - struct MessageUUIDP msg_uuid; - - /** - * How long was the ACK delayed relative to the average time of - * receipt of the fragments being acknowledged? Used to calculate - * the average RTT by taking the receipt time of the ack minus the - * average transmission time of the sender minus this value. - */ - struct GNUNET_TIME_RelativeNBO avg_ack_delay; - - /** - * How long until the receiver will stop trying reassembly - * of this message? - */ - struct GNUNET_TIME_RelativeNBO reassembly_timeout; -}; - - /** * Content signed by the initator during DV learning. * @@ -1025,19 +1008,129 @@ struct EphemeralCacheEntry */ struct TransportClient; - /** * A neighbour that at least one communicator is connected to. */ struct Neighbour; - /** * Entry in our #dv_routes table, representing a (set of) distance * vector routes to a particular peer. */ struct DistanceVector; +/** + * A queue is a message queue provided by a communicator + * via which we can reach a particular neighbour. + */ +struct Queue; + +/** + * Message awaiting transmission. See detailed comments below. + */ +struct PendingMessage; + +/** + * One possible hop towards a DV target. + */ +struct DistanceVectorHop; + + +/** + * Data structure kept when we are waiting for an acknowledgement. + */ +struct PendingAcknowledgement +{ + + /** + * If @e pm is non-NULL, this is the DLL in which this acknowledgement + * is kept in relation to its pending message. + */ + struct PendingAcknowledgement *next_pm; + + /** + * If @e pm is non-NULL, this is the DLL in which this acknowledgement + * is kept in relation to its pending message. + */ + struct PendingAcknowledgement *prev_pm; + + /** + * If @e queue is non-NULL, this is the DLL in which this acknowledgement + * is kept in relation to the queue that was used to transmit the + * @a pm. + */ + struct PendingAcknowledgement *next_queue; + + /** + * If @e queue is non-NULL, this is the DLL in which this acknowledgement + * is kept in relation to the queue that was used to transmit the + * @a pm. + */ + struct PendingAcknowledgement *prev_queue; + + /** + * If @e dvh is non-NULL, this is the DLL in which this acknowledgement + * is kept in relation to the DVH that was used to transmit the + * @a pm. + */ + struct PendingAcknowledgement *next_dvh; + + /** + * If @e dvh is non-NULL, this is the DLL in which this acknowledgement + * is kept in relation to the DVH that was used to transmit the + * @a pm. + */ + struct PendingAcknowledgement *prev_dvh; + + /** + * Pointers for the DLL of all pending acknowledgements. + * This list is sorted by @e transmission time. If the list gets too + * long, the oldest entries are discarded. + */ + struct PendingAcknowledgement *next_pa; + + /** + * Pointers for the DLL of all pending acknowledgements. + * This list is sorted by @e transmission time. If the list gets too + * long, the oldest entries are discarded. + */ + struct PendingAcknowledgement *prev_pa; + + /** + * Unique identifier for this transmission operation. + */ + struct AcknowledgementUUIDP ack_uuid; + + /** + * Message that was transmitted, may be NULL if the message was ACKed + * via another channel. + */ + struct PendingMessage *pm; + + /** + * Distance vector path chosen for this transmission, NULL if transmission + * was to a direct neighbour OR if the path was forgotten in the meantime. + */ + struct DistanceVectorHop *dvh; + + /** + * Queue used for transmission, NULL if the queue has been destroyed + * (which may happen before we get an acknowledgement). + */ + struct Queue *queue; + + /** + * Time of the transmission, for RTT calculation. + */ + struct GNUNET_TIME_Absolute transmission_time; + + /** + * Number of bytes of the original message (to calculate bandwidth). + */ + uint16_t message_size; +}; + + /** * One possible hop towards a DV target. */ @@ -1064,6 +1157,16 @@ struct DistanceVectorHop */ struct DistanceVectorHop *prev_neighbour; + /** + * Head of DLL of PAs that used our @a path. + */ + struct PendingAcknowledgement *pa_head; + + /** + * Tail of DLL of PAs that used our @a path. + */ + struct PendingAcknowledgement *pa_tail; + /** * What would be the next hop to @e target? */ @@ -1161,17 +1264,6 @@ struct DistanceVector }; -/** - * A queue is a message queue provided by a communicator - * via which we can reach a particular neighbour. - */ -struct Queue; - -/** - * Message awaiting transmission. See detailed comments below. - */ -struct PendingMessage; - /** * Entry identifying transmission in one of our `struct * Queue` which still awaits an ACK. This is used to @@ -1237,6 +1329,16 @@ struct Queue */ struct Queue *next_client; + /** + * Head of DLL of PAs that used this queue. + */ + struct PendingAcknowledgement *pa_head; + + /** + * Tail of DLL of PAs that used this queue. + */ + struct PendingAcknowledgement *pa_tail; + /** * Head of DLL of unacked transmission requests. */ @@ -1299,14 +1401,6 @@ struct Queue */ uint32_t qid; - /** - * UUID used to map acknowledgements back to the queue that - * was used for transmission. Note that @e queue_uuid-s are - * only unique per neighbour (generated via `queue_uuid_gen` - * of `struct Neighbour`). - */ - uint32_t queue_uuid; - /** * Maximum transmission unit supported by this queue. */ @@ -1356,8 +1450,8 @@ struct ReassemblyContext { /** - * Original message ID for of the message that all the - * fragments belong to. + * Original message ID for of the message that all the fragments + * belong to. */ struct MessageUUIDP msg_uuid; @@ -1392,36 +1486,12 @@ struct ReassemblyContext */ struct GNUNET_TIME_Absolute reassembly_timeout; - /** - * Average delay of all acks in @e extra_acks and @e frag_uuid. - * Should be reset to zero when @e num_acks is set to 0. - */ - struct GNUNET_TIME_Relative avg_ack_delay; - /** * Time we received the last fragment. @e avg_ack_delay must be * incremented by now - @e last_frag multiplied by @e num_acks. */ struct GNUNET_TIME_Absolute last_frag; - /** - * Bitfield of up to 64 additional fragments following @e frag_uuid - * to be acknowledged in the next cummulative ACK. - */ - uint64_t extra_acks; - - /** - * Unique ID of the lowest fragment UUID to be acknowledged in the - * next cummulative ACK. Only valid if @e num_acks > 0. - */ - uint32_t frag_uuid; - - /** - * Number of ACKs we have accumulated so far. Reset to 0 - * whenever we send a #GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK. - */ - unsigned int num_acks; - /** * How big is the message we are reassembling in total? */ @@ -1526,26 +1596,6 @@ struct Neighbour */ struct GNUNET_TIME_Absolute earliest_timeout; - /** - * Incremented by one for each queue to generate - * unique queue identifiers. Initially set to a random value. - * - * FIXME: Deal with wrap around (might be triggered by very - * persistent adversary). - */ - uint32_t queue_uuid_gen; - - /** - * Incremented by one for each message sent to this neighbour, to - * uniquely identify that message in replies (note that fragments - * use another additional counter). Initially set to a random value. - * - * It should be safe to assume that by the time this value may wrap - * around, the original message is long "gone" and no longer - * relevant. - */ - uint32_t message_uuid_ctr; - /** * Do we have a confirmed working queue and are thus visible to * CORE? @@ -1681,6 +1731,16 @@ struct PendingMessage */ struct PendingMessage *prev_frag; + /** + * Head of DLL of PAs for this pending message. + */ + struct PendingAcknowledgement *pa_head; + + /** + * Tail of DLL of PAs for this pending message. + */ + struct PendingAcknowledgement *pa_tail; + /** * This message, reliability boxed. Only possibly available if @e pmt is * #PMT_CORE. @@ -1738,11 +1798,6 @@ struct PendingMessage */ struct MessageUUIDP msg_uuid; - /** - * Counter incremented per generated fragment. - */ - uint32_t frag_uuidgen; - /** * Type of the pending message. */ @@ -1767,6 +1822,66 @@ struct PendingMessage }; +/** + * Acknowledgement payload. + */ +struct TransportCummulativeAckPayload +{ + /** + * When did we receive the message we are ACKing? Used to calculate + * the delay we introduced by cummulating ACKs. + */ + struct GNUNET_TIME_Absolute receive_time; + + /** + * UUID of a message being acknowledged. + */ + struct AcknowledgementUUIDP ack_uuid; +}; + + +/** + * Data structure in which we track acknowledgements still to + * be sent to the + */ +struct AcknowledgementCummulator +{ + /** + * Target peer for which we are accumulating ACKs here. + */ + struct GNUNET_PeerIdentity target; + + /** + * ACK data being accumulated. Only @e num_acks slots are valid. + */ + struct TransportCummulativeAckPayload ack_uuids[MAX_CUMMULATIVE_ACKS]; + + /** + * Task scheduled either to transmit the cummulative ACK message, + * or to clean up this data structure after extended periods of + * inactivity (if @e num_acks is zero). + */ + struct GNUNET_SCHEDULER_Task *task; + + /** + * When is @e task run (only used if @e num_acks is non-zero)? + */ + struct GNUNET_TIME_Absolute min_transmission_time; + + /** + * Counter to produce the `ack_counter` in the `struct + * TransportReliabilityAckMessage`. Allows the receiver to detect + * lost ACK messages. Incremented by @e num_acks upon transmission. + */ + uint32_t ack_counter; + + /** + * Number of entries used in @e ack_uuids. Reset to 0 upon transmission. + */ + unsigned int num_acks; +}; + + /** * One of the addresses of this peer. */ @@ -2164,6 +2279,18 @@ static struct GNUNET_CONTAINER_MultiPeerMap *neighbours; */ static struct GNUNET_CONTAINER_MultiPeerMap *backtalkers; +/** + * Map from PIDs to `struct AcknowledgementCummulator`s. + * Here we track the cummulative ACKs for transmission. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *ack_cummulators; + +/** + * Map of pending acknowledgements, mapping `struct AcknowledgementUUID` to + * a `struct PendingAcknowledgement`. + */ +static struct GNUNET_CONTAINER_MultiShortmap *pending_acks; + /** * Map from PIDs to `struct DistanceVector` entries describing * known paths to the peer. @@ -2233,6 +2360,61 @@ static struct GNUNET_SCHEDULER_Task *dvlearn_task; */ static struct GNUNET_SCHEDULER_Task *validation_task; +/** + * The most recent PA we have created, head of DLL. + * The length of the DLL is kept in #pa_count. + */ +static struct PendingAcknowledgement *pa_head; + +/** + * The oldest PA we have created, tail of DLL. + * The length of the DLL is kept in #pa_count. + */ +static struct PendingAcknowledgement *pa_tail; + +/** + * Number of entries in the #pa_head/#pa_tail DLL. Used to + * limit the size of the data structure. + */ +static unsigned int pa_count; + + +/** + * Release @a pa data structure. + * + * @param pa data structure to release + */ +static void +free_pending_acknowledgement (struct PendingAcknowledgement *pa) +{ + struct Queue *q = pa->queue; + struct PendingMessage *pm = pa->pm; + struct DistanceVectorHop *dvh = pa->dvh; + + GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa); + pa_count--; + if (NULL != q) + { + GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa); + pa->queue = NULL; + } + if (NULL != pm) + { + GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa); + pa->pm = NULL; + } + if (NULL != dvh) + { + GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa); + pa->queue = NULL; + } + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multishortmap_remove (pending_acks, + &pa->ack_uuid.value, + pa)); + GNUNET_free (pa); +} + /** * Free cached ephemeral key. @@ -2329,7 +2511,13 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh) { struct Neighbour *n = dvh->next_hop; struct DistanceVector *dv = dvh->dv; + struct PendingAcknowledgement *pa; + while (NULL != (pa = dvh->pa_head)) + { + GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa); + pa->dvh = NULL; + } GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh); GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh); GNUNET_free (dvh); @@ -2733,6 +2921,7 @@ free_queue (struct Queue *queue) .rtt = GNUNET_TIME_UNIT_FOREVER_REL}; struct QueueEntry *qe; int maxxed; + struct PendingAcknowledgement *pa; if (NULL != queue->transmit_task) { @@ -2744,6 +2933,12 @@ free_queue (struct Queue *queue) GNUNET_SCHEDULER_cancel (queue->visibility_task); queue->visibility_task = NULL; } + while (NULL != (pa = queue->pa_head)) + { + GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa); + pa->queue = NULL; + } + GNUNET_CONTAINER_MDLL_remove (neighbour, neighbour->queue_head, neighbour->queue_tail, @@ -3006,6 +3201,9 @@ check_client_send (void *cls, const struct OutboundMessage *obm) /** * Free fragment tree below @e root, excluding @e root itself. + * FIXME: this does NOT seem to have the intended semantics + * based on how this is called. Seems we generally DO expect + * @a root to be free'ed itself as well! * * @param root root of the tree to free */ @@ -3016,7 +3214,14 @@ free_fragment_tree (struct PendingMessage *root) while (NULL != (frag = root->head_frag)) { + struct PendingAcknowledgement *pa; + free_fragment_tree (frag); + while (NULL != (pa = frag->pa_head)) + { + GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa); + pa->pm = NULL; + } GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag); GNUNET_free (frag); } @@ -3035,6 +3240,7 @@ free_pending_message (struct PendingMessage *pm) { struct TransportClient *tc = pm->client; struct Neighbour *target = pm->target; + struct PendingAcknowledgement *pa; if (NULL != tc) { @@ -3047,6 +3253,12 @@ free_pending_message (struct PendingMessage *pm) target->pending_msg_head, target->pending_msg_tail, pm); + while (NULL != (pa = pm->pa_head)) + { + GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa); + pa->pm = NULL; + } + free_fragment_tree (pm); if (NULL != pm->qe) { @@ -4260,32 +4472,108 @@ check_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) /** - * Generate a fragment acknowledgement for an @a rc. + * Clean up an idle cummulative acknowledgement data structure. * - * @param rc context to generate ACK for, @a rc ACK state is reset + * @param cls a `struct AcknowledgementCummulator *` */ static void -send_fragment_ack (struct ReassemblyContext *rc) +destroy_ack_cummulator (void *cls) { - struct TransportFragmentAckMessage *ack; + struct AcknowledgementCummulator *ac = cls; - ack = GNUNET_new (struct TransportFragmentAckMessage); - ack->header.size = htons (sizeof (struct TransportFragmentAckMessage)); - ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK); - ack->frag_uuid.uuid = htonl (rc->frag_uuid); - ack->extra_acks = GNUNET_htonll (rc->extra_acks); - ack->msg_uuid = rc->msg_uuid; - ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay); - if (0 == rc->msg_missing) - ack->reassembly_timeout = GNUNET_TIME_relative_hton ( - GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */ + ac->task = NULL; + GNUNET_assert (0 == ac->num_acks); + GNUNET_assert ( + GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (ack_cummulators, &ac->target, ac)); + GNUNET_free (ac); +} + + +/** + * Do the transmission of a cummulative acknowledgement now. + * + * @param cls a `struct AcknowledgementCummulator *` + */ +static void +transmit_cummulative_ack_cb (void *cls) +{ + struct AcknowledgementCummulator *ac = cls; + struct TransportReliabilityAckMessage *ack; + struct TransportCummulativeAckPayloadP *ap; + + ac->task = NULL; + GNUNET_assert (0 < ac->ack_counter); + ack = GNUNET_malloc (sizeof (*ack) + + ac->ack_counter * + sizeof (struct TransportCummulativeAckPayloadP)); + ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK); + ack->header.size = + htons (sizeof (*ack) + + ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP)); + ack->ack_counter = htonl (ac->ack_counter++); + ap = (struct TransportCummulativeAckPayloadP *) &ack[1]; + for (unsigned int i = 0; i < ac->ack_counter; i++) + { + ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid; + ap[i].ack_delay = GNUNET_TIME_relative_hton ( + GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time)); + } + route_message (&ac->target, &ack->header, RMO_DV_ALLOWED); + ac->num_acks = 0; + ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT, + &destroy_ack_cummulator, + ac); +} + + +/** + * Transmit an acknowledgement for @a ack_uuid to @a pid delaying + * transmission by at most @a ack_delay. + * + * @param pid target peer + * @param ack_uuid UUID to ack + * @param max_delay how long can the ACK wait + */ +static void +cummulative_ack (const struct GNUNET_PeerIdentity *pid, + const struct AcknowledgementUUIDP *ack_uuid, + struct GNUNET_TIME_Absolute max_delay) +{ + struct AcknowledgementCummulator *ac; + + ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid); + if (NULL == ac) + { + ac = GNUNET_new (struct AcknowledgementCummulator); + ac->target = *pid; + ac->min_transmission_time = max_delay; + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + ack_cummulators, + &ac->target, + ac, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + } else - ack->reassembly_timeout = GNUNET_TIME_relative_hton ( - GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)); - route_message (&rc->neighbour->pid, &ack->header, RMO_DV_ALLOWED); - rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO; - rc->num_acks = 0; - rc->extra_acks = 0LLU; + { + if (MAX_CUMMULATIVE_ACKS == ac->num_acks) + { + /* must run immediately, ack buffer full! */ + GNUNET_SCHEDULER_cancel (ac->task); + transmit_cummulative_ack_cb (ac); + } + GNUNET_SCHEDULER_cancel (ac->task); + ac->min_transmission_time = + GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay); + } + GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS); + ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get (); + ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid; + ac->num_acks++; + ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time, + &transmit_cummulative_ack_cb, + ac); } @@ -4348,10 +4636,8 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) uint16_t msize; uint16_t fsize; uint16_t frag_off; - uint32_t frag_uuid; char *target; struct GNUNET_TIME_Relative cdelay; - int ack_now; struct FindByMessageUuidContext fc; n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender); @@ -4417,6 +4703,12 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) /* reassemble */ fsize = ntohs (fb->header.size) - sizeof (*fb); + if (0 == fsize) + { + GNUNET_break (0); + finish_cmc_handling (cmc); + return; + } frag_off = ntohs (fb->frag_off); memcpy (&target[frag_off], &fb[1], fsize); /* update bitfield and msg_missing */ @@ -4430,60 +4722,17 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) } /* Compute cummulative ACK */ - frag_uuid = ntohl (fb->frag_uuid.uuid); cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag); - cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->num_acks); + cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize); + if (0 == rc->msg_missing) + cdelay = GNUNET_TIME_UNIT_ZERO; + cummulative_ack (&cmc->im.sender, + &fb->ack_uuid, + GNUNET_TIME_relative_to_absolute (cdelay)); rc->last_frag = GNUNET_TIME_absolute_get (); - rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, cdelay); - ack_now = GNUNET_NO; - if (0 == rc->num_acks) - { - /* case one: first ack */ - rc->frag_uuid = frag_uuid; - rc->extra_acks = 0LLU; - rc->num_acks = 1; - } - else if ((frag_uuid >= rc->frag_uuid) && (frag_uuid <= rc->frag_uuid + 64)) - { - /* case two: ack fits after existing min UUID */ - if ((frag_uuid == rc->frag_uuid) || - (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1))))) - { - /* duplicate fragment, ack now! */ - ack_now = GNUNET_YES; - } - else - { - rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1)); - rc->num_acks++; - } - } - else if ((rc->frag_uuid > frag_uuid) && - (((rc->frag_uuid == frag_uuid + 64) && (0 == rc->extra_acks)) || - ((rc->frag_uuid < frag_uuid + 64) && - (rc->extra_acks == - (rc->extra_acks & - ~((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU)))))) - { - /* can fit ack by shifting extra acks and starting at - frag_uid, test above esured that the bits we will - shift 'extra_acks' by are all zero. */ - rc->extra_acks <<= (rc->frag_uuid - frag_uuid); - rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1)); - rc->frag_uuid = frag_uuid; - rc->num_acks++; - } - if (65 == rc->num_acks) /* OPTIMIZE-FIXME: maybe use smaller threshold? This - is very aggressive. */ - ack_now = GNUNET_YES; /* maximum acks received */ - // FIXME: possibly also ACK based on RTT (but for that we'd need to - // determine the queue used for the ACK first!) - /* is reassembly complete? */ if (0 != rc->msg_missing) { - if (ack_now) - send_fragment_ack (rc); finish_cmc_handling (cmc); return; } @@ -4497,7 +4746,6 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) return; } /* successful reassembly */ - send_fragment_ack (rc); demultiplex_with_cmc (cmc, msg); /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still en-route and we forget that we finished this reassembly immediately! @@ -4508,172 +4756,149 @@ handle_fragment_box (void *cls, const struct TransportFragmentBoxMessage *fb) /** - * Check the @a fa against the fragments associated with @a pm. - * If it matches, remove the matching fragments from the transmission - * list. + * Communicator gave us a reliability box. Check the message. * - * @param pm pending message to check against the ack - * @param fa the ack that was received - * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not + * @param cls a `struct CommunicatorMessageContext` + * @param rb the send message that was sent + * @return #GNUNET_YES if message is well-formed */ static int -check_ack_against_pm (struct PendingMessage *pm, - const struct TransportFragmentAckMessage *fa) -{ - int match; - struct PendingMessage *nxt; - uint32_t fs = ntohl (fa->frag_uuid.uuid); - uint64_t xtra = GNUNET_ntohll (fa->extra_acks); - - match = GNUNET_NO; - for (struct PendingMessage *frag = pm->head_frag; NULL != frag; frag = nxt) - { - const struct TransportFragmentBoxMessage *tfb = - (const struct TransportFragmentBoxMessage *) &pm[1]; - uint32_t fu = ntohl (tfb->frag_uuid.uuid); - - GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt); - nxt = frag->next_frag; - /* Check for exact match or match in the 'xtra' bitmask */ - if ((fu == fs) || - ((fu > fs) && (fu <= fs + 64) && (0 != (1LLU << (fu - fs - 1) & xtra)))) - { - match = GNUNET_YES; - free_fragment_tree (frag); - } - } - return match; +check_reliability_box (void *cls, + const struct TransportReliabilityBoxMessage *rb) +{ + GNUNET_MQ_check_boxed_message (rb); + return GNUNET_YES; } /** - * Communicator gave us a fragment acknowledgement. Process the request. + * Communicator gave us a reliability box. Process the request. * * @param cls a `struct CommunicatorMessageContext` (must call * #finish_cmc_handling() when done) - * @param fa the message that was received + * @param rb the message that was received */ static void -handle_fragment_ack (void *cls, const struct TransportFragmentAckMessage *fa) +handle_reliability_box (void *cls, + const struct TransportReliabilityBoxMessage *rb) { struct CommunicatorMessageContext *cmc = cls; - struct Neighbour *n; - int matched; + const struct GNUNET_MessageHeader *inbox = + (const struct GNUNET_MessageHeader *) &rb[1]; - n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender); - if (NULL == n) - { - struct GNUNET_SERVICE_Client *client = cmc->tc->client; + // FIXME: call cummulative_ack(), have ack_countdown influence max_delay! + (void) (0 == ntohl (rb->ack_countdown)); + /* continue with inner message */ + demultiplex_with_cmc (cmc, inbox); +} - GNUNET_break (0); - finish_cmc_handling (cmc); - GNUNET_SERVICE_client_drop (client); - return; - } - /* FIXME-OPTIMIZE: maybe use another hash map here? */ - matched = GNUNET_NO; - for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; - pm = pm->prev_neighbour) - { - if (0 != GNUNET_memcmp (&fa->msg_uuid, &pm->msg_uuid)) - continue; - matched = GNUNET_YES; - if (GNUNET_YES == check_ack_against_pm (pm, fa)) - { - struct GNUNET_TIME_Relative avg_ack_delay = - GNUNET_TIME_relative_ntoh (fa->avg_ack_delay); - // FIXME: update RTT and other reliability data! - // ISSUE: we don't know which of n's queues the message(s) - // took (and in fact the different messages might have gone - // over different queues and possibly over multiple). - // => track queues with PendingMessages, and update RTT only if - // the queue used is unique? - // -> how can we get loss rates? - // -> or, add extra state to Box and ACK to identify queue? - // IDEA: generate MULTIPLE frag-uuids per fragment and track - // the queue with the fragment! (-> this logic must - // be moved into check_ack_against_pm!) - (void) avg_ack_delay; - } - else - { - GNUNET_STATISTICS_update (GST_stats, - "# FRAGMENT_ACKS dropped, no matching fragment", - 1, - GNUNET_NO); - } - if (NULL == pm->head_frag) - { - // if entire message is ACKed, handle that as well. - // => clean up PM, any post actions? - free_pending_message (pm); - } - else - { - struct GNUNET_TIME_Relative reassembly_timeout = - GNUNET_TIME_relative_ntoh (fa->reassembly_timeout); - // OPTIMIZE-FIXME: adjust retransmission strategy based on - // reassembly_timeout! - (void) reassembly_timeout; - } - break; - } - if (GNUNET_NO == matched) - { - GNUNET_STATISTICS_update (GST_stats, - "# FRAGMENT_ACKS dropped, no matching pending message", - 1, - GNUNET_NO); - } - finish_cmc_handling (cmc); + +/** + * We have successfully transmitted data via @a q, update metrics. + * + * @param q queue to update + * @param rtt round trip time observed + * @param bytes_transmitted_ok number of bytes successfully transmitted + */ +static void +update_queue_performance (struct Queue *q, + struct GNUNET_TIME_Relative rtt, + uint16_t bytes_transmitted_ok) +{ + // FIXME: implement! } /** - * Communicator gave us a reliability box. Check the message. + * We have successfully transmitted data via @a dvh, update metrics. * - * @param cls a `struct CommunicatorMessageContext` - * @param rb the send message that was sent - * @return #GNUNET_YES if message is well-formed + * @param dvh distance vector path data to update + * @param rtt round trip time observed + * @param bytes_transmitted_ok number of bytes successfully transmitted */ -static int -check_reliability_box (void *cls, - const struct TransportReliabilityBoxMessage *rb) +static void +update_dvh_performance (struct DistanceVectorHop *dvh, + struct GNUNET_TIME_Relative rtt, + uint16_t bytes_transmitted_ok) { - GNUNET_MQ_check_boxed_message (rb); - return GNUNET_YES; + // FIXME: implement! } /** - * Communicator gave us a reliability box. Process the request. + * The @a pa was acknowledged, process the acknowledgement. * - * @param cls a `struct CommunicatorMessageContext` (must call - * #finish_cmc_handling() when done) - * @param rb the message that was received + * @param pa the pending acknowledgement that was satisfied + * @param ack_delay artificial delay from cummulative acks created by the other + * peer */ static void -handle_reliability_box (void *cls, - const struct TransportReliabilityBoxMessage *rb) +handle_acknowledged (struct PendingAcknowledgement *pa, + struct GNUNET_TIME_Relative ack_delay) { - struct CommunicatorMessageContext *cmc = cls; - const struct GNUNET_MessageHeader *inbox = - (const struct GNUNET_MessageHeader *) &rb[1]; + struct PendingMessage *pm = pa->pm; + struct GNUNET_TIME_Relative delay; - if (0 == ntohl (rb->ack_countdown)) - { - struct TransportReliabilityAckMessage *ack; + delay = GNUNET_TIME_absolute_get_duration (pa->transmission_time); + if (delay.rel_value_us > ack_delay.rel_value_us) + delay = GNUNET_TIME_UNIT_ZERO; + else + delay = GNUNET_TIME_relative_subtract (delay, ack_delay); + if (NULL != pa->queue) + update_queue_performance (pa->queue, delay, pa->message_size); + if (NULL != pa->dvh) + update_dvh_performance (pa->dvh, delay, pa->message_size); + if (NULL != pm) + { + if (NULL != pm->frag_parent) + { + pm = pm->frag_parent; + free_fragment_tree (pa->pm); + } + while ((NULL != pm->frag_parent) && (NULL == pm->head_frag)) + { + struct PendingMessage *parent = pm->frag_parent; - /* FIXME-OPTIMIZE: implement cummulative ACKs and ack_countdown, - then setting the avg_ack_delay field below: */ - ack = GNUNET_malloc (sizeof (*ack) + sizeof (struct MessageUUIDP)); - ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK); - ack->header.size = htons (sizeof (*ack) + sizeof (struct MessageUUIDP)); - memcpy (&ack[1], &rb->msg_uuid, sizeof (struct MessageUUIDP)); - route_message (&cmc->im.sender, &ack->header, RMO_DV_ALLOWED); + free_fragment_tree (pm); + pm = parent; + } + if (NULL != pm->head_frag) + pm = NULL; /* we are done, otherwise free 'pm' below */ } - /* continue with inner message */ - demultiplex_with_cmc (cmc, inbox); + if (NULL != pm) + free_pending_message (pm); + free_pending_acknowledgement (pa); +} + + +/** + * Communicator gave us a reliability ack. Check it is well-formed. + * + * @param cls a `struct CommunicatorMessageContext` (unused) + * @param ra the message that was received + * @return #GNUNET_Ok if @a ra is well-formed + */ +static int +check_reliability_ack (void *cls, + const struct TransportReliabilityAckMessage *ra) +{ + unsigned int n_acks; + + (void) cls; + n_acks = (ntohs (ra->header.size) - sizeof (*ra)) / + sizeof (struct TransportCummulativeAckPayloadP); + if (0 == n_acks) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if ((ntohs (ra->header.size) - sizeof (*ra)) != + n_acks * sizeof (struct TransportCummulativeAckPayloadP)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } @@ -4689,70 +4914,33 @@ handle_reliability_ack (void *cls, const struct TransportReliabilityAckMessage *ra) { struct CommunicatorMessageContext *cmc = cls; - struct Neighbour *n; + const struct TransportCummulativeAckPayloadP *ack; + struct PendingAcknowledgement *pa; unsigned int n_acks; - const struct MessageUUIDP *msg_uuids; - struct PendingMessage *nxt; - int matched; + uint32_t ack_counter; - n = GNUNET_CONTAINER_multipeermap_get (neighbours, &cmc->im.sender); - if (NULL == n) + n_acks = (ntohs (ra->header.size) - sizeof (*ra)) / + sizeof (struct TransportCummulativeAckPayloadP); + ack = (const struct TransportCummulativeAckPayloadP *) &ra[1]; + for (unsigned int i = 0; i < n_acks; i++) { - struct GNUNET_SERVICE_Client *client = cmc->tc->client; - - GNUNET_break (0); - finish_cmc_handling (cmc); - GNUNET_SERVICE_client_drop (client); - return; - } - n_acks = - (ntohs (ra->header.size) - sizeof (*ra)) / sizeof (struct MessageUUIDP); - msg_uuids = (const struct MessageUUIDP *) &ra[1]; - - /* FIXME-OPTIMIZE: maybe use another hash map here? */ - matched = GNUNET_NO; - for (struct PendingMessage *pm = n->pending_msg_head; NULL != pm; pm = nxt) - { - int in_list; - - nxt = pm->next_neighbour; - in_list = GNUNET_NO; - for (unsigned int i = 0; i < n_acks; i++) + pa = + GNUNET_CONTAINER_multishortmap_get (pending_acks, &ack[i].ack_uuid.value); + if (NULL == pa) { - if (0 != GNUNET_memcmp (&msg_uuids[i], &pm->msg_uuid)) - continue; - in_list = GNUNET_YES; - break; - } - if (GNUNET_NO == in_list) + GNUNET_STATISTICS_update ( + GST_stats, + "# FRAGMENT_ACKS dropped, no matching pending message", + 1, + GNUNET_NO); continue; - - /* this pm was acked! */ - matched = GNUNET_YES; - free_pending_message (pm); - - { - struct GNUNET_TIME_Relative avg_ack_delay = - GNUNET_TIME_relative_ntoh (ra->avg_ack_delay); - // FIXME: update RTT and other reliability data! - // ISSUE: we don't know which of n's queues the message(s) - // took (and in fact the different messages might have gone - // over different queues and possibly over multiple). - // => track queues with PendingMessages, and update RTT only if - // the queue used is unique? - // -> how can we get loss rates? - // -> or, add extra state to MSG and ACKs to identify queue? - // -> if we do this, might just do the same for the avg_ack_delay! - (void) avg_ack_delay; } + handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay)); } - if (GNUNET_NO == matched) - { - GNUNET_STATISTICS_update (GST_stats, - "# FRAGMENT_ACKS dropped, no matching pending message", - 1, - GNUNET_NO); - } + + ack_counter = htonl (ra->ack_counter); + // FIXME: track ACK losses based on ack_counter somewhere! + // (DV and/or Neighbour?) finish_cmc_handling (cmc); } @@ -4968,7 +5156,8 @@ backtalker_monotime_cb (void *cls, 1, GNUNET_NO); b->monotonic_time = mt; - /* Setting body_size to 0 prevents call to #forward_backchannel_payload() */ + /* Setting body_size to 0 prevents call to #forward_backchannel_payload() + */ b->body_size = 0; return; } @@ -5268,13 +5457,13 @@ activate_core_visible_dv_path (struct DistanceVectorHop *hop) * non-first hop is in our neighbour list (returning #GNUNET_SYSERR). * * @param path the path we learned, path[0] should be us, - * and then path contains a valid path from us to `path[path_len-1]` - * path[1] should be a direct neighbour (we should check!) + * and then path contains a valid path from us to + * `path[path_len-1]` path[1] should be a direct neighbour (we should check!) * @param path_len number of entries on the @a path, at least three! * @param network_latency how long does the message take from us to * `path[path_len-1]`? set to "forever" if unknown - * @param path_valid_until how long is this path considered validated? Maybe be - * zero. + * @param path_valid_until how long is this path considered validated? Maybe + * be zero. * @return #GNUNET_YES on success, * #GNUNET_NO if we have better path(s) to the target * #GNUNET_SYSERR if the path is useless and/or invalid @@ -5603,7 +5792,8 @@ handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl) finish_cmc_handling (cmc); /* OPTIMIZE-FIXME: Technically, we only need to bother checking - the initiator signature if we send the message back to the initiator... */ + the initiator signature if we send the message back to the initiator... + */ if (GNUNET_OK != validate_dv_initiator_signature (&dvl->initiator, &dvl->challenge, &dvl->init_sig)) @@ -6349,18 +6539,14 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT, struct TransportFragmentBoxMessage, &cmc), - GNUNET_MQ_hd_fixed_size (fragment_ack, - GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK, - struct TransportFragmentAckMessage, - &cmc), GNUNET_MQ_hd_var_size (reliability_box, GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX, struct TransportReliabilityBoxMessage, &cmc), - GNUNET_MQ_hd_fixed_size (reliability_ack, - GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK, - struct TransportReliabilityAckMessage, - &cmc), + GNUNET_MQ_hd_var_size (reliability_ack, + GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK, + struct TransportReliabilityAckMessage, + &cmc), GNUNET_MQ_hd_var_size (backchannel_encapsulation, GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION, struct TransportBackchannelEncapsulationMessage, @@ -6460,28 +6646,77 @@ set_pending_message_uuid (struct PendingMessage *pm) } +/** + * Setup data structure waiting for acknowledgements. + * + * @param queue queue the @a pm will be sent over + * @param dvh path the message will take, may be NULL + * @param pm the pending message for transmission + * @return corresponding fresh pending acknowledgement + */ +static struct PendingAcknowledgement * +prepare_pending_acknowledgement (struct Queue *queue, + struct DistanceVectorHop *dvh, + struct PendingMessage *pm) +{ + struct PendingAcknowledgement *pa; + + pa = GNUNET_new (struct PendingAcknowledgement); + pa->queue = queue; + pa->dvh = dvh; + pa->pm = pm; + do + { + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, + &pa->ack_uuid, + sizeof (pa->ack_uuid)); + } while (GNUNET_YES != GNUNET_CONTAINER_multishortmap_put ( + pending_acks, + &pa->ack_uuid.value, + pa, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_MDLL_insert (queue, queue->pa_head, queue->pa_tail, pa); + GNUNET_CONTAINER_MDLL_insert (pm, pm->pa_head, pm->pa_tail, pa); + if (NULL != dvh) + GNUNET_CONTAINER_MDLL_insert (dvh, dvh->pa_head, dvh->pa_tail, pa); + pa->transmission_time = GNUNET_TIME_absolute_get (); + pa->message_size = pm->bytes_msg; + return pa; +} + + /** * Fragment the given @a pm to the given @a mtu. Adds * additional fragments to the neighbour as well. If the * @a mtu is too small, generates and error for the @a pm * and returns NULL. * + * @param queue which queue to fragment for + * @param dvh path the message will take, or NULL * @param pm pending message to fragment for transmission - * @param mtu MTU to apply * @return new message to transmit */ static struct PendingMessage * -fragment_message (struct PendingMessage *pm, uint16_t mtu) +fragment_message (struct Queue *queue, + struct DistanceVectorHop *dvh, + struct PendingMessage *pm) { + struct PendingAcknowledgement *pa; struct PendingMessage *ff; + uint16_t mtu; + pa = prepare_pending_acknowledgement (queue, dvh, pm); + mtu = (0 == queue->mtu) + ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo) + : queue->mtu; set_pending_message_uuid (pm); /* This invariant is established in #handle_add_queue_message() */ GNUNET_assert (mtu > sizeof (struct TransportFragmentBoxMessage)); /* select fragment for transmission, descending the tree if it has - been expanded until we are at a leaf or at a fragment that is small enough + been expanded until we are at a leaf or at a fragment that is small + enough */ ff = pm; while (((ff->bytes_msg > mtu) || (pm == ff)) && @@ -6527,7 +6762,7 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu) tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT); tfb.header.size = htons (sizeof (struct TransportFragmentBoxMessage) + fragsize); - tfb.frag_uuid.uuid = htonl (pm->frag_uuidgen++); + tfb.ack_uuid = pa->ack_uuid; tfb.msg_uuid = pm->msg_uuid; tfb.frag_off = htons (ff->frag_off + xoff); tfb.msg_size = htons (pm->bytes_msg); @@ -6558,13 +6793,18 @@ fragment_message (struct PendingMessage *pm, uint16_t mtu) * @a pm). If the @a pm is already fragmented or reliability boxed, * or itself an ACK, this function simply returns @a pm. * + * @param queue which queue to prepare transmission for + * @param dvh path the message will take, or NULL * @param pm pending message to box for transmission over unreliabile queue * @return new message to transmit */ static struct PendingMessage * -reliability_box_message (struct PendingMessage *pm) +reliability_box_message (struct Queue *queue, + struct DistanceVectorHop *dvh, + struct PendingMessage *pm) { struct TransportReliabilityBoxMessage rbox; + struct PendingAcknowledgement *pa; struct PendingMessage *bpm; char *msg; @@ -6581,6 +6821,8 @@ reliability_box_message (struct PendingMessage *pm) client_send_response (pm, GNUNET_NO, 0); return NULL; } + pa = prepare_pending_acknowledgement (queue, dvh, pm); + bpm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (rbox) + pm->bytes_msg); bpm->target = pm->target; @@ -6593,7 +6835,8 @@ reliability_box_message (struct PendingMessage *pm) rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX); rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg); rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support - rbox.msg_uuid = pm->msg_uuid; + + rbox.ack_uuid = pa->ack_uuid; msg = (char *) &bpm[1]; memcpy (msg, &rbox, sizeof (rbox)); memcpy (&msg[sizeof (rbox)], &pm[1], pm->bytes_msg); @@ -6698,11 +6941,7 @@ transmit_on_queue (void *cls) (NULL != pm->head_frag /* fragments already exist, should respect that even if MTU is 0 for this queue */) ) - s = fragment_message (s, - (0 == queue->mtu) - ? UINT16_MAX - - sizeof (struct GNUNET_TRANSPORT_SendMessageTo) - : queue->mtu); + s = fragment_message (queue, NULL /*FIXME! */, s); if (NULL == s) { /* Fragmentation failed, try next message... */ @@ -6710,7 +6949,7 @@ transmit_on_queue (void *cls) return; } if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) - s = reliability_box_message (s); + s = reliability_box_message (queue, NULL /* FIXME! */, s); if (NULL == s) { /* Reliability boxing failed, try next message... */ @@ -7509,8 +7748,8 @@ handle_queue_create_ok (void *cls, /** - * Communicator tells us that our request to create a queue failed. This usually - * indicates that the provided address is simply invalid or that the + * Communicator tells us that our request to create a queue failed. This + * usually indicates that the provided address is simply invalid or that the * communicator's resources are exhausted. * * @param cls the `struct TransportClient` @@ -7803,7 +8042,8 @@ handle_address_consider_verify ( (void) cls; // OPTIMIZE-FIXME: checking that we know this address already should // be done BEFORE checking the signature => HELLO API change! - // OPTIMIZE-FIXME: pre-check: rate-limit signature verification / validation?! + // OPTIMIZE-FIXME: pre-check: rate-limit signature verification / + // validation?! address = GNUNET_HELLO_extract_address (&hdr[1], ntohs (hdr->header.size) - sizeof (*hdr), @@ -7950,6 +8190,50 @@ free_validation_state_cb (void *cls, } +/** + * Free pending acknowledgement. + * + * @param cls NULL + * @param key unused + * @param value a `struct PendingAcknowledgement` + * @return #GNUNET_OK (always) + */ +static int +free_pending_ack_cb (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct PendingAcknowledgement *pa = value; + + (void) cls; + (void) key; + free_pending_acknowledgement (pa); + return GNUNET_OK; +} + + +/** + * Free acknowledgement cummulator. + * + * @param cls NULL + * @param pid unused + * @param value a `struct AcknowledgementCummulator` + * @return #GNUNET_OK (always) + */ +static int +free_ack_cummulator_cb (void *cls, + const struct GNUNET_PeerIdentity *pid, + void *value) +{ + struct AcknowledgementCummulator *ac = value; + + (void) cls; + (void) pid; + GNUNET_free (ac); + return GNUNET_OK; +} + + /** * Function called when the service shuts down. Unloads our plugins * and cancels pending validations. @@ -7983,6 +8267,16 @@ do_shutdown (void *cls) GNUNET_free (GST_my_private_key); GST_my_private_key = NULL; } + GNUNET_CONTAINER_multipeermap_iterate (ack_cummulators, + &free_ack_cummulator_cb, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (ack_cummulators); + ack_cummulators = NULL; + GNUNET_CONTAINER_multishortmap_iterate (pending_acks, + &free_pending_ack_cb, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (pending_acks); + pending_acks = NULL; GNUNET_CONTAINER_multipeermap_destroy (neighbours); neighbours = NULL; GNUNET_CONTAINER_multipeermap_iterate (backtalkers, @@ -8034,6 +8328,8 @@ run (void *cls, /* setup globals */ GST_cfg = c; backtalkers = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES); + pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES); + ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); -- 2.25.1