From ff10602f5ab7df06dc850206159e76bd7a7891ea Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 22 Jan 2019 22:55:05 +0100 Subject: [PATCH] working on crazy fragmentation logic --- src/include/gnunet_protocols.h | 31 +++ src/transport/gnunet-service-tng.c | 380 +++++++++++++++++++++++++--- src/transport/transport.h | 4 +- src/transport/transport_api2_core.c | 6 +- 4 files changed, 376 insertions(+), 45 deletions(-) diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index af4dbd52f..8593005d7 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -3092,6 +3092,37 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION 1213 +/** + * Type of a fragment of a CORE message created by transport to adjust + * message length to a queue's MTU. + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT 1214 + +/** + * Acknowledgement generated for a fragment. + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK 1215 + +/** + * Wrapper around non-fragmented CORE message used to measure RTT + * and ensure reliability. + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX 1216 + +/** + * Confirmation for a #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX. + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK 1217 + +/** + * Message sent for topology discovery at transport level. + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN 1218 + +/** + * Source-routed transport message based DV information gathered. + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX 1219 /** * Message sent to indicate to the transport that a monitor diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 3365ea5d5..76d5265a8 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -428,31 +428,50 @@ struct TransportDVLearn /** * Outer layer of an encapsulated message send over multiple hops. + * The path given only includes the identities of the subsequent + * peers, i.e. it will be empty if we are the receiver. Each + * forwarding peer should scan the list from the end, and if it can, + * forward to the respective peer. The list should then be shortened + * by all the entries up to and including that peer. Each hop should + * also increment @e total_hops to allow the receiver to get a precise + * estimate on the number of hops the message travelled. Senders must + * provide a learned path that thus should work, but intermediaries + * know of a shortcut, they are allowed to send the message via that + * shortcut. + * + * If a peer finds itself still on the list, it must drop the message. */ struct TransportDVBox { /** - * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV + * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX */ struct GNUNET_MessageHeader header; + /** + * Number of total hops this messages travelled. In NBO. + * @e origin sets this to zero, to be incremented at + * each hop. + */ + uint16_t total_hops GNUNET_PACKED; + /** * Number of hops this messages includes. In NBO. */ uint16_t num_hops GNUNET_PACKED; /** - * Position of our peer in the sequence. - * To be incremented at each hop. In NBO. + * Identity of the peer that originated the message. */ - uint16_t current_hop GNUNET_PACKED; + struct GNUNET_PeerIdentity origin; /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values; - the first is the sender, the last the receiver; the current - peer may be one in the middle. */ + excluding the @e origin and the current peer, the last must be + the ultimate target; if @e num_hops is zero, the receiver of this + message is the ultimate target. */ /* Followed by the actual message, which itself may be - another box, but not a DV_LEARN message! */ + another box, but not a DV_LEARN or DV_BOX message! */ }; @@ -696,7 +715,60 @@ struct Neighbour /** - * Transmission request from CORE that is awaiting delivery. + * Types of different pending messages. + */ +enum PendingMessageType +{ + + /** + * Ordinary message received from the CORE service. + */ + PMT_CORE = 0, + + /** + * Fragment box. + */ + PMT_FRAGMENT_BOX = 1, + + /** + * Reliability box. + */ + PMT_RELIABILITY_BOX = 2, + + /** + * Any type of acknowledgement. + */ + PMT_ACKNOWLEDGEMENT = 3 + + +}; + + +/** + * Transmission request that is awaiting delivery. The original + * transmission requests from CORE may be too big for some queues. + * In this case, a *tree* of fragments is created. At each + * level of the tree, fragments are kept in a DLL ordered by which + * fragment should be sent next (at the head). The tree is searched + * top-down, with the original message at the root. + * + * To select a node for transmission, first it is checked if the + * current node's message fits with the MTU. If it does not, we + * either calculate the next fragment (based on @e frag_off) from the + * current node, or, if all fragments have already been created, + * descend to the @e head_frag. Even though the node was already + * fragmented, the fragment may be too big if the fragment was + * generated for a queue with a larger MTU. In this case, the node + * may be fragmented again, thus creating a tree. + * + * When acknowledgements for fragments are received, the tree + * must be pruned, removing those parts that were already + * acknowledged. When fragments are sent over a reliable + * channel, they can be immediately removed. + * + * If a message is ever fragmented, then the original "full" message + * is never again transmitted (even if it fits below the MTU), and + * only (remaining) fragments are sent. */ struct PendingMessage { @@ -711,25 +783,50 @@ struct PendingMessage struct PendingMessage *prev_neighbour; /** - * Kept in a MDLL of messages from this @a client. + * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE) */ struct PendingMessage *next_client; - + /** - * Kept in a MDLL of messages from this @a client. + * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE) */ struct PendingMessage *prev_client; + /** + * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOx) + */ + struct PendingMessage *next_frag; + + /** + * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX) + */ + struct PendingMessage *prev_frag; + /** * Target of the request. */ struct Neighbour *target; - + /** - * Client that issued the transmission request. + * Client that issued the transmission request, if @e pmt is #PMT_CORE. */ struct TransportClient *client; + + /** + * Head of a MDLL of fragments created for this core message. + */ + struct PendingMessage *head_frag; + + /** + * Tail of a MDLL of fragments created for this core message. + */ + struct PendingMessage *tail_frag; + /** + * Our parent in the fragmentation tree. + */ + struct PendingMessage *frag_parent; + /** * At what time should we give up on the transmission (and no longer retry)? */ @@ -739,12 +836,38 @@ struct PendingMessage * What is the earliest time for us to retry transmission of this message? */ struct GNUNET_TIME_Absolute next_attempt; + + /** + * UUID to use for this message (used for reassembly of fragments, only + * initialized if @e msg_uuid_set is #GNUNET_YES). + */ + struct GNUNET_ShortHashCode msg_uuid; + + /** + * Counter incremented per generated fragment. + */ + uint32_t frag_uuidgen; + /** + * Type of the pending message. + */ + enum PendingMessageType pmt; + /** * Size of the original message. */ - uint32_t bytes_msg; + uint16_t bytes_msg; + /** + * Offset at which we should generate the next fragment. + */ + uint16_t frag_off; + + /** + * #GNUNET_YES once @e msg_uuid was initialized + */ + int16_t msg_uuid_set; + /* Followed by @e bytes_msg to transmit */ }; @@ -1482,6 +1605,28 @@ check_client_send (void *cls, } +/** + * Free fragment tree below @e root, excluding @e root itself. + * + * @param root root of the tree to free + */ +static void +free_fragment_tree (struct PendingMessage *root) +{ + struct PendingMessage *frag; + + while (NULL != (frag = root->head_frag)) + { + free_fragment_tree (frag); + GNUNET_CONTAINER_MDLL_remove (frag, + root->head_frag, + root->tail_frag, + frag); + GNUNET_free (frag); + } +} + + /** * Send a response to the @a pm that we have processed a * "send" request with status @a success. We @@ -1509,7 +1654,7 @@ client_send_response (struct PendingMessage *pm, env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); som->success = htonl ((uint32_t) success); - som->bytes_msg = htonl (pm->bytes_msg); + som->bytes_msg = htons (pm->bytes_msg); som->bytes_physical = htonl (bytes_physical); som->peer = target->pid; GNUNET_MQ_send (tc->mq, @@ -1523,6 +1668,7 @@ client_send_response (struct PendingMessage *pm, target->pending_msg_head, target->pending_msg_tail, pm); + free_fragment_tree (pm); GNUNET_free (pm); } @@ -2024,18 +2170,98 @@ static struct PendingMessage * fragment_message (struct PendingMessage *pm, uint16_t mtu) { - if (0) + struct PendingMessage *ff; + + if (GNUNET_NO == pm->msg_uuid_set) { - /* mtu too small */ - // FIMXE: bitch - client_send_response (pm, - GNUNET_NO, - 0); - return NULL; + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, + &pm->msg_uuid, + sizeof (pm->msg_uuid)); + pm->msg_uuid_set = GNUNET_YES; + } + + /* This invariant is established in #handle_add_queue_message() */ + GNUNET_assert (mtu > sizeof (struct TransportFragmentBox)); + + /* select fragment for transmission, descending the tree if it has + been expanded until we are at a leaf or at a fragment that is small enough */ + ff = pm; + while ( ( (ff->bytes_msg > mtu) || + (pm == ff) ) && + (ff->frag_off == ff->bytes_msg) && + (NULL != ff->head_frag) ) + { + ff = ff->head_frag; /* descent into fragmented fragments */ } - /* FIXME: return first fragment here! */ - return NULL; + if ( ( (ff->bytes_msg > mtu) || + (pm == ff) ) && + (pm->frag_off < pm->bytes_msg) ) + { + /* Did not yet calculate all fragments, calculate next fragment */ + struct PendingMessage *frag; + struct TransportFragmentBox tfb; + const char *orig; + char *msg; + uint16_t fragmax; + uint16_t fragsize; + uint16_t msize; + uint16_t xoff = 0; + + orig = (const char *) &ff[1]; + msize = ff->bytes_msg; + if (pm != ff) + { + const struct TransportFragmentBox *tfbo; + + tfbo = (const struct TransportFragmentBox *) orig; + orig += sizeof (struct TransportFragmentBox); + msize -= sizeof (struct TransportFragmentBox); + xoff = ntohs (tfbo->frag_off); + } + fragmax = mtu - sizeof (struct TransportFragmentBox); + fragsize = GNUNET_MIN (msize - ff->frag_off, + fragmax); + frag = GNUNET_malloc (sizeof (struct PendingMessage) + + sizeof (struct TransportFragmentBox) + + fragsize); + frag->target = pm->target; + frag->frag_parent = ff; + frag->timeout = pm->timeout; + frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize; + frag->pmt = PMT_FRAGMENT_BOX; + msg = (char *) &frag[1]; + tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT); + tfb.header.size = htons (sizeof (struct TransportFragmentBox) + + fragsize); + tfb.frag_uuid = htonl (pm->frag_uuidgen++); + tfb.msg_uuid = pm->msg_uuid; + tfb.frag_off = htons (ff->frag_off + xoff); + tfb.msg_size = htons (pm->bytes_msg); + memcpy (msg, + &tfb, + sizeof (tfb)); + memcpy (&msg[sizeof (tfb)], + &orig[ff->frag_off], + fragsize); + GNUNET_CONTAINER_MDLL_insert (frag, + ff->head_frag, + ff->tail_frag, + frag); + ff->frag_off += fragsize; + ff = frag; + } + + /* Move head to the tail and return it */ + GNUNET_CONTAINER_MDLL_remove (frag, + ff->frag_parent->head_frag, + ff->frag_parent->tail_frag, + ff); + GNUNET_CONTAINER_MDLL_insert_tail (frag, + ff->frag_parent->head_frag, + ff->frag_parent->tail_frag, + ff); + return ff; } @@ -2052,11 +2278,12 @@ fragment_message (struct PendingMessage *pm, static struct PendingMessage * reliability_box_message (struct PendingMessage *pm) { - if (0) // FIXME + if (PMT_CORE != pm->pmt) { - /* already fragmented or reliability boxed, do nothing */ + /* already fragmented or reliability boxed, or control message: do nothing */ return pm; } + if (0) // FIXME { /* failed hard */ @@ -2086,6 +2313,7 @@ transmit_on_queue (void *cls) struct GNUNET_ATS_Session *queue = cls; struct Neighbour *n = queue->neighbour; struct PendingMessage *pm; + struct PendingMessage *s; uint32_t overhead; queue->transmit_task = NULL; @@ -2100,38 +2328,102 @@ transmit_on_queue (void *cls) overhead = 0; if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) overhead += sizeof (struct TransportReliabilityBox); - if ( (0 != queue->mtu) && - (pm->bytes_msg + overhead > queue->mtu) ) - pm = fragment_message (pm, - queue->mtu); - if (NULL == pm) + s = pm; + if ( ( (0 != queue->mtu) && + (pm->bytes_msg + overhead > queue->mtu) ) || + (NULL != pm->head_frag /* fragments already exist, should + respect that even if MTU is 0 for + this queue */) ) + s = fragment_message (s, + (0 == queue->mtu) + ? UINT16_MAX /* no real maximum */ + : queue->mtu); + if (NULL == s) { /* Fragmentation failed, try next message... */ schedule_transmit_on_queue (queue); return; } if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc) - pm = reliability_box_message (pm); - if (NULL == pm) + s = reliability_box_message (s); + if (NULL == s) { /* Reliability boxing failed, try next message... */ schedule_transmit_on_queue (queue); return; } - // FIXME: actually do transmission + // FIXME: actually give 's' to communicator for transmission here! - // FIXME: unless 'pm' is an ACK or control, move 'pm' back in the - // transmission queue (simplistic: to the end, better: with position - // depending on type, timeout, etc.) - - // FIXME: do something similar in defragmentation / reliability ACK handling! - if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) + // FIXME: do something similar to the logic below + // in defragmentation / reliability ACK handling! + + /* Check if this transmission somehow conclusively finished handing 'pm' + even without any explicit ACKs */ + if ( (PMT_CORE == s->pmt) && + (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) ) { + /* Full message sent, and over reliabile channel */ client_send_response (pm, GNUNET_YES, pm->bytes_msg); } + else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) && + (PMT_FRAGMENT_BOX == s->pmt) ) + { + struct PendingMessage *pos; + + /* Fragment sent over reliabile channel */ + free_fragment_tree (s); + pos = s->frag_parent; + GNUNET_CONTAINER_MDLL_remove (frag, + pos->head_frag, + pos->tail_frag, + s); + GNUNET_free (s); + /* check if subtree is done */ + while ( (NULL == pos->head_frag) && + (pos->frag_off == pos->bytes_msg) && + (pos != pm) ) + { + s = pos; + pos = s->frag_parent; + GNUNET_CONTAINER_MDLL_remove (frag, + pos->head_frag, + pos->tail_frag, + s); + GNUNET_free (s); + } + + /* Was this the last applicable fragmment? */ + if ( (NULL == pm->head_frag) && + (pm->frag_off == pm->bytes_msg) ) + client_send_response (pm, + GNUNET_YES, + pm->bytes_msg /* FIXME: calculate and add overheads! */); + } + else if (PMT_CORE != pm->pmt) + { + /* This was an acknowledgement of some type, always free */ + + struct Neighbour *neighbour = pm->target; + GNUNET_CONTAINER_MDLL_remove (neighbour, + neighbour->pending_msg_head, + neighbour->pending_msg_tail, + pm); + GNUNET_free (pm); + } + else + { + /* message not finished, waiting for acknowledgement */ + // FIXME: update time by which we might retransmit 's' based on + // queue characteristics (i.e. RTT) + + // FIXME: move 'pm' back in the transmission queue (simplistic: to + // the end, better: with position depending on type, timeout, + // etc.) + } + /* finally, re-schedule self */ schedule_transmit_on_queue (queue); } @@ -2217,6 +2509,14 @@ handle_add_queue_message (void *cls, const char *addr; uint16_t addr_len; + if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox)) + { + /* MTU so small as to be useless for transmissions, + required for #fragment_message()! */ + GNUNET_break_op (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } neighbour = lookup_neighbour (&aqm->receiver); if (NULL == neighbour) { diff --git a/src/transport/transport.h b/src/transport/transport.h index 00d475e2b..6b1a2cac1 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -223,12 +223,12 @@ struct SendOkMessage * in either case, it is now OK for this client to * send us another message for the given peer. */ - uint32_t success GNUNET_PACKED; + uint16_t success GNUNET_PACKED; /** * Size of message sent */ - uint32_t bytes_msg GNUNET_PACKED; + uint16_t bytes_msg GNUNET_PACKED; /** * Size of message sent over wire diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c index 607f26777..f00d00a44 100644 --- a/src/transport/transport_api2_core.c +++ b/src/transport/transport_api2_core.c @@ -600,15 +600,15 @@ handle_send_ok (void *cls, { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; - uint32_t bytes_msg; + uint16_t bytes_msg; uint32_t bytes_physical; - bytes_msg = ntohl (okm->bytes_msg); + bytes_msg = ntohs (okm->bytes_msg); bytes_physical = ntohl (okm->bytes_physical); LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving SEND_OK message, transmission to %s %s.\n", GNUNET_i2s (&okm->peer), - (GNUNET_OK == ntohl (okm->success)) + (GNUNET_OK == ntohs (okm->success)) ? "succeeded" : "failed"); n = neighbour_find (h, -- 2.25.1