X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmesh%2Fgnunet-service-mesh.c;h=7ebe7d199d188696f36268b002a872fa3a1b1ae1;hb=b552dea05cbfacacf1c65c6eb1f54220f4e4beb5;hp=524b89c027213ab8d7afa588ae08530fed69696a;hpb=a577a67e0ec909911b814265902861cfb18640c7;p=oweals%2Fgnunet.git diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index 524b89c02..7ebe7d199 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2001 - 2011 Christian Grothoff (and other contributing authors) + (C) 2001-2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -37,11 +37,11 @@ * TODO: * - error reporting (CREATE/CHANGE/ADD/DEL?) -- new message! * - partial disconnect reporting -- same as error reporting? - * - add vs create? change vs. keep-alive? same msg or different ones? -- thinking... - * - speed requirement specification (change?) in mesh API -- API call * - add ping message * - relay corking down to core * - set ttl relative to tree depth + * - Add data ACK count in path ACK + * - Make common GNUNET_MESH_Data header for unicast, to_orig, multicast * TODO END */ @@ -57,8 +57,9 @@ #define MESH_BLOOM_SIZE 128 -#define MESH_DEBUG_DHT GNUNET_YES +#define MESH_DEBUG_DHT GNUNET_NO #define MESH_DEBUG_CONNECTION GNUNET_NO +#define MESH_DEBUG_TIMING __LINUX__ && GNUNET_YES #if MESH_DEBUG_CONNECTION #define DEBUG_CONN(...) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__) @@ -72,6 +73,30 @@ #define DEBUG_DHT(...) #endif +#if MESH_DEBUG_TIMING +#include +double __sum; +uint64_t __count; +struct timespec __mesh_start; +struct timespec __mesh_end; +#define INTERVAL_START clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &(__mesh_start)) +#define INTERVAL_END \ +do {\ + clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &(__mesh_end));\ + double __diff = __mesh_end.tv_nsec - __mesh_start.tv_nsec;\ + if (__diff < 0) __diff += 1000000000;\ + __sum += __diff;\ + __count++;\ +} while (0) +#define INTERVAL_SHOW \ +if (0 < __count)\ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "AVG process time: %f ns\n", __sum/__count) +#else +#define INTERVAL_START +#define INTERVAL_END +#define INTERVAL_SHOW +#endif + /******************************************************************************/ /************************ DATA STRUCTURES ****************************/ /******************************************************************************/ @@ -89,12 +114,11 @@ struct MeshData /** Tunnel it belongs to. */ struct MeshTunnel *t; - /** In case of a multicast, task to allow a client to send more data if - * some neighbor is too slow. */ - GNUNET_SCHEDULER_TaskIdentifier *task; + /** How many remaining neighbors still hav't got it. */ + unsigned int reference_counter; /** How many remaining neighbors we need to send this to. */ - unsigned int *reference_counter; + unsigned int total_out; /** Size of the data. */ size_t data_len; @@ -287,51 +311,67 @@ struct MeshTunnel MESH_TunnelNumber local_tid_dest; /** - * Local count ID of the last packet seen/sent. + * Is the speed on the tunnel limited to the slowest peer? */ - uint32_t pid; + int speed_min; + + /** + * Is the tunnel bufferless (minimum latency)? + */ + int nobuffer; + + /** + * Packet ID of the last fwd packet seen (sent/retransmitted/received). + */ + uint32_t fwd_pid; /** + * Packet ID of the last bck packet sent (unique counter per hop). + */ + uint32_t bck_pid; + + /** * SKIP value for this tunnel. */ uint32_t skip; /** * MeshTunnelChildInfo of all children, indexed by GNUNET_PEER_Id. + * Contains the Flow Control info: FWD ACK value received, + * last BCK ACK sent, PID and SKIP values. */ struct GNUNET_CONTAINER_MultiHashMap *children_fc; /** - * Last ACK sent towards the origin. + * Last ACK sent towards the origin (for traffic towards leaf node). */ - uint32_t last_ack; + uint32_t last_fwd_ack; - /** - * How many messages are in the queue. - */ - unsigned int queue_n; + /** + * BCK ACK value received from the hop towards the owner of the tunnel, + * (previous node / owner): up to what message PID can we sent back to him. + */ + uint32_t bck_ack; /** - * How many messages do we accept in the queue. + * How many messages are in the forward queue (towards leaves). */ - unsigned int queue_max; + unsigned int fwd_queue_n; /** - * Is the speed on the tunnel limited to the slowest peer? + * How many messages do we accept in the forward queue. */ - int speed_min; + unsigned int fwd_queue_max; /** - * Is the tunnel bufferless (minimum latency)? + * How many messages are in the backward queue (towards origin). */ - int nobuffer; + unsigned int bck_queue_n; /** - * Flag to signal the destruction of the tunnel. - * If this is set GNUNET_YES the tunnel will be destroyed - * when the queue is empty. - */ - int destroy; + * How many messages do we accept in the backward queue. + */ + unsigned int bck_queue_max; /** * Last time the tunnel was used @@ -365,19 +405,12 @@ struct MeshTunnel struct MeshClient **clients; /** - * FWD ACK value of each active client: up to what message can we transmit - * to a leaf client. - */ - uint32_t *clients_acks; - - /** - * BCK ACK value of the root client, owner of the tunnel, - * up to what message PID can we sent him. + * Flow control info for each client. */ - uint32_t root_client_ack; + struct MeshTunnelClientInfo *clients_fc; /** - * Number of elements in clients/clients_acks + * Number of elements in clients/clients_fc */ unsigned int nclients; @@ -426,18 +459,25 @@ struct MeshTunnel */ struct MeshRegexSearchContext *regex_ctx; - /** - * Task to keep the used paths alive - */ + /** + * Task to keep the used paths alive + */ GNUNET_SCHEDULER_TaskIdentifier path_refresh_task; - /** - * Task to destroy the tunnel after timeout - * - * FIXME: merge the two? a tunnel will have either - * a path refresh OR a timeout, never both! - */ + /** + * Task to destroy the tunnel after timeout + * + * FIXME: merge the two? a tunnel will have either + * a path refresh OR a timeout, never both! + */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Flag to signal the destruction of the tunnel. + * If this is set GNUNET_YES the tunnel will be destroyed + * when the queue is empty. + */ + int destroy; }; @@ -464,15 +504,60 @@ struct MeshTunnelChildInfo /** * Maximum PID allowed (FWD ACK received). */ - uint32_t max_pid; + uint32_t fwd_ack; /** * Last ACK sent to that child (BCK ACK). */ - uint32_t last_ack; + uint32_t bck_ack; + + /** + * Circular buffer pointing to MeshPeerQueue elements for all + * payload traffic going to this child. + * Size determined by the tunnel queue size (@c t->fwd_queue_max). + */ + struct MeshPeerQueue **send_buffer; + + /** + * Index of the oldest element in the send_buffer. + */ + unsigned int send_buffer_start; + + /** + * How many elements are already in the buffer. + */ + unsigned int send_buffer_n; +}; + + +/** + * Info about a leaf client of a tunnel, needed to perform flow control. + */ +struct MeshTunnelClientInfo +{ + /** + * PID of the last packet sent to the client (FWD). + */ + uint32_t fwd_pid; + + /** + * PID of the last packet received from the client (BCK). + */ + uint32_t bck_pid; + + /** + * Maximum PID allowed (FWD ACK received). + */ + uint32_t fwd_ack; + + /** + * Last ACK sent to that child (BCK ACK). + */ + uint32_t bck_ack; }; + /** * Info collected during iteration of child nodes in order to get the ACK value * for a tunnel. @@ -484,6 +569,11 @@ struct MeshTunnelChildIteratorContext */ struct MeshTunnel *t; + /** + * Is this context initialized? Is the value in max_child_ack valid? + */ + int init; + /** * Maximum child ACK so far. */ @@ -678,6 +768,16 @@ struct MeshRegexSearchContext */ struct MeshRegexSearchInfo *info; + /** + * We just want to look for one edge, the longer the better. + * Keep its length. + */ + unsigned int longest_match; + + /** + * Destination hash of the longest match. + */ + struct GNUNET_HashCode hash; }; /******************************************************************************/ @@ -705,13 +805,15 @@ mesh_debug (void *cls, int success) } #endif +unsigned int debug_fwd_ack; +unsigned int debug_bck_ack; + #endif /******************************************************************************/ /*********************** GLOBAL VARIABLES ****************************/ /******************************************************************************/ - /** * Configuration parameters */ @@ -725,6 +827,12 @@ static long long unsigned int dht_replication_level; static long long unsigned int max_tunnels; static long long unsigned int max_msgs_queue; + +/** + * Hostkey generation context + */ +static struct GNUNET_CRYPTO_RsaKeyGenerationContext *keygen; + /** * DLL with all the clients, head. */ @@ -1021,16 +1129,6 @@ tunnel_notify_connection_broken (struct MeshTunnel *t, GNUNET_PEER_Id p1, static uint32_t tunnel_get_fwd_ack (struct MeshTunnel *t); -/** - * Get the current ack value for a tunnel, for data going from leaves to root, - * taking in account the tunnel mode and the status of all children and clients. - * - * @param t Tunnel. - * - * @return Maximum PID allowed. - */ -static uint32_t -tunnel_get_bck_ack (struct MeshTunnel *t); /** * Add a client to a tunnel, initializing all needed data structures. @@ -1043,20 +1141,18 @@ tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c); /** - * Iterator over edges in a regex block retrieved from the DHT. + * Jump to the next edge, with the longest matching token. * - * @param cls Closure. - * @param token Token that follows to next state. - * @param len Lenght of token. - * @param key Hash of next state. + * @param block Block found in the DHT. + * @param size Size of the block. + * @param ctx Context of the search. * * @return GNUNET_YES if should keep iterating, GNUNET_NO otherwise. */ -static int -regex_edge_iterator (void *cls, - const char *token, - size_t len, - const struct GNUNET_HashCode *key); +static void +regex_next_edge (const struct MeshRegexBlock *block, + size_t size, + struct MeshRegexSearchContext *ctx); /** @@ -1072,9 +1168,17 @@ regex_find_path (const struct GNUNET_HashCode *key, /** - * Queue and pass message to core when possible. + * @brief Queue and pass message to core when possible. + * + * If type is payload (UNICAST, TO_ORIGIN, MULTICAST) checks for queue status + * and accounts for it. In case the queue is full, the message is dropped and + * a break issued. + * + * Otherwise, message is treated as internal and allowed to go regardless of + * queue status. * - * @param cls Closure (type dependant). + * @param cls Closure (@c type dependant). It will be used by queue_send to + * build the message to be sent if not already prebuilt. * @param type Type of the message, 0 for a raw message. * @param size Size of the message. * @param dst Neighbor to send message to. @@ -1084,6 +1188,7 @@ static void queue_add (void *cls, uint16_t type, size_t size, struct MeshPeerInfo *dst, struct MeshTunnel *t); + /** * Free a transmission that was already queued with all resources * associated to the request. @@ -1094,6 +1199,34 @@ queue_add (void *cls, uint16_t type, size_t size, static void queue_destroy (struct MeshPeerQueue *queue, int clear_cls); + +/** + * @brief Get the next transmittable message from the queue. + * + * This will be the head, except in the case of being a data packet + * not allowed by the destination peer. + * + * @param peer Destination peer. + * + * @return The next viable MeshPeerQueue element to send to that peer. + * NULL when there are no transmittable messages. + */ +struct MeshPeerQueue * +queue_get_next (const struct MeshPeerInfo *peer); + + +/** + * Core callback to write a queued packet to core buffer + * + * @param cls Closure (peer info). + * @param size Number of bytes available in buf. + * @param buf Where the to write the message. + * + * @return number of bytes written to buf + */ +static size_t +queue_send (void *cls, size_t size, void *buf); + /******************************************************************************/ /************************ ITERATORS ****************************/ /******************************************************************************/ @@ -1129,8 +1262,7 @@ regex_result_iterator (void *cls, ntohl(block->accepting)); } - (void) GNUNET_MESH_regex_block_iterate (block, SIZE_MAX, - ®ex_edge_iterator, ctx); + regex_next_edge(block, SIZE_MAX, ctx); return GNUNET_YES; } @@ -1153,9 +1285,7 @@ regex_edge_iterator (void *cls, const struct GNUNET_HashCode *key) { struct MeshRegexSearchContext *ctx = cls; - struct MeshRegexSearchContext *new_ctx; struct MeshRegexSearchInfo *info = ctx->info; - struct GNUNET_DHT_GetHandle *get_h; char *current; size_t current_len; @@ -1179,40 +1309,93 @@ regex_edge_iterator (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Token doesn't match, END\n"); return GNUNET_YES; // Token doesn't match } + + if (len > ctx->longest_match) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Token is longer, KEEP\n"); + ctx->longest_match = len; + ctx->hash = *key; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Token is not longer, IGNORE\n"); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* End of regex edge iterator\n"); + return GNUNET_YES; +} + + +/** + * Jump to the next edge, with the longest matching token. + * + * @param block Block found in the DHT. + * @param size Size of the block. + * @param ctx Context of the search. + * + * @return GNUNET_YES if should keep iterating, GNUNET_NO otherwise. + */ +static void +regex_next_edge (const struct MeshRegexBlock *block, + size_t size, + struct MeshRegexSearchContext *ctx) +{ + struct MeshRegexSearchContext *new_ctx; + struct MeshRegexSearchInfo *info = ctx->info; + struct GNUNET_DHT_GetHandle *get_h; + + int result; + + /* Find the longest match for the current string position, + * among tokens in the given block */ + ctx->longest_match = 0; + result = GNUNET_MESH_regex_block_iterate (block, size, + ®ex_edge_iterator, ctx); + GNUNET_break (GNUNET_OK == result || SIZE_MAX == size); + + /* Did anything match? */ + if (0 == ctx->longest_match) + return; + new_ctx = GNUNET_malloc (sizeof (struct MeshRegexSearchContext)); new_ctx->info = info; - new_ctx->position = ctx->position + len; + new_ctx->position = ctx->position + ctx->longest_match; GNUNET_array_append (info->contexts, info->n_contexts, new_ctx); + + /* Check whether we already have a DHT GET running for it */ if (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_contains(info->dht_get_handles, key)) + GNUNET_CONTAINER_multihashmap_contains(info->dht_get_handles, &ctx->hash)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* GET running, END\n"); - GNUNET_CONTAINER_multihashmap_get_multiple (info->dht_get_results, key, + GNUNET_CONTAINER_multihashmap_get_multiple (info->dht_get_results, + &ctx->hash, ®ex_result_iterator, new_ctx); - return GNUNET_YES; // We are already looking for it + return; // We are already looking for it } + /* Start search in DHT */ get_h = GNUNET_DHT_get_start (dht_handle, /* handle */ GNUNET_BLOCK_TYPE_MESH_REGEX, /* type */ - key, /* key to search */ + &ctx->hash, /* key to search */ dht_replication_level, /* replication level */ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, NULL, /* xquery */ // FIXME BLOOMFILTER 0, /* xquery bits */ // FIXME BLOOMFILTER SIZE &dht_get_string_handler, new_ctx); if (GNUNET_OK != - GNUNET_CONTAINER_multihashmap_put(info->dht_get_handles, key, get_h, + GNUNET_CONTAINER_multihashmap_put(info->dht_get_handles, + &ctx->hash, + get_h, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)) { GNUNET_break (0); - return GNUNET_YES; + return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* End of regex edge iterator\n"); - return GNUNET_YES; } + /** * Iterator over hash map entries to cancel DHT GET requests after a * successful connect_by_string. @@ -1267,8 +1450,11 @@ regex_free_result (void *cls, * @param edges edges leaving current state. */ void -regex_iterator (void *cls, const struct GNUNET_HashCode *key, const char *proof, - int accepting, unsigned int num_edges, +regex_iterator (void *cls, + const struct GNUNET_HashCode *key, + const char *proof, + int accepting, + unsigned int num_edges, const struct GNUNET_REGEX_Edge *edges) { struct MeshRegexBlock *block; @@ -1523,6 +1709,7 @@ announce_application (void *cls, const struct GNUNET_HashCode * key, void *value block.id = my_full_id; c = GNUNET_CONTAINER_multihashmap_get (applications, key); + GNUNET_assert(NULL != c); block.type = (long) GNUNET_CONTAINER_multihashmap_get (c->apps, key); if (0 == block.type) { @@ -1652,55 +1839,6 @@ announce_id (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /****************** GENERAL HELPER FUNCTIONS ************************/ /******************************************************************************/ -/** - * Check if one pid is bigger than other, accounting for overflow. - * - * @param bigger Argument that should be bigger. - * @param smaller Argument that should be smaller. - * - * @return True if big is bigger than small - */ -static int -is_pid_bigger (uint32_t bigger, uint32_t smaller) -{ - return (GNUNET_YES == PID_OVERFLOW(smaller, bigger) || - (bigger > smaller && GNUNET_NO == PID_OVERFLOW(bigger, smaller))); -} - -/** - * Get the higher ACK value out of two values, taking in account overflow. - * - * @param a First ACK value. - * @param b Second ACK value. - * - * @return Highest ACK value from the two. - */ -static uint32_t -max_pid (uint32_t a, uint32_t b) -{ - if (is_pid_bigger(a, b)) - return a; - return b; -} - - -/** - * Get the lower ACK value out of two values, taking in account overflow. - * - * @param a First ACK value. - * @param b Second ACK value. - * - * @return Lowest ACK value from the two. - */ -static uint32_t -min_pid (uint32_t a, uint32_t b) -{ - if (is_pid_bigger(a, b)) - return b; - return a; -} - - /** * Decrements the reference counter and frees all resources if needed * @@ -1710,23 +1848,9 @@ min_pid (uint32_t a, uint32_t b) static void data_descriptor_decrement_rc (struct MeshData *mesh_data) { - /* Make sure it's a multicast packet */ - GNUNET_assert (NULL != mesh_data->reference_counter); - - if (0 == --(*(mesh_data->reference_counter))) + if (0 == --(mesh_data->reference_counter)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Last copy!\n"); - if (NULL != mesh_data->task) - { - if (GNUNET_SCHEDULER_NO_TASK != *(mesh_data->task)) - { - GNUNET_SCHEDULER_cancel (*(mesh_data->task)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " notifying client...\n"); - GNUNET_SERVER_receive_done (mesh_data->t->owner->handle, GNUNET_OK); - } - GNUNET_free (mesh_data->task); - } - GNUNET_free (mesh_data->reference_counter); GNUNET_free (mesh_data->data); GNUNET_free (mesh_data); } @@ -1763,6 +1887,11 @@ client_get (struct GNUNET_SERVER_Client *client) * @param c Client to check * * @return GNUNET_YES or GNUNET_NO, depending on subscription status + * + * FIXME: use of crypto_hash slows it down + * The hash function alone takes 8-10us out of the ~55us for the whole + * process of retransmitting the message from one local client to another. + * Find faster implementation! */ static int client_is_subscribed (uint16_t message_type, struct MeshClient *c) @@ -1771,37 +1900,12 @@ client_is_subscribed (uint16_t message_type, struct MeshClient *c) if (NULL == c->types) return GNUNET_NO; + GNUNET_CRYPTO_hash (&message_type, sizeof (uint16_t), &hc); return GNUNET_CONTAINER_multihashmap_contains (c->types, &hc); } -/** - * Allow a client to send more data after transmitting a multicast message - * which some neighbor has not yet accepted altough a reasonable time has - * passed. - * - * @param cls Closure (DataDescriptor containing the task identifier) - * @param tc Task Context - * - * FIXME reference counter cshould be just int - */ -static void -client_allow_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct MeshData *mdata = cls; - - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - GNUNET_assert (NULL != mdata->reference_counter); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "CLIENT ALLOW SEND DESPITE %u COPIES PENDING\n", - *(mdata->reference_counter)); - *(mdata->task) = GNUNET_SCHEDULER_NO_TASK; - GNUNET_SERVER_receive_done (mdata->t->owner->handle, GNUNET_OK); -} - - /** * Check whether client wants traffic from a tunnel. * @@ -1913,15 +2017,16 @@ client_delete_tunnel (struct MeshClient *c, struct MeshTunnel *t) * * @param msg Pointer to the message itself * @param payload Pointer to the payload of the message. + * @param t The tunnel to whose clients this message goes. + * * @return number of clients this message was sent to */ static unsigned int send_subscribed_clients (const struct GNUNET_MessageHeader *msg, - const struct GNUNET_MessageHeader *payload) + const struct GNUNET_MessageHeader *payload, + struct MeshTunnel *t) { - struct GNUNET_PeerIdentity *oid; struct MeshClient *c; - struct MeshTunnel *t; MESH_TunnelNumber *tid; unsigned int count; uint16_t type; @@ -1929,7 +2034,8 @@ send_subscribed_clients (const struct GNUNET_MessageHeader *msg, type = ntohs (payload->type); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending to clients...\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "message of type %u\n", type); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "message of type %s\n", + GNUNET_MESH_DEBUG_M2S (type)); memcpy (cbuf, msg, sizeof (cbuf)); switch (htons (msg->type)) @@ -1938,30 +2044,21 @@ send_subscribed_clients (const struct GNUNET_MessageHeader *msg, struct GNUNET_MESH_Multicast *mc; struct GNUNET_MESH_ToOrigin *to; - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - uc = (struct GNUNET_MESH_Unicast *) cbuf; - tid = &uc->tid; - oid = &uc->oid; - break; - case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - mc = (struct GNUNET_MESH_Multicast *) cbuf; - tid = &mc->tid; - oid = &mc->oid; - break; - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - to = (struct GNUNET_MESH_ToOrigin *) cbuf; - tid = &to->tid; - oid = &to->oid; - break; - default: - GNUNET_break (0); - return 0; - } - t = tunnel_get (oid, ntohl (*tid)); - if (NULL == t) - { - GNUNET_break (0); - return 0; + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + uc = (struct GNUNET_MESH_Unicast *) cbuf; + tid = &uc->tid; + break; + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + mc = (struct GNUNET_MESH_Multicast *) cbuf; + tid = &mc->tid; + break; + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + to = (struct GNUNET_MESH_ToOrigin *) cbuf; + tid = &to->tid; + break; + default: + GNUNET_break (0); + return 0; } for (count = 0, c = clients; c != NULL; c = c->next) @@ -2010,6 +2107,7 @@ send_subscribed_clients (const struct GNUNET_MessageHeader *msg, *) cbuf, GNUNET_NO); } } + return count; } @@ -2036,73 +2134,21 @@ send_client_peer_connected (const struct MeshTunnel *t, const GNUNET_PEER_Id id) /** - * Notify a the client of a tunnel about how many more - * payload packages will we accept on a given tunnel, - * distinguiching between root and leaf clients. + * Notify all clients (not depending on registration status) that the incoming + * tunnel is no longer valid. * - * @param c Client whom to send the ACK. - * @param t Tunnel on which to send the ACK. + * @param t Tunnel that was destroyed. */ static void -send_client_tunnel_ack (struct MeshClient *c, struct MeshTunnel *t) +send_clients_tunnel_destroy (struct MeshTunnel *t) { - struct GNUNET_MESH_LocalAck msg; - MESH_TunnelNumber tid; - uint32_t ack; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending client ACK on tunnel %X\n", - t->local_tid); - if (NULL == c) - return; + struct GNUNET_MESH_TunnelMessage msg; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " to client %u\n", c->id); - - if (c == t->owner) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " (owner, FWD ACK)\n"); - ack = tunnel_get_fwd_ack (t); - tid = t->local_tid; - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " (leaf, BCK ACK)\n"); - ack = tunnel_get_bck_ack (t); - tid = t->local_tid_dest; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ack); - if (t->last_ack == ack) - return; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending!\n"); - t->last_ack = ack; - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK); - msg.tunnel_id = htonl (tid); - msg.max_pid = htonl (ack); - - GNUNET_SERVER_notification_context_unicast (nc, c->handle, - &msg.header, GNUNET_NO); -} - - -/** - * Notify all clients (not depending on registration status) that the incoming - * tunnel is no longer valid. - * - * @param t Tunnel that was destroyed. - */ -static void -send_clients_tunnel_destroy (struct MeshTunnel *t) -{ - struct GNUNET_MESH_TunnelMessage msg; - - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY); - msg.tunnel_id = htonl (t->local_tid_dest); - GNUNET_SERVER_notification_context_broadcast (nc, &msg.header, GNUNET_NO); -} + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_DESTROY); + msg.tunnel_id = htonl (t->local_tid_dest); + GNUNET_SERVER_notification_context_broadcast (nc, &msg.header, GNUNET_NO); +} /** @@ -2266,14 +2312,15 @@ send_core_data_raw (void *cls, size_t size, void *buf) * @param t Tunnel on which this message is transmitted. */ static void -send_message (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - struct MeshTunnel *t) +send_prebuilt_message (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + struct MeshTunnel *t) { struct MeshTransmissionDescriptor *info; struct MeshPeerInfo *neighbor; struct MeshPeerPath *p; size_t size; + uint16_t type; // GNUNET_TRANSPORT_try_connect(); FIXME use? @@ -2282,7 +2329,8 @@ send_message (const struct GNUNET_MessageHeader *message, info->mesh_data = GNUNET_malloc (sizeof (struct MeshData)); info->mesh_data->data = GNUNET_malloc (size); memcpy (info->mesh_data->data, message, size); - if (ntohs(message->type) == GNUNET_MESSAGE_TYPE_MESH_UNICAST) + type = ntohs(message->type); + if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) { struct GNUNET_MESH_Unicast *m; @@ -2290,8 +2338,8 @@ send_message (const struct GNUNET_MessageHeader *message, m->ttl = htonl (ntohl (m->ttl) - 1); } info->mesh_data->data_len = size; - info->mesh_data->reference_counter = GNUNET_malloc (sizeof (unsigned int)); - *info->mesh_data->reference_counter = 1; + info->mesh_data->reference_counter = 1; + info->mesh_data->total_out = 1; neighbor = peer_info_get (peer); for (p = neighbor->path_head; NULL != p; p = p->next) { @@ -2302,6 +2350,30 @@ send_message (const struct GNUNET_MessageHeader *message, } if (NULL == p) { +#if MESH_DEBUG + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " %s IS NOT DIRECTLY CONNECTED\n", + GNUNET_i2s(peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " PATHS TO %s:\n", + GNUNET_i2s(peer)); + for (p = neighbor->path_head; NULL != p; p = p->next) + { + struct GNUNET_PeerIdentity debug_id; + unsigned int i; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " path with %u hops through:\n", + p->length); + for (i = 0; i < p->length; i++) + { + GNUNET_PEER_resolve(p->peers[i], &debug_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " hop %u: %s\n", + i, GNUNET_i2s(&debug_id)); + } + } +#endif GNUNET_break (0); // FIXME sometimes fails (testing disconnect?) GNUNET_free (info->mesh_data->data); GNUNET_free (info->mesh_data); @@ -2309,8 +2381,10 @@ send_message (const struct GNUNET_MessageHeader *message, return; } info->peer = neighbor; + if (GNUNET_MESSAGE_TYPE_MESH_PATH_ACK == type) + type = 0; queue_add (info, - 0, + type, size, neighbor, t); @@ -2406,12 +2480,39 @@ send_destroy_path (struct MeshTunnel *t, GNUNET_PEER_Id destination) { GNUNET_PEER_resolve (p->peers[i], &pi[i]); } - send_message (&msg->header, tree_get_first_hop (t->tree, destination), t); + send_prebuilt_message (&msg->header, tree_get_first_hop (t->tree, destination), t); } path_destroy (p); } +/** + * Sends a PATH ACK message in reponse to a received PATH_CREATE directed to us. + * + * @param t Tunnel which to confirm. + */ +static void +send_path_ack (struct MeshTunnel *t) +{ + struct MeshTransmissionDescriptor *info; + struct GNUNET_PeerIdentity id; + GNUNET_PEER_Id peer; + + peer = tree_get_predecessor (t->tree); + GNUNET_PEER_resolve (peer, &id); + info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); + info->origin = &t->id; + info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &id.hashPubKey); + GNUNET_assert (NULL != info->peer); + + queue_add (info, + GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, + sizeof (struct GNUNET_MESH_PathACK), + info->peer, + t); +} + + /** * Try to establish a new connection to this peer. * Use the best path for the given tunnel. @@ -2970,10 +3071,10 @@ tunnel_delete_active_client (struct MeshTunnel *t, const struct MeshClient *c) if (t->clients[i] == c) { t->clients[i] = t->clients[t->nclients - 1]; - t->clients_acks[i] = t->clients_acks[t->nclients - 1]; + t->clients_fc[i] = t->clients_fc[t->nclients - 1]; GNUNET_array_grow (t->clients, t->nclients, t->nclients - 1); t->nclients++; - GNUNET_array_grow (t->clients_acks, t->nclients, t->nclients - 1); + GNUNET_array_grow (t->clients_fc, t->nclients, t->nclients - 1); break; } } @@ -3019,7 +3120,10 @@ tunnel_delete_client (struct MeshTunnel *t, const struct MeshClient *c) /** - * Iterator to free MeshTunnelChildInfo of tunnel children. + * @brief Iterator to destroy MeshTunnelChildInfo of tunnel children. + * + * Destroys queue elements of all waiting transmissions and frees all memory + * used by the struct and its elements. * * @param cls Closure (tunnel info). * @param key Hash of GNUNET_PEER_Id (unused). @@ -3032,7 +3136,22 @@ tunnel_destroy_child (void *cls, const struct GNUNET_HashCode * key, void *value) { - GNUNET_free (value); + struct MeshTunnelChildInfo *cinfo = value; + struct MeshTunnel *t = cls; + unsigned int c; + unsigned int i; + + for (c = 0; c < cinfo->send_buffer_n; c++) + { + i = (cinfo->send_buffer_start + c) % t->fwd_queue_max; + if (NULL != cinfo->send_buffer[i]) + queue_destroy (cinfo->send_buffer[i], GNUNET_YES); + else + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u %u\n", c, cinfo->send_buffer_n); + } + GNUNET_free_non_null (cinfo->send_buffer); + GNUNET_free (cinfo); return GNUNET_YES; } @@ -3154,6 +3273,26 @@ tunnel_add_path (struct MeshTunnel *t, struct MeshPeerPath *p, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tunnel_add_path END\n"); } +/** + * Add a client to a tunnel, initializing all needed data structures. + * + * @param t Tunnel to which add the client. + * @param c Client which to add to the tunnel. + */ +static void +tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c) +{ + struct MeshTunnelClientInfo clinfo; + + GNUNET_array_append (t->clients, t->nclients, c); + clinfo.fwd_ack = t->fwd_pid + 1; + clinfo.bck_ack = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE - 1; + clinfo.fwd_pid = t->fwd_pid; + clinfo.bck_pid = (uint32_t) -1; // Expected next: 0 + t->nclients--; + GNUNET_array_append (t->clients_fc, t->nclients, clinfo); +} + /** * Notifies a tunnel that a connection has broken that affects at least @@ -3197,7 +3336,7 @@ tunnel_notify_connection_broken (struct MeshTunnel *t, GNUNET_PEER_Id p1, msg.peer1 = my_full_id; GNUNET_PEER_resolve (pid, &msg.peer2); GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &neighbor); - send_message (&msg.header, &neighbor, t); + send_prebuilt_message (&msg.header, &neighbor, t); } } return pid; @@ -3216,19 +3355,21 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) struct MeshData *mdata = cls; struct MeshTransmissionDescriptor *info; struct GNUNET_PeerIdentity neighbor; + struct GNUNET_MessageHeader *msg; info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); info->mesh_data = mdata; - (*(mdata->reference_counter)) ++; + (mdata->reference_counter) ++; info->destination = neighbor_id; GNUNET_PEER_resolve (neighbor_id, &neighbor); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending to %s...\n", GNUNET_i2s (&neighbor)); info->peer = peer_info_get (&neighbor); GNUNET_assert (NULL != info->peer); + msg = (struct GNUNET_MessageHeader *) mdata->data; queue_add(info, - GNUNET_MESSAGE_TYPE_MESH_MULTICAST, + ntohs (msg->type), info->mesh_data->data_len, info->peer, mdata->t); @@ -3236,25 +3377,23 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) /** - * Send a message in a tunnel in multicast, sending a copy to each child node + * Queue a message in a tunnel in multicast, sending a copy to each child node * down the local one in the tunnel tree. * * @param t Tunnel in which to send the data. * @param msg Message to be sent. - * @param internal Has the service generated this message? */ static void tunnel_send_multicast (struct MeshTunnel *t, - const struct GNUNET_MessageHeader *msg, - int internal) + const struct GNUNET_MessageHeader *msg) { struct MeshData *mdata; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending a multicast packet...\n"); + mdata = GNUNET_malloc (sizeof (struct MeshData)); mdata->data_len = ntohs (msg->size); - mdata->reference_counter = GNUNET_malloc (sizeof (unsigned int)); mdata->t = t; mdata->data = GNUNET_malloc (mdata->data_len); memcpy (mdata->data, msg, mdata->data_len); @@ -3263,6 +3402,21 @@ tunnel_send_multicast (struct MeshTunnel *t, struct GNUNET_MESH_Multicast *mcast; mcast = (struct GNUNET_MESH_Multicast *) mdata->data; + if (t->fwd_queue_n >= t->fwd_queue_max) + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " queue full!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " message from %s!\n", + GNUNET_i2s(&mcast->oid)); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " message at %s!\n", + GNUNET_i2s(&my_full_id)); + GNUNET_free (mdata->data); + GNUNET_free (mdata); + return; + } + t->fwd_queue_n++; mcast->ttl = htonl (ntohl (mcast->ttl) - 1); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " data packet, ttl: %u\n", ntohl (mcast->ttl)); @@ -3271,30 +3425,19 @@ tunnel_send_multicast (struct MeshTunnel *t, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not a data packet, no ttl\n"); } - if (NULL != t->owner && GNUNET_YES != t->owner->shutting_down - && GNUNET_NO == internal) - { - mdata->task = GNUNET_malloc (sizeof (GNUNET_SCHEDULER_TaskIdentifier)); - (*(mdata->task)) = - GNUNET_SCHEDULER_add_delayed (unacknowledged_wait_time, &client_allow_send, - mdata); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "timeout task %u\n", - *(mdata->task)); - } tree_iterate_children (t->tree, &tunnel_send_multicast_iterator, mdata); - if (*(mdata->reference_counter) == 0) + if (mdata->reference_counter == 0) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " no one to send data to\n"); GNUNET_free (mdata->data); - GNUNET_free (mdata->reference_counter); - if (NULL != mdata->task) - { - GNUNET_SCHEDULER_cancel(*(mdata->task)); - GNUNET_free (mdata->task); - GNUNET_SERVER_receive_done (t->owner->handle, GNUNET_OK); - } - // FIXME change order? GNUNET_free (mdata); + t->fwd_queue_n--; + } + else + { + mdata->total_out = mdata->reference_counter; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending a multicast packet done\n"); @@ -3330,6 +3473,76 @@ tunnel_add_skip (void *cls, } +/** + * @brief Get neighbor's Flow Control information. + * + * Retrieves the MeshTunnelChildInfo containing Flow Control data about a direct + * descendant of the local node in a certain tunnel. + * If the info is not yet there (recently created path), creates the data struct + * and inserts it into the tunnel info, initialized to the current tunnel ACK + * values. + * + * @param t Tunnel related. + * @param peer Neighbor whose Flow Control info is needed. + * + * @return Neighbor's Flow Control info. + */ +static struct MeshTunnelChildInfo * +tunnel_get_neighbor_fc (const struct MeshTunnel *t, + const struct GNUNET_PeerIdentity *peer) +{ + struct MeshTunnelChildInfo *cinfo; + cinfo = GNUNET_CONTAINER_multihashmap_get (t->children_fc, + &peer->hashPubKey); + if (NULL == cinfo) + { + uint32_t delta; + + cinfo = GNUNET_malloc (sizeof (struct MeshTunnelChildInfo)); + cinfo->id = GNUNET_PEER_intern (peer); + cinfo->skip = t->fwd_pid; + + delta = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE - 1; + cinfo->fwd_ack = t->fwd_pid + delta; + cinfo->bck_ack = delta; + + cinfo->send_buffer = + GNUNET_malloc (sizeof(struct MeshPeerQueue *) * t->fwd_queue_max); + + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (t->children_fc, + &peer->hashPubKey, + cinfo, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + } + return cinfo; +} + + +/** + * Get the Flow Control info of a client. + * + * @param t Tunnel on which to look. + * @param c Client whose ACK to get. + * + * @return ACK value. + */ +static struct MeshTunnelClientInfo * +tunnel_get_client_fc (struct MeshTunnel *t, + struct MeshClient *c) +{ + unsigned int i; + + for (i = 0; i < t->nclients; i++) + { + if (t->clients[i] != c) + continue; + return &t->clients_fc[i]; + } + GNUNET_assert (0); + return NULL; // avoid compiler / coverity complaints +} + /** * Iterator to get the appropiate ACK value from all children nodes. @@ -3338,8 +3551,8 @@ tunnel_add_skip (void *cls, * @param id Id of the child node. */ static void -tunnel_get_child_ack (void *cls, - GNUNET_PEER_Id id) +tunnel_get_child_fwd_ack (void *cls, + GNUNET_PEER_Id id) { struct GNUNET_PeerIdentity peer_id; struct MeshTunnelChildInfo *cinfo; @@ -3348,28 +3561,15 @@ tunnel_get_child_ack (void *cls, uint32_t ack; GNUNET_PEER_resolve (id, &peer_id); - cinfo = GNUNET_CONTAINER_multihashmap_get (t->children_fc, - &peer_id.hashPubKey); - if (NULL == cinfo) - { - cinfo = GNUNET_malloc (sizeof (struct MeshTunnelChildInfo)); - cinfo->id = id; - cinfo->pid = t->pid; - cinfo->skip = t->pid; - cinfo->max_pid = ack = t->pid + 1; - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put(t->children_fc, - &peer_id.hashPubKey, - cinfo, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); - } - else - { - ack = cinfo->max_pid; - } + cinfo = tunnel_get_neighbor_fc (t, &peer_id); + ack = cinfo->fwd_ack; - if (0 == ctx->max_child_ack) + ctx->nchildren++; + if (GNUNET_NO == ctx->init) + { ctx->max_child_ack = ack; + ctx->init = GNUNET_YES; + } if (GNUNET_YES == t->speed_min) { @@ -3390,44 +3590,34 @@ tunnel_get_child_ack (void *cls, * * @param t Tunnel. * - * @return Maximum PID allowed (uint32 MAX), -1 if node has no children. + * @return Maximum PID allowed (uint32 MAX), -1LL if node has no children. */ static int64_t -tunnel_get_children_ack (struct MeshTunnel *t) +tunnel_get_children_fwd_ack (struct MeshTunnel *t) { struct MeshTunnelChildIteratorContext ctx; ctx.t = t; ctx.max_child_ack = 0; ctx.nchildren = 0; - tree_iterate_children (t->tree, tunnel_get_child_ack, &ctx); + ctx.init = GNUNET_NO; + tree_iterate_children (t->tree, tunnel_get_child_fwd_ack, &ctx); if (0 == ctx.nchildren) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " tunnel has no children, no FWD ACK\n"); return -1LL; + } - return (int64_t) ctx.max_child_ack; -} - - -/** - * Add a client to a tunnel, initializing all needed data structures. - * - * @param t Tunnel to which add the client. - * @param c Client which to add to the tunnel. - */ -static void -tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c) -{ - uint32_t ack; + if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ctx.max_child_ack, t->fwd_pid)) + ctx.max_child_ack = t->fwd_pid + 1; // Might overflow, it's ok. - GNUNET_array_append (t->clients, t->nclients, c); - t->nclients--; - ack = t->pid + 1; - GNUNET_array_append (t->clients_acks, t->nclients, ack); + return (int64_t) ctx.max_child_ack; } /** - * Set the ACK value of a client in a particular tunnel. + * Set the FWD ACK value of a client in a particular tunnel. * * @param t Tunnel affected. * @param c Client whose ACK to set. @@ -3444,38 +3634,13 @@ tunnel_set_client_fwd_ack (struct MeshTunnel *t, { if (t->clients[i] != c) continue; - t->clients_acks[i] = ack; + t->clients_fc[i].fwd_ack = ack; return; } GNUNET_break (0); } -/** - * Get the ACK value of a client in a particular tunnel. - * - * @param t Tunnel on which to look. - * @param c Client whose ACK to get. - * - * @return ACK value. - */ -uint32_t // FIXME static when used!! -tunnel_get_client_ack (struct MeshTunnel *t, - struct MeshClient *c) -{ - unsigned int i; - - for (i = 0; i < t->nclients; i++) - { - if (t->clients[i] != c) - continue; - return t->clients_acks[i]; - } - GNUNET_break (0); - return t->clients_acks[0]; -} - - /** * Get the highest ACK value of all clients in a particular tunnel, * according to the buffering/speed settings. @@ -3483,33 +3648,42 @@ tunnel_get_client_ack (struct MeshTunnel *t, * @param t Tunnel on which to look. * * @return Corresponding ACK value (max uint32_t). - * If no clients are suscribed, -1. + * If no clients are suscribed, -1LL. */ static int64_t -tunnel_get_clients_ack (struct MeshTunnel *t) +tunnel_get_clients_fwd_ack (struct MeshTunnel *t) { unsigned int i; int64_t ack; - for (ack = -1, i = 0; i < t->nclients; i++) + if (0 == t->nclients) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " tunnel has no clients, no FWD ACK\n"); + return -1LL; + } + + for (ack = -1LL, i = 0; i < t->nclients; i++) { - if (-1 == ack || - (GNUNET_YES == t->speed_min && t->clients_acks[i] < ack) || - (GNUNET_NO == t->speed_min && t->clients_acks[i] > ack)) + if (-1LL == ack || + (GNUNET_YES == t->speed_min && + GNUNET_YES == GMC_is_pid_bigger (ack, t->clients_fc[i].fwd_ack)) || + (GNUNET_NO == t->speed_min && + GNUNET_YES == GMC_is_pid_bigger (t->clients_fc[i].fwd_ack, ack))) { - ack = t->clients_acks[i]; + ack = t->clients_fc[i].fwd_ack; } } - if (GNUNET_YES == t->nobuffer && ack > t->pid) - ack = t->pid + 1; + if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ack, t->fwd_pid)) + ack = (uint32_t) t->fwd_pid + 1; // Might overflow, it's ok. return (uint32_t) ack; } /** - * Get the current ack value for a tunnel, taking in account the tunnel + * Get the current fwd ack value for a tunnel, taking in account the tunnel * mode and the status of all children nodes. * * @param t Tunnel. @@ -3519,55 +3693,114 @@ tunnel_get_clients_ack (struct MeshTunnel *t) static uint32_t tunnel_get_fwd_ack (struct MeshTunnel *t) { + uint32_t ack; uint32_t count; uint32_t buffer_free; int64_t child_ack; int64_t client_ack; - uint32_t ack; - count = t->pid - t->skip; - buffer_free = t->queue_max - t->queue_n; - ack = count + buffer_free; // Might overflow 32bits, it's ok! - child_ack = tunnel_get_children_ack (t); - client_ack = tunnel_get_clients_ack (t); - if (-1 == child_ack) + count = t->fwd_pid - t->skip; + buffer_free = t->fwd_queue_max - t->fwd_queue_n; + ack = count; + child_ack = tunnel_get_children_fwd_ack (t); + client_ack = tunnel_get_clients_fwd_ack (t); + if (-1LL == child_ack) { // Node has no children, child_ack AND core buffer are irrelevant. - GNUNET_break (-1 != client_ack); // No children and no clients? Not good! + GNUNET_break (-1LL != client_ack); // No children AND no clients? Not good! return (uint32_t) client_ack; } - + if (-1LL == client_ack) + { + client_ack = ack + buffer_free; // Might overflow 32 bits, it's ok! + } if (GNUNET_YES == t->speed_min) { - ack = min_pid (child_ack, ack); - ack = min_pid (client_ack, ack); + ack = GMC_min_pid ((uint32_t) child_ack, ack) + buffer_free; // Might overflow 32 bits, it's ok!; + ack = GMC_min_pid ((uint32_t) client_ack, ack); } else { - ack = max_pid (child_ack, ack); - ack = max_pid (client_ack, ack); + ack = GMC_max_pid ((uint32_t) child_ack, ack) + buffer_free; // Might overflow 32 bits, it's ok!; + ack = GMC_max_pid ((uint32_t) client_ack, ack); } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "c %u, bf %u, ch %u, cl %u\n", - count, buffer_free, child_ack, client_ack); + if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ack, t->fwd_pid)) + ack = t->fwd_pid + 1; // Might overflow 32 bits, it's ok! + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "c %u, bf %u, ch %lld, cl %lld, ACK: %u\n", + count, buffer_free, child_ack, client_ack, ack); return ack; } + /** - * Get the current ack value for a tunnel, taking in account the tunnel - * mode and the status of all children nodes. - * - * @param t Tunnel. + * Build a local ACK message and send it to a local client. + * + * @param t Tunnel on which to send the ACK. + * @param c Client to whom send the ACK. + * @param ack Value of the ACK. + */ +static void +send_local_ack (struct MeshTunnel *t, struct MeshClient *c, uint32_t ack) +{ + struct GNUNET_MESH_LocalAck msg; + + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK); + msg.tunnel_id = htonl (t->owner == c ? t->local_tid : t->local_tid_dest); + msg.max_pid = htonl (ack); + GNUNET_SERVER_notification_context_unicast(nc, + c->handle, + &msg.header, + GNUNET_NO); +} + +/** + * Build an ACK message and queue it to send to the given peer. + * + * @param t Tunnel on which to send the ACK. + * @param peer Peer to whom send the ACK. + * @param ack Value of the ACK. + */ +static void +send_ack (struct MeshTunnel *t, struct GNUNET_PeerIdentity *peer, uint32_t ack) +{ + struct GNUNET_MESH_ACK msg; + + GNUNET_PEER_resolve (t->id.oid, &msg.oid); + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); + msg.pid = htonl (ack); + msg.tid = htonl (t->id.tid); + + send_prebuilt_message (&msg.header, peer, t); +} + + +/** + * Notify a the owner of a tunnel about how many more + * payload packages will we accept on a given tunnel. * - * @return Maximum PID allowed. + * @param t Tunnel on which to send the ACK. */ -static uint32_t -tunnel_get_bck_ack (struct MeshTunnel *t) +static void +tunnel_send_client_fwd_ack (struct MeshTunnel *t) { uint32_t ack; - ack = 0; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending client FWD ACK on tunnel %X\n", + t->local_tid); + + ack = tunnel_get_fwd_ack (t); - return ack; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ack); + if (t->last_fwd_ack == ack) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending!\n"); + t->last_fwd_ack = ack; + send_local_ack (t, t->owner, ack); } @@ -3585,57 +3818,141 @@ tunnel_get_bck_ack (struct MeshTunnel *t) static void tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) { - struct GNUNET_MESH_ACK msg; struct GNUNET_PeerIdentity id; uint32_t ack; if (NULL != t->owner) { - send_client_tunnel_ack (t->owner, t); + tunnel_send_client_fwd_ack (t); return; } /* Is it after unicast / multicast retransmission? */ - if (GNUNET_MESSAGE_TYPE_MESH_ACK != type) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ACK via DATA retransmission\n"); - if (GNUNET_YES == t->nobuffer) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n"); - return; - } - if (t->queue_max > t->queue_n * 2) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n"); - return; - } - } - - /* Ok, ACK might be necessary, what PID to ACK? */ - ack = tunnel_get_fwd_ack (t); - - /* If speed_min and not all children have ack'd, dont send yet */ - if (ack == t->last_ack) + switch (type) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, not ready\n"); - return; + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ACK due to FWD DATA retransmission\n"); + if (GNUNET_YES == t->nobuffer) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n"); + return; + } + break; + case GNUNET_MESSAGE_TYPE_MESH_ACK: + case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK: + break; + default: + GNUNET_break (0); } - t->last_ack = ack; - msg.pid = htonl (ack); + /* Check if we need no retransmit the ACK */ + if (t->fwd_queue_max > t->fwd_queue_n * 4 && + GMC_is_pid_bigger(t->last_fwd_ack, t->fwd_pid)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " t->qmax: %u, t->qn: %u\n", + t->fwd_queue_max, t->fwd_queue_n); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " t->pid: %u, t->ack: %u\n", + t->fwd_pid, t->last_fwd_ack); + return; + } + + /* Ok, ACK might be necessary, what PID to ACK? */ + ack = tunnel_get_fwd_ack (t); + + /* If speed_min and not all children have ack'd, dont send yet */ + if (ack == t->last_fwd_ack) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not ready\n"); + return; + } + t->last_fwd_ack = ack; GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id); + send_ack (t, &id, ack); + debug_fwd_ack++; +} - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); - msg.tid = htonl (t->id.tid); - GNUNET_PEER_resolve(t->id.oid, &msg.oid); - send_message (&msg.header, &id, t); + +/** + * Iterator to send a child node a BCK ACK to allow him to send more + * to_origin data. + * + * @param cls Closure (tunnel). + * @param id Id of the child node. + */ +static void +tunnel_send_child_bck_ack (void *cls, + GNUNET_PEER_Id id) +{ + struct MeshTunnel *t = cls; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity peer; + + GNUNET_PEER_resolve (id, &peer); + cinfo = tunnel_get_neighbor_fc (t, &peer); + + if (cinfo->bck_ack != cinfo->pid && + GNUNET_NO == GMC_is_pid_bigger (cinfo->bck_ack, cinfo->pid)) + return; + + cinfo->bck_ack++; + send_ack (t, &peer, cinfo->bck_ack); +} + + +/** + * @brief Send BCK ACKs to clients to allow them more to_origin traffic + * + * Iterates over all clients and sends BCK ACKs to the ones that need it. + * + * @param t Tunnel on which to send the BCK ACKs. + */ +static void +tunnel_send_clients_bck_ack (struct MeshTunnel *t) +{ + unsigned int i; + unsigned int tunnel_delta; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Sending BCK ACK to clients\n"); + + tunnel_delta = t->bck_ack - t->bck_pid; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " tunnel delta: %u\n", tunnel_delta); + + /* Find client whom to allow to send to origin (with lowest buffer space) */ + for (i = 0; i < t->nclients; i++) + { + struct MeshTunnelClientInfo *clinfo; + unsigned int delta; + + clinfo = &t->clients_fc[i]; + delta = clinfo->bck_ack - clinfo->bck_pid; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " client %u delta: %u\n", + t->clients[i]->id, delta); + + if ((GNUNET_NO == t->nobuffer && tunnel_delta > delta) || + (GNUNET_YES == t->nobuffer && 0 == delta)) + { + uint32_t ack; + + ack = clinfo->bck_pid; + ack += t->nobuffer ? 1 : tunnel_delta; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " sending ack to client %u: %u\n", + t->clients[i]->id, ack); + send_local_ack (t, t->clients[i], ack); + clinfo->bck_ack = ack; + } + } } /** - * Send an ACK informing the children nodes about the available buffer space. - * In case there is no child node, inform the destination clients. + * Send an ACK informing the children nodes and destination clients about + * the available buffer space. * If buffering is off, send only on behalf of root (can be self). * If buffering is on, send when sent to predecessor and buffer space is free. * Note that although the name is bck_ack, the BCK mean backwards *traffic*, @@ -3647,51 +3964,112 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) static void tunnel_send_bck_ack (struct MeshTunnel *t, uint16_t type) { - struct GNUNET_MESH_ACK msg; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending BCK ACK on tunnel %u [%u] due to %s\n", + t->id.oid, t->id.tid, GNUNET_MESH_DEBUG_M2S(type)); + /* Is it after data to_origin retransmission? */ + switch (type) + { + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + if (GNUNET_YES == t->nobuffer) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " Not sending ACK, nobuffer\n"); + return; + } + break; + case GNUNET_MESSAGE_TYPE_MESH_ACK: + case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK: + break; + default: + GNUNET_break (0); + } + + tunnel_send_clients_bck_ack (t); + tree_iterate_children (t->tree, &tunnel_send_child_bck_ack, t); +} + + +/** + * @brief Re-initiate traffic to this peer if necessary. + * + * Check if there is traffic queued towards this peer + * and the core transmit handle is NULL (traffic was stalled). + * If so, call core tmt rdy. + * + * @param cls Closure (unused) + * @param peer_id Short ID of peer to which initiate traffic. + */ +static void +peer_unlock_queue(void *cls, GNUNET_PEER_Id peer_id) +{ + struct MeshPeerInfo *peer; struct GNUNET_PeerIdentity id; - uint32_t ack; + struct MeshPeerQueue *q; + size_t size; - if (NULL != t->owner) - { - send_client_tunnel_ack (t->owner, t); + peer = peer_info_get_short(peer_id); + if (NULL != peer->core_transmit) return; - } - /* Is it after unicast / multicast retransmission? */ - if (GNUNET_MESSAGE_TYPE_MESH_ACK != type) + + q = queue_get_next(peer); + if (NULL == q) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ACK via DATA retransmission\n"); - if (GNUNET_YES == t->nobuffer) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n"); - return; - } - if (t->queue_max > t->queue_n * 2) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n"); - return; - } + /* Might br multicast traffic already sent to this particular peer but + * not to other children in this tunnel. + * This way t->queue_n would be > 0 but the queue of this particular peer + * would be empty. + */ + return; } + size = q->size; + GNUNET_PEER_resolve (peer->id, &id); + peer->core_transmit = + GNUNET_CORE_notify_transmit_ready(core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + size, + &queue_send, + peer); + return; +} - /* Ok, ACK might be necessary, what PID to ACK? */ - ack = tunnel_get_bck_ack (t); - /* If speed_min and not all children have ack'd, dont send yet */ - if (ack == t->last_ack) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, not ready\n"); +/** + * @brief Allow transmission of FWD traffic on this tunnel + * + * Check if there is traffic queued towards any children + * and the core transmit handle is NULL, and if so, call core tmt rdy. + * + * @param t Tunnel on which to unlock FWD traffic. + */ +static void +tunnel_unlock_fwd_queues (struct MeshTunnel *t) +{ + if (0 == t->fwd_queue_n) return; - } - t->last_ack = ack; - msg.pid = htonl (ack); + tree_iterate_children (t->tree, &peer_unlock_queue, NULL); +} - GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id); - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); - msg.tid = htonl (t->id.tid); - GNUNET_PEER_resolve(t->id.oid, &msg.oid); - send_message (&msg.header, &id, t); +/** + * @brief Allow transmission of BCK traffic on this tunnel + * + * Check if there is traffic queued towards the root of the tree + * and the core transmit handle is NULL, and if so, call core tmt rdy. + * + * @param t Tunnel on which to unlock BCK traffic. + */ +static void +tunnel_unlock_bck_queue (struct MeshTunnel *t) +{ + if (0 == t->bck_queue_n) + return; + + peer_unlock_queue(NULL, tree_get_predecessor(t->tree)); } @@ -3710,7 +4088,7 @@ tunnel_send_destroy (struct MeshTunnel *t) msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY); GNUNET_PEER_resolve (t->id.oid, &msg.oid); msg.tid = htonl (t->id.tid); - tunnel_send_multicast (t, &msg.header, GNUNET_NO); + tunnel_send_multicast (t, &msg.header); } @@ -3734,6 +4112,13 @@ tunnel_cancel_queues (void *cls, GNUNET_PEER_Id neighbor_id) next = pq->next; if (pq->tunnel == t) { + if (GNUNET_MESSAGE_TYPE_MESH_MULTICAST == pq->type || + GNUNET_MESSAGE_TYPE_MESH_UNICAST == pq->type || + GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == pq->type) + { + // Should have been removed on destroy children + GNUNET_break (0); + } queue_destroy (pq, GNUNET_YES); } } @@ -3762,8 +4147,6 @@ tunnel_destroy (struct MeshTunnel *t) if (NULL == t) return GNUNET_OK; - tree_iterate_children (t->tree, &tunnel_cancel_queues, t); - r = GNUNET_OK; c = t->owner; #if MESH_DEBUG @@ -3781,16 +4164,21 @@ tunnel_destroy (struct MeshTunnel *t) GNUNET_CRYPTO_hash (&t->id, sizeof (struct MESH_TunnelID), &hash); if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } - GNUNET_CRYPTO_hash (&t->local_tid, sizeof (MESH_TunnelNumber), &hash); - if (NULL != c && - GNUNET_YES != - GNUNET_CONTAINER_multihashmap_remove (c->own_tunnels, &hash, t)) + if (NULL != c) { - r = GNUNET_SYSERR; + GNUNET_CRYPTO_hash (&t->local_tid, sizeof (MESH_TunnelNumber), &hash); + if (GNUNET_YES != + GNUNET_CONTAINER_multihashmap_remove (c->own_tunnels, &hash, t)) + { + GNUNET_break (0); + r = GNUNET_SYSERR; + } } + GNUNET_CRYPTO_hash (&t->local_tid_dest, sizeof (MESH_TunnelNumber), &hash); for (i = 0; i < t->nclients; i++) { @@ -3798,6 +4186,7 @@ tunnel_destroy (struct MeshTunnel *t) if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (c->incoming_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } } @@ -3807,18 +4196,22 @@ tunnel_destroy (struct MeshTunnel *t) if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (c->ignore_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } } + if (t->nclients > 0) { if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (incoming_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } GNUNET_free (t->clients); } + if (NULL != t->peers) { GNUNET_CONTAINER_multihashmap_iterate (t->peers, &peer_info_delete_tunnel, @@ -3831,6 +4224,8 @@ tunnel_destroy (struct MeshTunnel *t) t); GNUNET_CONTAINER_multihashmap_destroy (t->children_fc); + tree_iterate_children (t->tree, &tunnel_cancel_queues, t); + tree_destroy (t->tree); if (NULL != t->regex_ctx) @@ -3868,17 +4263,21 @@ tunnel_new (GNUNET_PEER_Id owner, { struct MeshTunnel *t; struct GNUNET_HashCode hash; - + if (n_tunnels >= max_tunnels && NULL == client) return NULL; t = GNUNET_malloc (sizeof (struct MeshTunnel)); t->id.oid = owner; t->id.tid = tid; - t->queue_max = (max_msgs_queue / max_tunnels) + 1; + t->fwd_queue_max = (max_msgs_queue / max_tunnels) + 1; + t->bck_queue_max = t->fwd_queue_max; t->tree = tree_new (owner); t->owner = client; - t->root_client_ack = 1; + t->fwd_pid = (uint32_t) -1; // Next (expected) = 0 + t->bck_pid = (uint32_t) -1; // Next (expected) = 0 + t->bck_ack = INITIAL_WINDOW_SIZE - 1; + t->last_fwd_ack = INITIAL_WINDOW_SIZE - 1; t->local_tid = local; t->children_fc = GNUNET_CONTAINER_multihashmap_create (8); n_tunnels++; @@ -3943,14 +4342,13 @@ tunnel_delete_peer (struct MeshTunnel *t, GNUNET_PEER_Id peer) * @param key the hash of the local tunnel id (used to access the hashmap) * @param value the value stored at the key (tunnel to destroy) * - * @return GNUNET_OK on success + * @return GNUNET_OK, keep iterating. */ static int tunnel_destroy_iterator (void *cls, const struct GNUNET_HashCode * key, void *value) { struct MeshTunnel *t = value; struct MeshClient *c = cls; - int r; send_client_tunnel_disconnect(t, c); if (c != t->owner) @@ -3962,8 +4360,10 @@ tunnel_destroy_iterator (void *cls, const struct GNUNET_HashCode * key, void *va return GNUNET_OK; } tunnel_send_destroy(t); - r = tunnel_destroy (t); - return r; + t->owner = NULL; + t->destroy = GNUNET_YES; + + return GNUNET_OK; } @@ -3977,10 +4377,15 @@ static void tunnel_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct MeshTunnel *t = cls; + struct GNUNET_PeerIdentity id; + t->timeout_task = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - t->timeout_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_PEER_resolve(t->id.oid, &id); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Tunnel %s [%X] timed out. Destroying.\n", + GNUNET_i2s(&id), t->id.tid); tunnel_destroy (t); } @@ -3988,6 +4393,8 @@ tunnel_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * Resets the tunnel timeout. Starts it if no timeout was running. * * @param t Tunnel whose timeout to reset. + * + * TODO use heap to improve efficiency of scheduler. */ static void tunnel_reset_timeout (struct MeshTunnel *t) @@ -4039,11 +4446,13 @@ send_core_path_create (void *cls, size_t size, void *buf) msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE); msg->tid = ntohl (t->id.tid); + opt = 0; if (GNUNET_YES == t->speed_min) - opt = MESH_TUNNEL_OPT_SPEED_MIN; + opt |= MESH_TUNNEL_OPT_SPEED_MIN; if (GNUNET_YES == t->nobuffer) opt |= MESH_TUNNEL_OPT_NOBUFFER; msg->opt = htonl(opt); + msg->reserved = 0; peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1]; for (i = 0; i < p->length; i++) @@ -4099,14 +4508,15 @@ send_core_data_multicast (void *cls, size_t size, void *buf) mc = (struct GNUNET_MESH_Multicast *) mh; mh = (struct GNUNET_MessageHeader *) &mc[1]; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " multicast, payload type %u\n", ntohs (mh->type)); + " multicast, payload type %s\n", + GNUNET_MESH_DEBUG_M2S (ntohs (mh->type))); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " multicast, payload size %u\n", ntohs (mh->size)); } else { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type %u\n", - ntohs (mh->type)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type %s\n", + GNUNET_MESH_DEBUG_M2S (ntohs (mh->type))); } } #endif @@ -4164,36 +4574,161 @@ queue_destroy (struct MeshPeerQueue *queue, int clear_cls) { struct MeshTransmissionDescriptor *dd; struct MeshPathInfo *path_info; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity id; + unsigned int i; + unsigned int max; if (GNUNET_YES == clear_cls) { switch (queue->type) { - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type payload\n"); + case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " cancelling TUNNEL_DESTROY\n"); + /* fall through */ + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " type prebuilt (payload, tunnel destroy)\n"); dd = queue->cls; data_descriptor_decrement_rc (dd->mesh_data); break; - case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type create path\n"); path_info = queue->cls; path_destroy (path_info->path); break; - default: + default: GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type unknown!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " type %s unknown!\n", + GNUNET_MESH_DEBUG_M2S(queue->type)); } GNUNET_free_non_null (queue->cls); } GNUNET_CONTAINER_DLL_remove (queue->peer->queue_head, queue->peer->queue_tail, queue); + + /* Delete from child_fc in the appropiate tunnel */ + max = queue->tunnel->fwd_queue_max; + GNUNET_PEER_resolve (queue->peer->id, &id); + cinfo = tunnel_get_neighbor_fc (queue->tunnel, &id); + for (i = 0; i < cinfo->send_buffer_n; i++) + { + unsigned int i2; + i2 = (cinfo->send_buffer_start + i) % max; + if (cinfo->send_buffer[i2] == queue) + { + /* Found corresponding entry in the send_buffer. Move all others back. */ + unsigned int j; + unsigned int j2; + unsigned int j3; + + for (j = i, j2 = 0, j3 = 0; j < cinfo->send_buffer_n - 1; j++) + { + j2 = (cinfo->send_buffer_start + j) % max; + j3 = (cinfo->send_buffer_start + j + 1) % max; + cinfo->send_buffer[j2] = cinfo->send_buffer[j3]; + } + + cinfo->send_buffer[j3] = NULL; + + cinfo->send_buffer_n--; + } + } + //queue-> + GNUNET_free (queue); } +/** + * @brief Get the next transmittable message from the queue. + * + * This will be the head, except in the case of being a data packet + * not allowed by the destination peer. + * + * @param peer Destination peer. + * + * @return The next viable MeshPeerQueue element to send to that peer. + * NULL when there are no transmittable messages. + */ +struct MeshPeerQueue * +queue_get_next (const struct MeshPeerInfo *peer) +{ + struct MeshPeerQueue *q; + struct MeshTunnel *t; + struct MeshTransmissionDescriptor *info; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_MESH_Unicast *ucast; + struct GNUNET_MESH_ToOrigin *to_orig; + struct GNUNET_MESH_Multicast *mcast; + struct GNUNET_PeerIdentity id; + uint32_t pid; + uint32_t ack; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* selecting message\n"); + for (q = peer->queue_head; NULL != q; q = q->next) + { + t = q->tunnel; + info = q->cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* %s\n", + GNUNET_MESH_DEBUG_M2S(q->type)); + switch (q->type) + { + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + ucast = (struct GNUNET_MESH_Unicast *) info->mesh_data->data; + pid = ntohl (ucast->pid); + GNUNET_PEER_resolve (info->peer->id, &id); + cinfo = tunnel_get_neighbor_fc(t, &id); + ack = cinfo->fwd_ack; + break; + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + to_orig = (struct GNUNET_MESH_ToOrigin *) info->mesh_data->data; + pid = ntohl (to_orig->pid); + ack = t->bck_ack; + break; + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + mcast = (struct GNUNET_MESH_Multicast *) info->mesh_data->data; + if (GNUNET_MESSAGE_TYPE_MESH_MULTICAST != ntohs(mcast->header.type)) + { + // Not a multicast payload: multicast control traffic (destroy, etc) + return q; + } + pid = ntohl (mcast->pid); + GNUNET_PEER_resolve (info->peer->id, &id); + cinfo = tunnel_get_neighbor_fc(t, &id); + ack = cinfo->fwd_ack; + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* OK!\n"); + return q; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* ACK: %u, PID: %u\n", + ack, pid); + if (GNUNET_NO == GMC_is_pid_bigger(pid, ack)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* OK!\n"); + return q; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* NEXT!\n"); + } + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* nothing found\n"); + return NULL; +} + + /** * Core callback to write a queued packet to core buffer * @@ -4210,92 +4745,166 @@ queue_send (void *cls, size_t size, void *buf) struct GNUNET_MessageHeader *msg; struct MeshPeerQueue *queue; struct MeshTunnel *t; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity dst_id; size_t data_size; peer->core_transmit = NULL; - queue = peer->queue_head; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* Queue send\n"); + queue = queue_get_next (peer); - /* If queue is empty, send should have been cancelled */ + /* Queue has no internal mesh traffic nor sendable payload */ if (NULL == queue) { - GNUNET_break(0); - return 0; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not ready, return\n"); + if (NULL == peer->queue_head) + GNUNET_break (0); // Should've been canceled + return 0; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n"); + GNUNET_PEER_resolve (peer->id, &dst_id); /* Check if buffer size is enough for the message */ if (queue->size > size) { - struct GNUNET_PeerIdentity id; - - GNUNET_PEER_resolve (peer->id, &id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* not enough room, reissue\n"); peer->core_transmit = - GNUNET_CORE_notify_transmit_ready(core_handle, - 0, - 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &id, - queue->size, - &queue_send, - peer); + GNUNET_CORE_notify_transmit_ready (core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &dst_id, + queue->size, + &queue_send, + peer); return 0; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* size ok\n"); t = queue->tunnel; - t->queue_n--; + if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == queue->type) + { + t->fwd_queue_n--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* unicast: t->q (%u/%u)\n", + t->fwd_queue_n, t->fwd_queue_max); + } + else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == queue->type) + { + t->bck_queue_n--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* to origin\n"); + } /* Fill buf */ switch (queue->type) { - case 0: // RAW data (preconstructed message, retransmission, etc) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* raw\n"); - data_size = send_core_data_raw (queue->cls, size, buf); - msg = (struct GNUNET_MessageHeader *) buf; - switch (ntohs (msg->type)) // Type of payload - { - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST); - break; - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - tunnel_send_bck_ack(t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); - break; - default: - break; - } - break; - case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* multicast\n"); - data_size = send_core_data_multicast(queue->cls, size, buf); - tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST); - break; - case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path create\n"); - data_size = send_core_path_create(queue->cls, size, buf); + case 0: + case GNUNET_MESSAGE_TYPE_MESH_ACK: + case GNUNET_MESSAGE_TYPE_MESH_PATH_BROKEN: + case GNUNET_MESSAGE_TYPE_MESH_PATH_DESTROY: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* raw: %s\n", + GNUNET_MESH_DEBUG_M2S (queue->type)); + /* Fall through */ + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + data_size = send_core_data_raw (queue->cls, size, buf); + msg = (struct GNUNET_MessageHeader *) buf; + switch (ntohs (msg->type)) // Type of preconstructed message + { + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST); break; - case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path ack\n"); - data_size = send_core_path_ack(queue->cls, size, buf); + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); break; - default: - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* type unknown\n"); - data_size = 0; + default: + break; + } + break; + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* multicast\n"); + { + struct MeshTransmissionDescriptor *info = queue->cls; + + if ((1 == info->mesh_data->reference_counter + && GNUNET_YES == t->speed_min) + || + (info->mesh_data->total_out == info->mesh_data->reference_counter + && GNUNET_NO == t->speed_min)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* considered sent\n"); + t->fwd_queue_n--; + } + } + data_size = send_core_data_multicast(queue->cls, size, buf); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path create\n"); + data_size = send_core_path_create (queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path ack\n"); + data_size = send_core_path_ack (queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path keepalive\n"); + data_size = send_core_data_multicast (queue->cls, size, buf); + break; + default: + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* type unknown: %u\n", + queue->type); + data_size = 0; + } + switch (queue->type) + { + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + cinfo = tunnel_get_neighbor_fc (t, &dst_id); + if (cinfo->send_buffer[cinfo->send_buffer_start] != queue) + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "at pos %u (%p) != %p\n", + cinfo->send_buffer_start, + cinfo->send_buffer[cinfo->send_buffer_start], + queue); + } + if (cinfo->send_buffer_n > 0) + { + cinfo->send_buffer[cinfo->send_buffer_start] = NULL; + cinfo->send_buffer_n--; + cinfo->send_buffer_start++; + cinfo->send_buffer_start %= t->fwd_queue_max; + } + else + { + GNUNET_break (0); + } + break; + default: + break; } /* Free queue, but cls was freed by send_core_* */ queue_destroy (queue, GNUNET_NO); - if (GNUNET_YES == t->destroy && 0 == t->queue_n) + if (GNUNET_YES == t->destroy && 0 == t->fwd_queue_n) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* destroying tunnel!\n"); tunnel_destroy (t); } /* If more data in queue, send next */ - if (NULL != peer->queue_head) + queue = queue_get_next(peer); + if (NULL != queue) { struct GNUNET_PeerIdentity id; @@ -4307,19 +4916,34 @@ queue_send (void *cls, size_t size, void *buf) 0, GNUNET_TIME_UNIT_FOREVER_REL, &id, - peer->queue_head->size, + queue->size, &queue_send, peer); } + else + { + if (NULL != peer->queue_head) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "********* %s stalled\n", + GNUNET_i2s(&my_full_id)); + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* return %d\n", data_size); return data_size; } /** - * Queue and pass message to core when possible. + * @brief Queue and pass message to core when possible. + * + * If type is payload (UNICAST, TO_ORIGIN, MULTICAST) checks for queue status + * and accounts for it. In case the queue is full, the message is dropped and + * a break issued. + * + * Otherwise, message is treated as internal and allowed to go regardless of + * queue status. * - * @param cls Closure (type dependant). + * @param cls Closure (@c type dependant). It will be used by queue_send to + * build the message to be sent if not already prebuilt. * @param type Type of the message, 0 for a raw message. * @param size Size of the message. * @param dst Neighbor to send message to. @@ -4329,39 +4953,81 @@ static void queue_add (void *cls, uint16_t type, size_t size, struct MeshPeerInfo *dst, struct MeshTunnel *t) { - struct MeshPeerQueue *queue; + struct MeshPeerQueue *queue; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity id; + unsigned int *max; + unsigned int *n; + unsigned int i; - if (t->queue_n >= t->queue_max) + n = NULL; + if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type || + GNUNET_MESSAGE_TYPE_MESH_MULTICAST == type) + { + n = &t->fwd_queue_n; + max = &t->fwd_queue_max; + } + else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type) + { + n = &t->bck_queue_n; + max = &t->bck_queue_max; + } + if (NULL != n) { + if (*n >= *max) { if (NULL == t->owner) GNUNET_break_op(0); // TODO: kill connection? else GNUNET_break(0); + GNUNET_STATISTICS_update(stats, "# messages dropped (buffer full)", + 1, GNUNET_NO); return; // Drop message } - t->queue_n++; - queue = GNUNET_malloc (sizeof (struct MeshPeerQueue)); - queue->cls = cls; - queue->type = type; - queue->size = size; - queue->peer = dst; - queue->tunnel = t; - GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue); - if (NULL == dst->core_transmit) - { - struct GNUNET_PeerIdentity id; + (*n)++; + } + queue = GNUNET_malloc (sizeof (struct MeshPeerQueue)); + queue->cls = cls; + queue->type = type; + queue->size = size; + queue->peer = dst; + queue->tunnel = t; + GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue); + GNUNET_PEER_resolve (dst->id, &id); + if (NULL == dst->core_transmit) + { + dst->core_transmit = + GNUNET_CORE_notify_transmit_ready (core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + size, + &queue_send, + dst); + } + if (NULL == n) // Is this internal mesh traffic? + return; - GNUNET_PEER_resolve (dst->id, &id); - dst->core_transmit = - GNUNET_CORE_notify_transmit_ready(core_handle, - 0, - 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &id, - size, - &queue_send, - dst); - } + // It's payload, keep track of buffer per peer. + cinfo = tunnel_get_neighbor_fc(t, &id); + i = (cinfo->send_buffer_start + cinfo->send_buffer_n) % t->fwd_queue_max; + if (NULL != cinfo->send_buffer[i]) + { + GNUNET_break (cinfo->send_buffer_n == t->fwd_queue_max); // aka i == start + queue_destroy (cinfo->send_buffer[cinfo->send_buffer_start], GNUNET_YES); + cinfo->send_buffer_start++; + cinfo->send_buffer_start %= t->fwd_queue_max; + } + else + { + cinfo->send_buffer_n++; + } + cinfo->send_buffer[i] = queue; + if (cinfo->send_buffer_n > t->fwd_queue_max) + { + GNUNET_break (0); + cinfo->send_buffer_n = t->fwd_queue_max; + } } @@ -4446,14 +5112,22 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_YES : GNUNET_NO; t->nobuffer = (0 != (opt & MESH_TUNNEL_OPT_NOBUFFER)) ? GNUNET_YES : GNUNET_NO; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " speed_min: %d, nobuffer:%d\n", + t->speed_min, t->nobuffer); if (GNUNET_YES == t->nobuffer) - t->queue_max = 1; + { + t->bck_queue_max = 1; + t->fwd_queue_max = 1; + } + // FIXME only assign a local tid if a local client is interested (on demand) while (NULL != tunnel_get_incoming (next_local_tid)) next_local_tid = (next_local_tid + 1) | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV; t->local_tid_dest = next_local_tid++; next_local_tid = next_local_tid | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV; + // FIXME end tunnel_reset_timeout (t); GNUNET_CRYPTO_hash (&t->local_tid_dest, sizeof (MESH_TunnelNumber), &hash); @@ -4506,21 +5180,19 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, /* create path: self not found in path through self */ GNUNET_break_op (0); path_destroy (path); - /* FIXME error. destroy tunnel? leave for timeout? */ - return 0; + tunnel_destroy (t); + return GNUNET_OK; } path_add_to_peers (path, GNUNET_NO); tunnel_add_path (t, path, own_pos); if (own_pos == size - 1) { /* It is for us! Send ack. */ - struct MeshTransmissionDescriptor *info; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " It's for us!\n"); peer_info_add_path_to_origin (orig_peer_info, path, GNUNET_NO); if (NULL == t->peers) { - /* New tunnel! Notify clients on data. */ + /* New tunnel! Notify clients on first payload message. */ t->peers = GNUNET_CONTAINER_multihashmap_create (4); } GNUNET_break (GNUNET_SYSERR != @@ -4529,15 +5201,7 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, peer_info_get (&my_full_id), GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE)); - info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); - info->origin = &t->id; - info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); - GNUNET_assert (NULL != info->peer); - queue_add(info, - GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, - sizeof (struct GNUNET_MESH_PathACK), - info->peer, - t); + send_path_ack (t); } else { @@ -4629,7 +5293,7 @@ handle_mesh_path_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Own position: %u\n", own_pos); if (own_pos < path->length - 1) - send_message (message, &pi[own_pos + 1], t); + send_prebuilt_message (message, &pi[own_pos + 1], t); else send_client_tunnel_disconnect(t, NULL); @@ -4763,6 +5427,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got a unicast packet from %s\n", GNUNET_i2s (peer)); + /* Check size */ size = ntohs (message->size); if (size < sizeof (struct GNUNET_MESH_Unicast) + @@ -4772,8 +5437,9 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } msg = (struct GNUNET_MESH_Unicast *) message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %u\n", - ntohs (msg[1].header.type)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n", + GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type))); + /* Check tunnel */ t = tunnel_get (&msg->oid, ntohl (msg->tid)); if (NULL == t) { @@ -4783,9 +5449,9 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } pid = ntohl (msg->pid); - if (t->pid == pid) + if (t->fwd_pid == pid) { - GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO); + GNUNET_STATISTICS_update (stats, "# duplicate PID drops", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " Already seen pid %u, DROPPING!\n", pid); return GNUNET_OK; @@ -4795,8 +5461,20 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " pid %u not seen yet, forwarding\n", pid); } - t->skip += (pid - t->pid) - 1; - t->pid = pid; + + t->skip += (pid - t->fwd_pid) - 1; + t->fwd_pid = pid; + + if (GMC_is_pid_bigger (pid, t->last_fwd_ack)) + { + GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO); + GNUNET_break_op (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received PID %u, ACK %u\n", + pid, t->last_fwd_ack); + return GNUNET_OK; + } + tunnel_reset_timeout (t); dest_id = GNUNET_PEER_search (&msg->destination); if (dest_id == myid) @@ -4804,9 +5482,8 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " it's for us! sending to clients...\n"); GNUNET_STATISTICS_update (stats, "# unicast received", 1, GNUNET_NO); - send_subscribed_clients (message, (struct GNUNET_MessageHeader *) &msg[1]); - // FIXME send after client processes the packet - tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); + send_subscribed_clients (message, &msg[1].header, t); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST); return GNUNET_OK; } ttl = ntohl (msg->ttl); @@ -4815,33 +5492,27 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, { GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n"); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); return GNUNET_OK; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n"); - GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO); neighbor = tree_get_first_hop (t->tree, dest_id); - cinfo = GNUNET_CONTAINER_multihashmap_get (t->children_fc, - &neighbor->hashPubKey); - if (NULL == cinfo) - { - cinfo = GNUNET_malloc (sizeof (struct MeshTunnelChildInfo)); - cinfo->id = GNUNET_PEER_intern (neighbor); - cinfo->skip = pid; - cinfo->max_pid = pid + t->queue_max - t->queue_n; // FIXME review - - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (t->children_fc, - &neighbor->hashPubKey, - cinfo, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); - } + cinfo = tunnel_get_neighbor_fc (t, neighbor); cinfo->pid = pid; GNUNET_CONTAINER_multihashmap_iterate (t->children_fc, &tunnel_add_skip, &neighbor); - send_message (message, neighbor, t); + if (GNUNET_YES == t->nobuffer && + GNUNET_YES == GMC_is_pid_bigger (pid, cinfo->fwd_ack)) + { + GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO); + GNUNET_break_op (0); + return GNUNET_OK; + } + send_prebuilt_message (message, neighbor, t); + GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO); return GNUNET_OK; } @@ -4890,12 +5561,13 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } pid = ntohl (msg->pid); - if (t->pid == pid) + if (t->fwd_pid == pid) { /* already seen this packet, drop */ - GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_STATISTICS_update (stats, "# duplicate PID drops", 1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Already seen pid %u, DROPPING!\n", pid); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); return GNUNET_OK; } else @@ -4903,8 +5575,8 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " pid %u not seen yet, forwarding\n", pid); } - t->skip += (pid - t->pid) - 1; - t->pid = pid; + t->skip += (pid - t->fwd_pid) - 1; + t->fwd_pid = pid; tunnel_reset_timeout (t); /* Transmit to locally interested clients */ @@ -4912,17 +5584,19 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_CONTAINER_multihashmap_contains (t->peers, &my_full_id.hashPubKey)) { GNUNET_STATISTICS_update (stats, "# multicast received", 1, GNUNET_NO); - send_subscribed_clients (message, &msg[1].header); + send_subscribed_clients (message, &msg[1].header, t); + tunnel_send_fwd_ack(t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ttl: %u\n", ntohl (msg->ttl)); if (ntohl (msg->ttl) == 0) { GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n"); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); return GNUNET_OK; } GNUNET_STATISTICS_update (stats, "# multicast forwarded", 1, GNUNET_NO); - tunnel_send_multicast (t, message, GNUNET_NO); + tunnel_send_multicast (t, message); return GNUNET_OK; } @@ -4951,6 +5625,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, struct MeshTunnel *t; size_t size; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got a ToOrigin packet from %s\n", GNUNET_i2s (peer)); size = ntohs (message->size); @@ -4961,8 +5636,8 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } msg = (struct GNUNET_MESH_ToOrigin *) message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %u\n", - ntohs (msg[1].header.type)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n", + GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type))); t = tunnel_get (&msg->oid, ntohl (msg->tid)); if (NULL == t) @@ -4973,22 +5648,13 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } - if (t->id.oid == myid) + if (NULL != t->owner) { char cbuf[size]; struct GNUNET_MESH_ToOrigin *copy; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " it's for us! sending to clients...\n"); - if (NULL == t->owner) - { - /* got data packet for ownerless tunnel */ - GNUNET_STATISTICS_update (stats, "# data on ownerless tunnel", - 1, GNUNET_NO); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " no clients!\n"); - GNUNET_break_op (0); - return GNUNET_OK; - } /* TODO signature verification */ memcpy (cbuf, message, size); copy = (struct GNUNET_MESH_ToOrigin *) cbuf; @@ -4996,6 +5662,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_STATISTICS_update (stats, "# to origin received", 1, GNUNET_NO); GNUNET_SERVER_notification_context_unicast (nc, t->owner->handle, ©->header, GNUNET_NO); + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); return GNUNET_OK; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -5009,7 +5676,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id); - send_message (message, &id, t); + send_prebuilt_message (message, &id, t); GNUNET_STATISTICS_update (stats, "# to origin forwarded", 1, GNUNET_NO); return GNUNET_OK; @@ -5035,15 +5702,13 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer, unsigned int atsi_count) { struct GNUNET_MESH_ACK *msg; - struct MeshTunnelChildInfo *cinfo; struct MeshTunnel *t; uint32_t ack; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got an ACK packet from %s\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n", GNUNET_i2s (peer)); msg = (struct GNUNET_MESH_ACK *) message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %u\n", - ntohs (msg[1].header.type)); + t = tunnel_get (&msg->oid, ntohl (msg->tid)); if (NULL == t) @@ -5054,15 +5719,26 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } ack = ntohl (msg->pid); - cinfo = GNUNET_CONTAINER_multihashmap_get (t->children_fc, - &peer->hashPubKey); - if (NULL == cinfo) + + /* Is this a forward or backward ACK? */ + if (tree_get_predecessor(t->tree) != GNUNET_PEER_search(peer)) { - GNUNET_break_op (0); - return GNUNET_OK; + struct MeshTunnelChildInfo *cinfo; + + debug_bck_ack++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n"); + cinfo = tunnel_get_neighbor_fc (t, peer); + cinfo->fwd_ack = ack; + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); + tunnel_unlock_fwd_queues (t); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n"); + t->bck_ack = ack; + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); + tunnel_unlock_bck_queue (t); } - cinfo->max_pid = ack; - tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); return GNUNET_OK; } @@ -5164,11 +5840,56 @@ handle_mesh_path_ack (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break (0); return GNUNET_OK; } - send_message (message, &id, t); + send_prebuilt_message (message, &id, t); return GNUNET_OK; } +/** + * Core handler for mesh keepalives. + * + * @param cls closure + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * @param atsi_count number of records in 'atsi' + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + * + * TODO: Check who we got this from, to validate route. + */ +static int +handle_mesh_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + struct GNUNET_MESH_TunnelKeepAlive *msg; + struct MeshTunnel *t; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got a keepalive packet from %s\n", + GNUNET_i2s (peer)); + + msg = (struct GNUNET_MESH_TunnelKeepAlive *) message; + t = tunnel_get (&msg->oid, ntohl (msg->tid)); + + if (NULL == t) + { + /* TODO notify that we dont know that tunnel */ + GNUNET_STATISTICS_update (stats, "# keepalive on unknown tunnel", 1, GNUNET_NO); + GNUNET_break_op (0); + return GNUNET_OK; + } + + tunnel_reset_timeout (t); + + GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO); + tunnel_send_multicast (t, message); + return GNUNET_OK; + } + + + /** * Functions to handle messages from core */ @@ -5181,6 +5902,8 @@ static struct GNUNET_CORE_MessageHandler core_handlers[] = { sizeof (struct GNUNET_MESH_TunnelDestroy)}, {&handle_mesh_data_unicast, GNUNET_MESSAGE_TYPE_MESH_UNICAST, 0}, {&handle_mesh_data_multicast, GNUNET_MESSAGE_TYPE_MESH_MULTICAST, 0}, + {&handle_mesh_keepalive, GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE, + sizeof (struct GNUNET_MESH_TunnelKeepAlive)}, {&handle_mesh_data_to_orig, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN, 0}, {&handle_mesh_ack, GNUNET_MESSAGE_TYPE_MESH_ACK, sizeof (struct GNUNET_MESH_ACK)}, @@ -5258,45 +5981,34 @@ notify_client_connection_failure (void *cls, size_t size, void *buf) * * @param cls Closure (tunnel for which to send the keepalive). * @param tc Notification context. - * - * TODO: implement explicit multicast keepalive? */ static void path_refresh (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct MeshTunnel *t = cls; - struct GNUNET_MessageHeader *payload; - struct GNUNET_MESH_Multicast *msg; - size_t size = - sizeof (struct GNUNET_MESH_Multicast) + - sizeof (struct GNUNET_MessageHeader); + struct GNUNET_MESH_TunnelKeepAlive *msg; + size_t size = sizeof (struct GNUNET_MESH_TunnelKeepAlive); char cbuf[size]; + t->path_refresh_task = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) { return; } - t->path_refresh_task = GNUNET_SCHEDULER_NO_TASK; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending keepalive for tunnel %d\n", t->id.tid); - msg = (struct GNUNET_MESH_Multicast *) cbuf; + msg = (struct GNUNET_MESH_TunnelKeepAlive *) cbuf; msg->header.size = htons (size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_MULTICAST); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE); msg->oid = my_full_id; msg->tid = htonl (t->id.tid); - msg->ttl = htonl (default_ttl); - msg->pid = htonl (t->pid + 1); - t->pid++; - payload = (struct GNUNET_MessageHeader *) &msg[1]; - payload->size = htons (sizeof (struct GNUNET_MessageHeader)); - payload->type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE); - tunnel_send_multicast (t, &msg->header, GNUNET_YES); + tunnel_send_multicast (t, &msg->header); t->path_refresh_task = GNUNET_SCHEDULER_add_delayed (refresh_path_time, &path_refresh, t); - return; + tunnel_reset_timeout(t); } @@ -5489,7 +6201,8 @@ dht_get_string_handler (void *cls, struct GNUNET_TIME_Absolute exp, const struct GNUNET_PeerIdentity *get_path, unsigned int get_path_length, const struct GNUNET_PeerIdentity *put_path, - unsigned int put_path_length, enum GNUNET_BLOCK_Type type, + unsigned int put_path_length, + enum GNUNET_BLOCK_Type type, size_t size, const void *data) { const struct MeshRegexBlock *block = data; @@ -5534,9 +6247,9 @@ dht_get_string_handler (void *cls, struct GNUNET_TIME_Absolute exp, } return; } - GNUNET_break (GNUNET_OK == - GNUNET_MESH_regex_block_iterate (block, size, - ®ex_edge_iterator, ctx)); + + regex_next_edge (block, size, ctx); + return; } @@ -5781,6 +6494,7 @@ handle_local_tunnel_create (void *cls, struct GNUNET_SERVER_Client *client, struct GNUNET_MESH_TunnelMessage *t_msg; struct MeshTunnel *t; struct MeshClient *c; + MESH_TunnelNumber tid; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new tunnel requested\n"); @@ -5803,14 +6517,15 @@ handle_local_tunnel_create (void *cls, struct GNUNET_SERVER_Client *client, t_msg = (struct GNUNET_MESH_TunnelMessage *) message; /* Sanity check for tunnel numbering */ - if (0 == (ntohl (t_msg->tunnel_id) & GNUNET_MESH_LOCAL_TUNNEL_ID_CLI)) + tid = ntohl (t_msg->tunnel_id); + if (0 == (tid & GNUNET_MESH_LOCAL_TUNNEL_ID_CLI)) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } /* Sanity check for duplicate tunnel IDs */ - if (NULL != tunnel_get_by_local_id (c, ntohl (t_msg->tunnel_id))) + if (NULL != tunnel_get_by_local_id (c, tid)) { GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); @@ -5819,7 +6534,7 @@ handle_local_tunnel_create (void *cls, struct GNUNET_SERVER_Client *client, while (NULL != tunnel_get_by_pi (myid, next_tid)) next_tid = (next_tid + 1) & ~GNUNET_MESH_LOCAL_TUNNEL_ID_CLI; - t = tunnel_new (myid, next_tid++, c, ntohl (t_msg->tunnel_id)); + t = tunnel_new (myid, next_tid++, c, tid); if (NULL == t) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Tunnel creation failed.\n"); @@ -5864,6 +6579,8 @@ handle_local_tunnel_destroy (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + /* Message sanity check */ if (sizeof (struct GNUNET_MESH_TunnelMessage) != ntohs (message->size)) { @@ -5871,7 +6588,7 @@ handle_local_tunnel_destroy (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + tunnel_msg = (struct GNUNET_MESH_TunnelMessage *) message; /* Retrieve tunnel */ @@ -5944,6 +6661,7 @@ handle_local_tunnel_speed (void *cls, struct GNUNET_SERVER_Client *client, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + tunnel_msg = (struct GNUNET_MESH_TunnelMessage *) message; /* Retrieve tunnel */ @@ -5998,8 +6716,8 @@ handle_local_tunnel_buffer (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + tunnel_msg = (struct GNUNET_MESH_TunnelMessage *) message; /* Retrieve tunnel */ @@ -6054,8 +6772,10 @@ handle_local_connect_add (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); peer_msg = (struct GNUNET_MESH_PeerControl *) message; + /* Sanity check for message size */ if (sizeof (struct GNUNET_MESH_PeerControl) != ntohs (peer_msg->header.size)) { @@ -6118,7 +6838,10 @@ handle_local_connect_del (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + peer_msg = (struct GNUNET_MESH_PeerControl *) message; + /* Sanity check for message size */ if (sizeof (struct GNUNET_MESH_PeerControl) != ntohs (peer_msg->header.size)) { @@ -6174,6 +6897,8 @@ handle_local_connect_del (void *cls, struct GNUNET_SERVER_Client *client, * @param cls closure * @param client identification of the client * @param message the actual message (PeerControl) + * + * FIXME implement DHT block bloomfilter */ static void handle_local_blacklist (void *cls, struct GNUNET_SERVER_Client *client, @@ -6192,6 +6917,8 @@ handle_local_blacklist (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + peer_msg = (struct GNUNET_MESH_PeerControl *) message; /* Sanity check for message size */ @@ -6244,6 +6971,8 @@ handle_local_unblacklist (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + peer_msg = (struct GNUNET_MESH_PeerControl *) message; /* Sanity check for message size */ @@ -6314,8 +7043,10 @@ handle_local_connect_by_type (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); connect_msg = (struct GNUNET_MESH_ConnectPeerByType *) message; + /* Sanity check for message size */ if (sizeof (struct GNUNET_MESH_ConnectPeerByType) != ntohs (connect_msg->header.size)) @@ -6429,6 +7160,7 @@ handle_local_connect_by_string (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); /* Message size sanity check */ if (sizeof(struct GNUNET_MESH_ConnectPeerByString) >= size) @@ -6546,7 +7278,10 @@ handle_local_unicast (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + data_msg = (struct GNUNET_MESH_Unicast *) message; + /* Sanity check for message size */ size = ntohs (message->size); if (sizeof (struct GNUNET_MESH_Unicast) + @@ -6585,26 +7320,37 @@ handle_local_unicast (void *cls, struct GNUNET_SERVER_Client *client, return; } + /* PID should be as expected */ + if (ntohl (data_msg->pid) != t->fwd_pid + 1) + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Unicast PID, expected %u, got %u\n", + t->fwd_pid + 1, ntohl (data_msg->pid)); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + /* Ok, everything is correct, send the message * (pretend we got it from a mesh peer) */ { + /* Work around const limitation */ char buf[ntohs (message->size)] GNUNET_ALIGN; struct GNUNET_MESH_Unicast *copy; - /* Work around const limitation */ copy = (struct GNUNET_MESH_Unicast *) buf; memcpy (buf, data_msg, size); copy->oid = my_full_id; copy->tid = htonl (t->id.tid); copy->ttl = htonl (default_ttl); - copy->pid = htonl (t->pid + 1); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " calling generic handler...\n"); handle_mesh_data_unicast (NULL, &my_full_id, ©->header, NULL, 0); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "receive done OK\n"); GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; } @@ -6621,12 +7367,14 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct GNUNET_MESH_ToOrigin *data_msg; - struct GNUNET_PeerIdentity id; + struct MeshTunnelClientInfo *clinfo; struct MeshClient *c; struct MeshTunnel *t; MESH_TunnelNumber tid; size_t size; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got a ToOrigin request from a client!\n"); /* Sanity check for client registration */ if (NULL == (c = client_get (client))) { @@ -6634,7 +7382,10 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + data_msg = (struct GNUNET_MESH_ToOrigin *) message; + /* Sanity check for message size */ size = ntohs (message->size); if (sizeof (struct GNUNET_MESH_ToOrigin) + @@ -6647,8 +7398,7 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, /* Tunnel exists? */ tid = ntohl (data_msg->tid); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Got a ToOrigin request from a client! Tunnel %X\n", tid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " on tunnel %X\n", tid); if (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV) { GNUNET_break (0); @@ -6658,23 +7408,38 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, t = tunnel_get_by_local_id (c, tid); if (NULL == t) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id); GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } /* It should be sent by someone who has this as incoming tunnel. */ - if (-1 == client_knows_tunnel (c, t)) + if (GNUNET_NO == client_knows_tunnel (c, t)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + /* PID should be as expected */ + clinfo = tunnel_get_client_fc (t, c); + if (ntohl (data_msg->pid) != clinfo->bck_pid + 1) { GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "To Origin PID, expected %u, got %u\n", + clinfo->bck_pid + 1, + ntohl (data_msg->pid)); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - GNUNET_PEER_resolve (t->id.oid, &id); /* Ok, everything is correct, send the message * (pretend we got it from a mesh peer) */ + clinfo->bck_pid++; { char buf[ntohs (message->size)] GNUNET_ALIGN; struct GNUNET_MESH_ToOrigin *copy; @@ -6682,14 +7447,26 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, /* Work around const limitation */ copy = (struct GNUNET_MESH_ToOrigin *) buf; memcpy (buf, data_msg, size); - copy->oid = id; + GNUNET_PEER_resolve (t->id.oid, ©->oid); copy->tid = htonl (t->id.tid); + copy->ttl = htonl (default_ttl); + if (ntohl (copy->pid) != (t->bck_pid + 1)) + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "To Origin PID, expected %u, got %u\n", + t->bck_pid + 1, + ntohl (copy->pid)); + return; + } + t->bck_pid++; copy->sender = my_full_id; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " calling generic handler...\n"); handle_mesh_data_to_orig (NULL, &my_full_id, ©->header, NULL, 0); } GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; } @@ -6720,7 +7497,10 @@ handle_local_multicast (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); + data_msg = (struct GNUNET_MESH_Multicast *) message; + /* Sanity check for message size */ if (sizeof (struct GNUNET_MESH_Multicast) + sizeof (struct GNUNET_MessageHeader) > ntohs (data_msg->header.size)) @@ -6735,6 +7515,9 @@ handle_local_multicast (void *cls, struct GNUNET_SERVER_Client *client, t = tunnel_get_by_local_id (c, tid); if (NULL == t) { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id); GNUNET_break (0); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; @@ -6748,6 +7531,17 @@ handle_local_multicast (void *cls, struct GNUNET_SERVER_Client *client, return; } + /* PID should be as expected */ + if (ntohl (data_msg->pid) != t->fwd_pid + 1) + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Multicast PID, expected %u, got %u\n", + t->fwd_pid + 1, ntohl (data_msg->pid)); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + { char buf[ntohs (message->size)] GNUNET_ALIGN; struct GNUNET_MESH_Multicast *copy; @@ -6757,13 +7551,13 @@ handle_local_multicast (void *cls, struct GNUNET_SERVER_Client *client, copy->oid = my_full_id; copy->tid = htonl (t->id.tid); copy->ttl = htonl (default_ttl); - copy->pid = htonl (t->pid + 1); + GNUNET_assert (ntohl (copy->pid) == (t->fwd_pid + 1)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " calling generic handler...\n"); handle_mesh_data_multicast (client, &my_full_id, ©->header, NULL, 0); } - /* receive done gets called when last copy is sent to a neighbor */ + GNUNET_SERVER_receive_done (t->owner->handle, GNUNET_OK); return; } @@ -6783,6 +7577,7 @@ handle_local_ack (void *cls, struct GNUNET_SERVER_Client *client, struct MeshTunnel *t; struct MeshClient *c; MESH_TunnelNumber tid; + uint32_t ack; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got a local ACK\n"); /* Sanity check for client registration */ @@ -6792,8 +7587,10 @@ handle_local_ack (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); msg = (struct GNUNET_MESH_LocalAck *) message; + /* Tunnel exists? */ tid = ntohl (msg->tunnel_id); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " on tunnel %X\n", tid); @@ -6801,24 +7598,31 @@ handle_local_ack (void *cls, struct GNUNET_SERVER_Client *client, if (NULL == t) { GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - /* Does client own tunnel? Is this and ACK for BCK traffic? */ + ack = ntohl (msg->max_pid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ack); + + /* Does client own tunnel? I.E: Is this and ACK for BCK traffic? */ if (NULL != t->owner && t->owner->handle == client) { - GNUNET_break (0); - // FIXME TODO + /* The client owns the tunnel, ACK is for data to_origin, send BCK ACK. */ + t->bck_ack = ack; + tunnel_send_bck_ack(t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK); } else { /* The client doesn't own the tunnel, this ACK is for FWD traffic. */ - tunnel_set_client_fwd_ack (t, c, ntohl (msg->max_pid)); - tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST); + tunnel_set_client_fwd_ack (t, c, ack); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK); } - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; } @@ -6890,13 +7694,16 @@ static void core_init (void *cls, struct GNUNET_CORE_Handle *server, const struct GNUNET_PeerIdentity *identity) { + static int i = 0; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core init\n"); core_handle = server; if (0 != memcmp (identity, &my_full_id, sizeof (my_full_id)) || NULL == server) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Wrong CORE service\n")); - GNUNET_SCHEDULER_shutdown (); + GNUNET_SCHEDULER_shutdown (); // Try gracefully + if (10 < i++) + GNUNET_abort(); // Try harder } return; } @@ -6965,13 +7772,15 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) while (NULL != q) { n = q->next; - if (q->peer == pi) - { - /* try to reroute this traffic instead */ - queue_destroy(q, GNUNET_YES); - } + /* TODO try to reroute this traffic instead */ + queue_destroy(q, GNUNET_YES); q = n; } + if (NULL != pi->core_transmit) + { + GNUNET_CORE_notify_transmit_ready_cancel(pi->core_transmit); + pi->core_transmit = NULL; + } peer_info_remove_path (pi, pi->id, myid); if (myid == pi->id) { @@ -7034,6 +7843,7 @@ shutdown_peer (void *cls, const struct GNUNET_HashCode * key, void *value) return GNUNET_YES; } + /** * Task run during shutdown. * @@ -7050,6 +7860,11 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_CORE_disconnect (core_handle); core_handle = NULL; } + if (NULL != keygen) + { + GNUNET_CRYPTO_rsa_key_create_stop (keygen); + keygen = NULL; + } GNUNET_CONTAINER_multihashmap_iterate (tunnels, &shutdown_tunnel, NULL); GNUNET_CONTAINER_multihashmap_iterate (peers, &shutdown_peer, NULL); if (dht_handle != NULL) @@ -7070,6 +7885,76 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "shut down\n"); } + +/** + * Callback for hostkey read/generation + * + * @param cls NULL + * @param pk the private key + * @param emsg error message + */ +static void +key_generation_cb (void *cls, + struct GNUNET_CRYPTO_RsaPrivateKey *pk, + const char *emsg) +{ + struct MeshPeerInfo *peer; + struct MeshPeerPath *p; + + keygen = NULL; + if (NULL == pk) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Mesh service could not access hostkey. Exiting.\n")); + GNUNET_SCHEDULER_shutdown (); + return; + } + my_private_key = pk; + GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key); + GNUNET_CRYPTO_hash (&my_public_key, sizeof (my_public_key), + &my_full_id.hashPubKey); + myid = GNUNET_PEER_intern (&my_full_id); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Mesh for peer [%s] starting\n", + GNUNET_i2s(&my_full_id)); + +// transport_handle = GNUNET_TRANSPORT_connect(c, +// &my_full_id, +// NULL, +// NULL, +// NULL, +// NULL); + + + + next_tid = 0; + next_local_tid = GNUNET_MESH_LOCAL_TUNNEL_ID_SERV; + + + GNUNET_SERVER_add_handlers (server_handle, client_handlers); + nc = GNUNET_SERVER_notification_context_create (server_handle, 1); + GNUNET_SERVER_disconnect_notify (server_handle, + &handle_local_client_disconnect, NULL); + + + clients = NULL; + clients_tail = NULL; + next_client_id = 0; + + announce_applications_task = GNUNET_SCHEDULER_NO_TASK; + announce_id_task = GNUNET_SCHEDULER_add_now (&announce_id, cls); + + /* Create a peer_info for the local peer */ + peer = peer_info_get (&my_full_id); + p = path_new (1); + p->peers[0] = myid; + GNUNET_PEER_change_rc (myid, 1); + peer_info_add_path (peer, p, GNUNET_YES); + GNUNET_SERVER_resume (server_handle); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Mesh service running\n"); +} + + /** * Process mesh requests. * @@ -7081,8 +7966,6 @@ static void run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { - struct MeshPeerInfo *peer; - struct MeshPeerPath *p; char *keyfile; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "starting to run\n"); @@ -7223,73 +8106,28 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, dht_replication_level = 10; } - - my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile); - GNUNET_free (keyfile); - if (my_private_key == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Mesh service could not access hostkey. Exiting.\n")); - GNUNET_SCHEDULER_shutdown (); - return; - } - GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key); - GNUNET_CRYPTO_hash (&my_public_key, sizeof (my_public_key), - &my_full_id.hashPubKey); - myid = GNUNET_PEER_intern (&my_full_id); - -// transport_handle = GNUNET_TRANSPORT_connect(c, -// &my_full_id, -// NULL, -// NULL, -// NULL, -// NULL); - - dht_handle = GNUNET_DHT_connect (c, 64); - if (dht_handle == NULL) - { - GNUNET_break (0); - } - - stats = GNUNET_STATISTICS_create ("mesh", c); - - - next_tid = 0; - next_local_tid = GNUNET_MESH_LOCAL_TUNNEL_ID_SERV; - tunnels = GNUNET_CONTAINER_multihashmap_create (32); incoming_tunnels = GNUNET_CONTAINER_multihashmap_create (32); peers = GNUNET_CONTAINER_multihashmap_create (32); applications = GNUNET_CONTAINER_multihashmap_create (32); types = GNUNET_CONTAINER_multihashmap_create (32); - GNUNET_SERVER_add_handlers (server_handle, client_handlers); - nc = GNUNET_SERVER_notification_context_create (server_handle, 1); - GNUNET_SERVER_disconnect_notify (server_handle, - &handle_local_client_disconnect, NULL); - - - clients = NULL; - clients_tail = NULL; - next_client_id = 0; - - announce_applications_task = GNUNET_SCHEDULER_NO_TASK; - announce_id_task = GNUNET_SCHEDULER_add_now (&announce_id, cls); - - /* Create a peer_info for the local peer */ - peer = peer_info_get (&my_full_id); - p = path_new (1); - p->peers[0] = myid; - GNUNET_PEER_change_rc (myid, 1); - peer_info_add_path (peer, p, GNUNET_YES); + dht_handle = GNUNET_DHT_connect (c, 64); + if (NULL == dht_handle) + { + GNUNET_break (0); + } + stats = GNUNET_STATISTICS_create ("mesh", c); + GNUNET_SERVER_suspend (server_handle); /* Scheduled the task to clean up when shutdown is called */ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "end of run()\n"); + keygen = GNUNET_CRYPTO_rsa_key_create_start (keyfile, &key_generation_cb, NULL); + GNUNET_free (keyfile); } + /** * The main function for the mesh service. * @@ -7309,5 +8147,11 @@ main (int argc, char *const *argv) NULL)) ? 0 : 1; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "main() END\n"); + INTERVAL_SHOW; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Mesh for peer [%s] FWD ACKs %u, BCK ACKs %u\n", + GNUNET_i2s(&my_full_id), debug_fwd_ack, debug_bck_ack); + return ret; }