X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmesh%2Fgnunet-service-mesh.c;h=7ebe7d199d188696f36268b002a872fa3a1b1ae1;hb=b552dea05cbfacacf1c65c6eb1f54220f4e4beb5;hp=d633c477f9e332781f54edfab265f2ec9f7446e1;hpb=b8fc222875ead195a82fad6fd5d6967d08fb72d0;p=oweals%2Fgnunet.git diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index d633c477f..7ebe7d199 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2001 - 2011 Christian Grothoff (and other contributing authors) + (C) 2001-2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -37,7 +37,6 @@ * TODO: * - error reporting (CREATE/CHANGE/ADD/DEL?) -- new message! * - partial disconnect reporting -- same as error reporting? - * - add vs create? change vs. keep-alive? same msg or different ones? -- thinking... * - add ping message * - relay corking down to core * - set ttl relative to tree depth @@ -115,12 +114,11 @@ struct MeshData /** Tunnel it belongs to. */ struct MeshTunnel *t; - /** In case of a multicast, task to allow a client to send more data if - * some neighbor is too slow. */ - GNUNET_SCHEDULER_TaskIdentifier *task; + /** How many remaining neighbors still hav't got it. */ + unsigned int reference_counter; /** How many remaining neighbors we need to send this to. */ - unsigned int *reference_counter; + unsigned int total_out; /** Size of the data. */ size_t data_len; @@ -512,6 +510,23 @@ struct MeshTunnelChildInfo * Last ACK sent to that child (BCK ACK). */ uint32_t bck_ack; + + /** + * Circular buffer pointing to MeshPeerQueue elements for all + * payload traffic going to this child. + * Size determined by the tunnel queue size (@c t->fwd_queue_max). + */ + struct MeshPeerQueue **send_buffer; + + /** + * Index of the oldest element in the send_buffer. + */ + unsigned int send_buffer_start; + + /** + * How many elements are already in the buffer. + */ + unsigned int send_buffer_n; }; @@ -753,6 +768,16 @@ struct MeshRegexSearchContext */ struct MeshRegexSearchInfo *info; + /** + * We just want to look for one edge, the longer the better. + * Keep its length. + */ + unsigned int longest_match; + + /** + * Destination hash of the longest match. + */ + struct GNUNET_HashCode hash; }; /******************************************************************************/ @@ -780,6 +805,9 @@ mesh_debug (void *cls, int success) } #endif +unsigned int debug_fwd_ack; +unsigned int debug_bck_ack; + #endif /******************************************************************************/ @@ -799,6 +827,12 @@ static long long unsigned int dht_replication_level; static long long unsigned int max_tunnels; static long long unsigned int max_msgs_queue; + +/** + * Hostkey generation context + */ +static struct GNUNET_CRYPTO_RsaKeyGenerationContext *keygen; + /** * DLL with all the clients, head. */ @@ -1107,20 +1141,18 @@ tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c); /** - * Iterator over edges in a regex block retrieved from the DHT. + * Jump to the next edge, with the longest matching token. * - * @param cls Closure. - * @param token Token that follows to next state. - * @param len Lenght of token. - * @param key Hash of next state. + * @param block Block found in the DHT. + * @param size Size of the block. + * @param ctx Context of the search. * * @return GNUNET_YES if should keep iterating, GNUNET_NO otherwise. */ -static int -regex_edge_iterator (void *cls, - const char *token, - size_t len, - const struct GNUNET_HashCode *key); +static void +regex_next_edge (const struct MeshRegexBlock *block, + size_t size, + struct MeshRegexSearchContext *ctx); /** @@ -1136,9 +1168,17 @@ regex_find_path (const struct GNUNET_HashCode *key, /** - * Queue and pass message to core when possible. + * @brief Queue and pass message to core when possible. + * + * If type is payload (UNICAST, TO_ORIGIN, MULTICAST) checks for queue status + * and accounts for it. In case the queue is full, the message is dropped and + * a break issued. + * + * Otherwise, message is treated as internal and allowed to go regardless of + * queue status. * - * @param cls Closure (type dependant). + * @param cls Closure (@c type dependant). It will be used by queue_send to + * build the message to be sent if not already prebuilt. * @param type Type of the message, 0 for a raw message. * @param size Size of the message. * @param dst Neighbor to send message to. @@ -1222,8 +1262,7 @@ regex_result_iterator (void *cls, ntohl(block->accepting)); } - (void) GNUNET_MESH_regex_block_iterate (block, SIZE_MAX, - ®ex_edge_iterator, ctx); + regex_next_edge(block, SIZE_MAX, ctx); return GNUNET_YES; } @@ -1246,9 +1285,7 @@ regex_edge_iterator (void *cls, const struct GNUNET_HashCode *key) { struct MeshRegexSearchContext *ctx = cls; - struct MeshRegexSearchContext *new_ctx; struct MeshRegexSearchInfo *info = ctx->info; - struct GNUNET_DHT_GetHandle *get_h; char *current; size_t current_len; @@ -1272,40 +1309,93 @@ regex_edge_iterator (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Token doesn't match, END\n"); return GNUNET_YES; // Token doesn't match } + + if (len > ctx->longest_match) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Token is longer, KEEP\n"); + ctx->longest_match = len; + ctx->hash = *key; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Token is not longer, IGNORE\n"); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* End of regex edge iterator\n"); + return GNUNET_YES; +} + + +/** + * Jump to the next edge, with the longest matching token. + * + * @param block Block found in the DHT. + * @param size Size of the block. + * @param ctx Context of the search. + * + * @return GNUNET_YES if should keep iterating, GNUNET_NO otherwise. + */ +static void +regex_next_edge (const struct MeshRegexBlock *block, + size_t size, + struct MeshRegexSearchContext *ctx) +{ + struct MeshRegexSearchContext *new_ctx; + struct MeshRegexSearchInfo *info = ctx->info; + struct GNUNET_DHT_GetHandle *get_h; + + int result; + + /* Find the longest match for the current string position, + * among tokens in the given block */ + ctx->longest_match = 0; + result = GNUNET_MESH_regex_block_iterate (block, size, + ®ex_edge_iterator, ctx); + GNUNET_break (GNUNET_OK == result || SIZE_MAX == size); + + /* Did anything match? */ + if (0 == ctx->longest_match) + return; + new_ctx = GNUNET_malloc (sizeof (struct MeshRegexSearchContext)); new_ctx->info = info; - new_ctx->position = ctx->position + len; + new_ctx->position = ctx->position + ctx->longest_match; GNUNET_array_append (info->contexts, info->n_contexts, new_ctx); + + /* Check whether we already have a DHT GET running for it */ if (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_contains(info->dht_get_handles, key)) + GNUNET_CONTAINER_multihashmap_contains(info->dht_get_handles, &ctx->hash)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* GET running, END\n"); - GNUNET_CONTAINER_multihashmap_get_multiple (info->dht_get_results, key, + GNUNET_CONTAINER_multihashmap_get_multiple (info->dht_get_results, + &ctx->hash, ®ex_result_iterator, new_ctx); - return GNUNET_YES; // We are already looking for it + return; // We are already looking for it } + /* Start search in DHT */ get_h = GNUNET_DHT_get_start (dht_handle, /* handle */ GNUNET_BLOCK_TYPE_MESH_REGEX, /* type */ - key, /* key to search */ + &ctx->hash, /* key to search */ dht_replication_level, /* replication level */ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, NULL, /* xquery */ // FIXME BLOOMFILTER 0, /* xquery bits */ // FIXME BLOOMFILTER SIZE &dht_get_string_handler, new_ctx); if (GNUNET_OK != - GNUNET_CONTAINER_multihashmap_put(info->dht_get_handles, key, get_h, + GNUNET_CONTAINER_multihashmap_put(info->dht_get_handles, + &ctx->hash, + get_h, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)) { GNUNET_break (0); - return GNUNET_YES; + return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* End of regex edge iterator\n"); - return GNUNET_YES; } + /** * Iterator over hash map entries to cancel DHT GET requests after a * successful connect_by_string. @@ -1360,8 +1450,11 @@ regex_free_result (void *cls, * @param edges edges leaving current state. */ void -regex_iterator (void *cls, const struct GNUNET_HashCode *key, const char *proof, - int accepting, unsigned int num_edges, +regex_iterator (void *cls, + const struct GNUNET_HashCode *key, + const char *proof, + int accepting, + unsigned int num_edges, const struct GNUNET_REGEX_Edge *edges) { struct MeshRegexBlock *block; @@ -1616,6 +1709,7 @@ announce_application (void *cls, const struct GNUNET_HashCode * key, void *value block.id = my_full_id; c = GNUNET_CONTAINER_multihashmap_get (applications, key); + GNUNET_assert(NULL != c); block.type = (long) GNUNET_CONTAINER_multihashmap_get (c->apps, key); if (0 == block.type) { @@ -1754,23 +1848,9 @@ announce_id (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void data_descriptor_decrement_rc (struct MeshData *mesh_data) { - /* Make sure it's a multicast packet */ - GNUNET_assert (NULL != mesh_data->reference_counter); - - if (0 == --(*(mesh_data->reference_counter))) + if (0 == --(mesh_data->reference_counter)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Last copy!\n"); - if (NULL != mesh_data->task) - { - if (GNUNET_SCHEDULER_NO_TASK != *(mesh_data->task)) - { - GNUNET_SCHEDULER_cancel (*(mesh_data->task)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " notifying client...\n"); - GNUNET_SERVER_receive_done (mesh_data->t->owner->handle, GNUNET_OK); - } - GNUNET_free (mesh_data->task); - } - GNUNET_free (mesh_data->reference_counter); GNUNET_free (mesh_data->data); GNUNET_free (mesh_data); } @@ -1826,32 +1906,6 @@ client_is_subscribed (uint16_t message_type, struct MeshClient *c) } -/** - * Allow a client to send more data after transmitting a multicast message - * which some neighbor has not yet accepted altough a reasonable time has - * passed. - * - * @param cls Closure (DataDescriptor containing the task identifier) - * @param tc Task Context - * - * FIXME reference counter cshould be just int - */ -static void -client_allow_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct MeshData *mdata = cls; - - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - GNUNET_assert (NULL != mdata->reference_counter); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "CLIENT ALLOW SEND DESPITE %u COPIES PENDING\n", - *(mdata->reference_counter)); - *(mdata->task) = GNUNET_SCHEDULER_NO_TASK; - GNUNET_SERVER_receive_done (mdata->t->owner->handle, GNUNET_OK); -} - - /** * Check whether client wants traffic from a tunnel. * @@ -2258,9 +2312,9 @@ send_core_data_raw (void *cls, size_t size, void *buf) * @param t Tunnel on which this message is transmitted. */ static void -send_message (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - struct MeshTunnel *t) +send_prebuilt_message (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + struct MeshTunnel *t) { struct MeshTransmissionDescriptor *info; struct MeshPeerInfo *neighbor; @@ -2284,8 +2338,8 @@ send_message (const struct GNUNET_MessageHeader *message, m->ttl = htonl (ntohl (m->ttl) - 1); } info->mesh_data->data_len = size; - info->mesh_data->reference_counter = GNUNET_malloc (sizeof (unsigned int)); - *info->mesh_data->reference_counter = 1; + info->mesh_data->reference_counter = 1; + info->mesh_data->total_out = 1; neighbor = peer_info_get (peer); for (p = neighbor->path_head; NULL != p; p = p->next) { @@ -2328,16 +2382,7 @@ send_message (const struct GNUNET_MessageHeader *message, } info->peer = neighbor; if (GNUNET_MESSAGE_TYPE_MESH_PATH_ACK == type) - { - /* - * TODO: in this case we only need the service to retransmit - * the message down the path. If we pass the real type to queue_add, - * queue_send will try to build the message from scratch. This can - * probably be done by some other way instead of deleteing the type - * info. - */ type = 0; - } queue_add (info, type, size, @@ -2435,12 +2480,39 @@ send_destroy_path (struct MeshTunnel *t, GNUNET_PEER_Id destination) { GNUNET_PEER_resolve (p->peers[i], &pi[i]); } - send_message (&msg->header, tree_get_first_hop (t->tree, destination), t); + send_prebuilt_message (&msg->header, tree_get_first_hop (t->tree, destination), t); } path_destroy (p); } +/** + * Sends a PATH ACK message in reponse to a received PATH_CREATE directed to us. + * + * @param t Tunnel which to confirm. + */ +static void +send_path_ack (struct MeshTunnel *t) +{ + struct MeshTransmissionDescriptor *info; + struct GNUNET_PeerIdentity id; + GNUNET_PEER_Id peer; + + peer = tree_get_predecessor (t->tree); + GNUNET_PEER_resolve (peer, &id); + info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); + info->origin = &t->id; + info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &id.hashPubKey); + GNUNET_assert (NULL != info->peer); + + queue_add (info, + GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, + sizeof (struct GNUNET_MESH_PathACK), + info->peer, + t); +} + + /** * Try to establish a new connection to this peer. * Use the best path for the given tunnel. @@ -3048,7 +3120,10 @@ tunnel_delete_client (struct MeshTunnel *t, const struct MeshClient *c) /** - * Iterator to free MeshTunnelChildInfo of tunnel children. + * @brief Iterator to destroy MeshTunnelChildInfo of tunnel children. + * + * Destroys queue elements of all waiting transmissions and frees all memory + * used by the struct and its elements. * * @param cls Closure (tunnel info). * @param key Hash of GNUNET_PEER_Id (unused). @@ -3061,7 +3136,22 @@ tunnel_destroy_child (void *cls, const struct GNUNET_HashCode * key, void *value) { - GNUNET_free (value); + struct MeshTunnelChildInfo *cinfo = value; + struct MeshTunnel *t = cls; + unsigned int c; + unsigned int i; + + for (c = 0; c < cinfo->send_buffer_n; c++) + { + i = (cinfo->send_buffer_start + c) % t->fwd_queue_max; + if (NULL != cinfo->send_buffer[i]) + queue_destroy (cinfo->send_buffer[i], GNUNET_YES); + else + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u %u\n", c, cinfo->send_buffer_n); + } + GNUNET_free_non_null (cinfo->send_buffer); + GNUNET_free (cinfo); return GNUNET_YES; } @@ -3246,7 +3336,7 @@ tunnel_notify_connection_broken (struct MeshTunnel *t, GNUNET_PEER_Id p1, msg.peer1 = my_full_id; GNUNET_PEER_resolve (pid, &msg.peer2); GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &neighbor); - send_message (&msg.header, &neighbor, t); + send_prebuilt_message (&msg.header, &neighbor, t); } } return pid; @@ -3265,19 +3355,21 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) struct MeshData *mdata = cls; struct MeshTransmissionDescriptor *info; struct GNUNET_PeerIdentity neighbor; + struct GNUNET_MessageHeader *msg; info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); info->mesh_data = mdata; - (*(mdata->reference_counter)) ++; + (mdata->reference_counter) ++; info->destination = neighbor_id; GNUNET_PEER_resolve (neighbor_id, &neighbor); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending to %s...\n", GNUNET_i2s (&neighbor)); info->peer = peer_info_get (&neighbor); GNUNET_assert (NULL != info->peer); + msg = (struct GNUNET_MessageHeader *) mdata->data; queue_add(info, - GNUNET_MESSAGE_TYPE_MESH_MULTICAST, + ntohs (msg->type), info->mesh_data->data_len, info->peer, mdata->t); @@ -3290,12 +3382,10 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) * * @param t Tunnel in which to send the data. * @param msg Message to be sent. - * @param internal Has the service generated this message? */ static void tunnel_send_multicast (struct MeshTunnel *t, - const struct GNUNET_MessageHeader *msg, - int internal) + const struct GNUNET_MessageHeader *msg) { struct MeshData *mdata; @@ -3304,7 +3394,6 @@ tunnel_send_multicast (struct MeshTunnel *t, mdata = GNUNET_malloc (sizeof (struct MeshData)); mdata->data_len = ntohs (msg->size); - mdata->reference_counter = GNUNET_malloc (sizeof (unsigned int)); mdata->t = t; mdata->data = GNUNET_malloc (mdata->data_len); memcpy (mdata->data, msg, mdata->data_len); @@ -3312,17 +3401,22 @@ tunnel_send_multicast (struct MeshTunnel *t, { struct GNUNET_MESH_Multicast *mcast; + mcast = (struct GNUNET_MESH_Multicast *) mdata->data; if (t->fwd_queue_n >= t->fwd_queue_max) { GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " queue full!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " queue full!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " message from %s!\n", + GNUNET_i2s(&mcast->oid)); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " message at %s!\n", + GNUNET_i2s(&my_full_id)); GNUNET_free (mdata->data); - GNUNET_free (mdata->reference_counter); GNUNET_free (mdata); return; } t->fwd_queue_n++; - mcast = (struct GNUNET_MESH_Multicast *) mdata->data; mcast->ttl = htonl (ntohl (mcast->ttl) - 1); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " data packet, ttl: %u\n", ntohl (mcast->ttl)); @@ -3331,35 +3425,20 @@ tunnel_send_multicast (struct MeshTunnel *t, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not a data packet, no ttl\n"); } - if (NULL != t->owner && - GNUNET_YES != t->owner->shutting_down && - GNUNET_NO == internal) - { - mdata->task = GNUNET_malloc (sizeof (GNUNET_SCHEDULER_TaskIdentifier)); - (*(mdata->task)) = - GNUNET_SCHEDULER_add_delayed (unacknowledged_wait_time, &client_allow_send, - mdata); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "timeout task %u\n", - *(mdata->task)); - } tree_iterate_children (t->tree, &tunnel_send_multicast_iterator, mdata); - if (*(mdata->reference_counter) == 0) + if (mdata->reference_counter == 0) { - GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " no one to send data to\n"); GNUNET_free (mdata->data); - GNUNET_free (mdata->reference_counter); - if (NULL != mdata->task) - { - GNUNET_SCHEDULER_cancel(*(mdata->task)); - GNUNET_free (mdata->task); - GNUNET_SERVER_receive_done (t->owner->handle, GNUNET_OK); - } GNUNET_free (mdata); t->fwd_queue_n--; } + else + { + mdata->total_out = mdata->reference_counter; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending a multicast packet done\n"); return; @@ -3427,6 +3506,9 @@ tunnel_get_neighbor_fc (const struct MeshTunnel *t, cinfo->fwd_ack = t->fwd_pid + delta; cinfo->bck_ack = delta; + cinfo->send_buffer = + GNUNET_malloc (sizeof(struct MeshPeerQueue *) * t->fwd_queue_max); + GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (t->children_fc, &peer->hashPubKey, @@ -3508,7 +3590,7 @@ tunnel_get_child_fwd_ack (void *cls, * * @param t Tunnel. * - * @return Maximum PID allowed (uint32 MAX), -1 if node has no children. + * @return Maximum PID allowed (uint32 MAX), -1LL if node has no children. */ static int64_t tunnel_get_children_fwd_ack (struct MeshTunnel *t) @@ -3517,6 +3599,7 @@ tunnel_get_children_fwd_ack (struct MeshTunnel *t) ctx.t = t; ctx.max_child_ack = 0; ctx.nchildren = 0; + ctx.init = GNUNET_NO; tree_iterate_children (t->tree, tunnel_get_child_fwd_ack, &ctx); if (0 == ctx.nchildren) @@ -3565,7 +3648,7 @@ tunnel_set_client_fwd_ack (struct MeshTunnel *t, * @param t Tunnel on which to look. * * @return Corresponding ACK value (max uint32_t). - * If no clients are suscribed, -1. + * If no clients are suscribed, -1LL. */ static int64_t tunnel_get_clients_fwd_ack (struct MeshTunnel *t) @@ -3580,9 +3663,9 @@ tunnel_get_clients_fwd_ack (struct MeshTunnel *t) return -1LL; } - for (ack = -1, i = 0; i < t->nclients; i++) + for (ack = -1LL, i = 0; i < t->nclients; i++) { - if (-1 == ack || + if (-1LL == ack || (GNUNET_YES == t->speed_min && GNUNET_YES == GMC_is_pid_bigger (ack, t->clients_fc[i].fwd_ack)) || (GNUNET_NO == t->speed_min && @@ -3610,37 +3693,41 @@ tunnel_get_clients_fwd_ack (struct MeshTunnel *t) static uint32_t tunnel_get_fwd_ack (struct MeshTunnel *t) { + uint32_t ack; uint32_t count; uint32_t buffer_free; int64_t child_ack; int64_t client_ack; - uint32_t ack; count = t->fwd_pid - t->skip; buffer_free = t->fwd_queue_max - t->fwd_queue_n; - ack = count + buffer_free; // Might overflow 32 bits, it's ok! + ack = count; child_ack = tunnel_get_children_fwd_ack (t); client_ack = tunnel_get_clients_fwd_ack (t); - if (-1 == child_ack) + if (-1LL == child_ack) { // Node has no children, child_ack AND core buffer are irrelevant. - GNUNET_break (-1 != client_ack); // No children AND no clients? Not good! // FIXME fc + GNUNET_break (-1LL != client_ack); // No children AND no clients? Not good! return (uint32_t) client_ack; } - + if (-1LL == client_ack) + { + client_ack = ack + buffer_free; // Might overflow 32 bits, it's ok! + } if (GNUNET_YES == t->speed_min) { - ack = GMC_min_pid ((uint32_t) child_ack, ack); + ack = GMC_min_pid ((uint32_t) child_ack, ack) + buffer_free; // Might overflow 32 bits, it's ok!; ack = GMC_min_pid ((uint32_t) client_ack, ack); } else { - ack = GMC_max_pid ((uint32_t) child_ack, ack); + ack = GMC_max_pid ((uint32_t) child_ack, ack) + buffer_free; // Might overflow 32 bits, it's ok!; ack = GMC_max_pid ((uint32_t) client_ack, ack); } if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ack, t->fwd_pid)) ack = t->fwd_pid + 1; // Might overflow 32 bits, it's ok! - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "c %u, bf %u, ch %u, cl %u, ACK: %u\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "c %u, bf %u, ch %lld, cl %lld, ACK: %u\n", count, buffer_free, child_ack, client_ack, ack); return ack; } @@ -3686,7 +3773,7 @@ send_ack (struct MeshTunnel *t, struct GNUNET_PeerIdentity *peer, uint32_t ack) msg.pid = htonl (ack); msg.tid = htonl (t->id.tid); - send_message (&msg.header, peer, t); + send_prebuilt_message (&msg.header, peer, t); } @@ -3751,18 +3838,6 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, nobuffer\n"); return; } - if (t->fwd_queue_max > t->fwd_queue_n * 2 && - GMC_is_pid_bigger(t->last_fwd_ack, t->fwd_pid)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " t->qmax: %u, t->qn: %u\n", - t->fwd_queue_max, t->fwd_queue_n); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " t->pid: %u, t->ack: %u\n", - t->fwd_pid, t->last_fwd_ack); - return; - } break; case GNUNET_MESSAGE_TYPE_MESH_ACK: case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK: @@ -3771,6 +3846,20 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) GNUNET_break (0); } + /* Check if we need no retransmit the ACK */ + if (t->fwd_queue_max > t->fwd_queue_n * 4 && + GMC_is_pid_bigger(t->last_fwd_ack, t->fwd_pid)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " t->qmax: %u, t->qn: %u\n", + t->fwd_queue_max, t->fwd_queue_n); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " t->pid: %u, t->ack: %u\n", + t->fwd_pid, t->last_fwd_ack); + return; + } + /* Ok, ACK might be necessary, what PID to ACK? */ ack = tunnel_get_fwd_ack (t); @@ -3784,6 +3873,7 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) t->last_fwd_ack = ack; GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id); send_ack (t, &id, ack); + debug_fwd_ack++; } @@ -3998,7 +4088,7 @@ tunnel_send_destroy (struct MeshTunnel *t) msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY); GNUNET_PEER_resolve (t->id.oid, &msg.oid); msg.tid = htonl (t->id.tid); - tunnel_send_multicast (t, &msg.header, GNUNET_NO); + tunnel_send_multicast (t, &msg.header); } @@ -4022,6 +4112,13 @@ tunnel_cancel_queues (void *cls, GNUNET_PEER_Id neighbor_id) next = pq->next; if (pq->tunnel == t) { + if (GNUNET_MESSAGE_TYPE_MESH_MULTICAST == pq->type || + GNUNET_MESSAGE_TYPE_MESH_UNICAST == pq->type || + GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == pq->type) + { + // Should have been removed on destroy children + GNUNET_break (0); + } queue_destroy (pq, GNUNET_YES); } } @@ -4050,8 +4147,6 @@ tunnel_destroy (struct MeshTunnel *t) if (NULL == t) return GNUNET_OK; - tree_iterate_children (t->tree, &tunnel_cancel_queues, t); - r = GNUNET_OK; c = t->owner; #if MESH_DEBUG @@ -4069,16 +4164,21 @@ tunnel_destroy (struct MeshTunnel *t) GNUNET_CRYPTO_hash (&t->id, sizeof (struct MESH_TunnelID), &hash); if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } - GNUNET_CRYPTO_hash (&t->local_tid, sizeof (MESH_TunnelNumber), &hash); - if (NULL != c && - GNUNET_YES != - GNUNET_CONTAINER_multihashmap_remove (c->own_tunnels, &hash, t)) + if (NULL != c) { - r = GNUNET_SYSERR; + GNUNET_CRYPTO_hash (&t->local_tid, sizeof (MESH_TunnelNumber), &hash); + if (GNUNET_YES != + GNUNET_CONTAINER_multihashmap_remove (c->own_tunnels, &hash, t)) + { + GNUNET_break (0); + r = GNUNET_SYSERR; + } } + GNUNET_CRYPTO_hash (&t->local_tid_dest, sizeof (MESH_TunnelNumber), &hash); for (i = 0; i < t->nclients; i++) { @@ -4086,6 +4186,7 @@ tunnel_destroy (struct MeshTunnel *t) if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (c->incoming_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } } @@ -4095,18 +4196,22 @@ tunnel_destroy (struct MeshTunnel *t) if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (c->ignore_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } } + if (t->nclients > 0) { if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (incoming_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } GNUNET_free (t->clients); } + if (NULL != t->peers) { GNUNET_CONTAINER_multihashmap_iterate (t->peers, &peer_info_delete_tunnel, @@ -4119,6 +4224,8 @@ tunnel_destroy (struct MeshTunnel *t) t); GNUNET_CONTAINER_multihashmap_destroy (t->children_fc); + tree_iterate_children (t->tree, &tunnel_cancel_queues, t); + tree_destroy (t->tree); if (NULL != t->regex_ctx) @@ -4235,14 +4342,13 @@ tunnel_delete_peer (struct MeshTunnel *t, GNUNET_PEER_Id peer) * @param key the hash of the local tunnel id (used to access the hashmap) * @param value the value stored at the key (tunnel to destroy) * - * @return GNUNET_OK on success + * @return GNUNET_OK, keep iterating. */ static int tunnel_destroy_iterator (void *cls, const struct GNUNET_HashCode * key, void *value) { struct MeshTunnel *t = value; struct MeshClient *c = cls; - int r; send_client_tunnel_disconnect(t, c); if (c != t->owner) @@ -4254,8 +4360,10 @@ tunnel_destroy_iterator (void *cls, const struct GNUNET_HashCode * key, void *va return GNUNET_OK; } tunnel_send_destroy(t); - r = tunnel_destroy (t); - return r; + t->owner = NULL; + t->destroy = GNUNET_YES; + + return GNUNET_OK; } @@ -4269,10 +4377,15 @@ static void tunnel_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct MeshTunnel *t = cls; + struct GNUNET_PeerIdentity id; + t->timeout_task = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - t->timeout_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_PEER_resolve(t->id.oid, &id); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Tunnel %s [%X] timed out. Destroying.\n", + GNUNET_i2s(&id), t->id.tid); tunnel_destroy (t); } @@ -4333,11 +4446,13 @@ send_core_path_create (void *cls, size_t size, void *buf) msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE); msg->tid = ntohl (t->id.tid); + opt = 0; if (GNUNET_YES == t->speed_min) - opt = MESH_TUNNEL_OPT_SPEED_MIN; + opt |= MESH_TUNNEL_OPT_SPEED_MIN; if (GNUNET_YES == t->nobuffer) opt |= MESH_TUNNEL_OPT_NOBUFFER; msg->opt = htonl(opt); + msg->reserved = 0; peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1]; for (i = 0; i < p->length; i++) @@ -4459,32 +4574,72 @@ queue_destroy (struct MeshPeerQueue *queue, int clear_cls) { struct MeshTransmissionDescriptor *dd; struct MeshPathInfo *path_info; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity id; + unsigned int i; + unsigned int max; if (GNUNET_YES == clear_cls) { switch (queue->type) { - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type payload\n"); + case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " cancelling TUNNEL_DESTROY\n"); + /* fall through */ + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " type prebuilt (payload, tunnel destroy)\n"); dd = queue->cls; data_descriptor_decrement_rc (dd->mesh_data); break; - case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type create path\n"); path_info = queue->cls; path_destroy (path_info->path); break; - default: + default: GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type unknown!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " type %s unknown!\n", + GNUNET_MESH_DEBUG_M2S(queue->type)); } GNUNET_free_non_null (queue->cls); } GNUNET_CONTAINER_DLL_remove (queue->peer->queue_head, queue->peer->queue_tail, queue); + + /* Delete from child_fc in the appropiate tunnel */ + max = queue->tunnel->fwd_queue_max; + GNUNET_PEER_resolve (queue->peer->id, &id); + cinfo = tunnel_get_neighbor_fc (queue->tunnel, &id); + for (i = 0; i < cinfo->send_buffer_n; i++) + { + unsigned int i2; + i2 = (cinfo->send_buffer_start + i) % max; + if (cinfo->send_buffer[i2] == queue) + { + /* Found corresponding entry in the send_buffer. Move all others back. */ + unsigned int j; + unsigned int j2; + unsigned int j3; + + for (j = i, j2 = 0, j3 = 0; j < cinfo->send_buffer_n - 1; j++) + { + j2 = (cinfo->send_buffer_start + j) % max; + j3 = (cinfo->send_buffer_start + j + 1) % max; + cinfo->send_buffer[j2] = cinfo->send_buffer[j3]; + } + + cinfo->send_buffer[j3] = NULL; + + cinfo->send_buffer_n--; + } + } + //queue-> + GNUNET_free (queue); } @@ -4538,6 +4693,11 @@ queue_get_next (const struct MeshPeerInfo *peer) break; case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: mcast = (struct GNUNET_MESH_Multicast *) info->mesh_data->data; + if (GNUNET_MESSAGE_TYPE_MESH_MULTICAST != ntohs(mcast->header.type)) + { + // Not a multicast payload: multicast control traffic (destroy, etc) + return q; + } pid = ntohl (mcast->pid); GNUNET_PEER_resolve (info->peer->id, &id); cinfo = tunnel_get_neighbor_fc(t, &id); @@ -4585,40 +4745,40 @@ queue_send (void *cls, size_t size, void *buf) struct GNUNET_MessageHeader *msg; struct MeshPeerQueue *queue; struct MeshTunnel *t; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity dst_id; size_t data_size; peer->core_transmit = NULL; - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* Queue send\n"); - queue = queue_get_next(peer); + queue = queue_get_next (peer); - /* Queue has no internal mesh traffic not sendable payload */ + /* Queue has no internal mesh traffic nor sendable payload */ if (NULL == queue) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not ready, return\n"); if (NULL == peer->queue_head) - GNUNET_break(0); // Should've been canceled + GNUNET_break (0); // Should've been canceled return 0; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n"); + GNUNET_PEER_resolve (peer->id, &dst_id); /* Check if buffer size is enough for the message */ if (queue->size > size) { - struct GNUNET_PeerIdentity id; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not enough room, reissue\n"); - GNUNET_PEER_resolve (peer->id, &id); peer->core_transmit = - GNUNET_CORE_notify_transmit_ready(core_handle, - 0, - 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &id, - queue->size, - &queue_send, - peer); + GNUNET_CORE_notify_transmit_ready (core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &dst_id, + queue->size, + &queue_send, + peer); return 0; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* size ok\n"); @@ -4658,7 +4818,7 @@ queue_send (void *cls, size_t size, void *buf) tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST); break; case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - tunnel_send_bck_ack(t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); break; default: break; @@ -4666,18 +4826,34 @@ queue_send (void *cls, size_t size, void *buf) break; case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* multicast\n"); + { + struct MeshTransmissionDescriptor *info = queue->cls; + + if ((1 == info->mesh_data->reference_counter + && GNUNET_YES == t->speed_min) + || + (info->mesh_data->total_out == info->mesh_data->reference_counter + && GNUNET_NO == t->speed_min)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* considered sent\n"); + t->fwd_queue_n--; + } + } data_size = send_core_data_multicast(queue->cls, size, buf); - // FIXME fc substract when? depending on the tunnel conf. - // t->fwd_queue_n--; tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST); break; case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path create\n"); - data_size = send_core_path_create(queue->cls, size, buf); + data_size = send_core_path_create (queue->cls, size, buf); break; case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path ack\n"); - data_size = send_core_path_ack(queue->cls, size, buf); + data_size = send_core_path_ack (queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path keepalive\n"); + data_size = send_core_data_multicast (queue->cls, size, buf); break; default: GNUNET_break (0); @@ -4686,6 +4862,36 @@ queue_send (void *cls, size_t size, void *buf) queue->type); data_size = 0; } + switch (queue->type) + { + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + cinfo = tunnel_get_neighbor_fc (t, &dst_id); + if (cinfo->send_buffer[cinfo->send_buffer_start] != queue) + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "at pos %u (%p) != %p\n", + cinfo->send_buffer_start, + cinfo->send_buffer[cinfo->send_buffer_start], + queue); + } + if (cinfo->send_buffer_n > 0) + { + cinfo->send_buffer[cinfo->send_buffer_start] = NULL; + cinfo->send_buffer_n--; + cinfo->send_buffer_start++; + cinfo->send_buffer_start %= t->fwd_queue_max; + } + else + { + GNUNET_break (0); + } + break; + default: + break; + } /* Free queue, but cls was freed by send_core_* */ queue_destroy (queue, GNUNET_NO); @@ -4697,7 +4903,8 @@ queue_send (void *cls, size_t size, void *buf) } /* If more data in queue, send next */ - if (NULL != peer->queue_head) + queue = queue_get_next(peer); + if (NULL != queue) { struct GNUNET_PeerIdentity id; @@ -4709,19 +4916,34 @@ queue_send (void *cls, size_t size, void *buf) 0, GNUNET_TIME_UNIT_FOREVER_REL, &id, - peer->queue_head->size, + queue->size, &queue_send, peer); } + else + { + if (NULL != peer->queue_head) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "********* %s stalled\n", + GNUNET_i2s(&my_full_id)); + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* return %d\n", data_size); return data_size; } /** - * Queue and pass message to core when possible. + * @brief Queue and pass message to core when possible. + * + * If type is payload (UNICAST, TO_ORIGIN, MULTICAST) checks for queue status + * and accounts for it. In case the queue is full, the message is dropped and + * a break issued. + * + * Otherwise, message is treated as internal and allowed to go regardless of + * queue status. * - * @param cls Closure (type dependant). + * @param cls Closure (@c type dependant). It will be used by queue_send to + * build the message to be sent if not already prebuilt. * @param type Type of the message, 0 for a raw message. * @param size Size of the message. * @param dst Neighbor to send message to. @@ -4732,8 +4954,11 @@ queue_add (void *cls, uint16_t type, size_t size, struct MeshPeerInfo *dst, struct MeshTunnel *t) { struct MeshPeerQueue *queue; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity id; unsigned int *max; unsigned int *n; + unsigned int i; n = NULL; if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type || @@ -4754,6 +4979,8 @@ queue_add (void *cls, uint16_t type, size_t size, GNUNET_break_op(0); // TODO: kill connection? else GNUNET_break(0); + GNUNET_STATISTICS_update(stats, "# messages dropped (buffer full)", + 1, GNUNET_NO); return; // Drop message } (*n)++; @@ -4765,20 +4992,41 @@ queue_add (void *cls, uint16_t type, size_t size, queue->peer = dst; queue->tunnel = t; GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue); + GNUNET_PEER_resolve (dst->id, &id); if (NULL == dst->core_transmit) { - struct GNUNET_PeerIdentity id; - - GNUNET_PEER_resolve (dst->id, &id); dst->core_transmit = - GNUNET_CORE_notify_transmit_ready(core_handle, - 0, - 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &id, - size, - &queue_send, - dst); + GNUNET_CORE_notify_transmit_ready (core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + size, + &queue_send, + dst); + } + if (NULL == n) // Is this internal mesh traffic? + return; + + // It's payload, keep track of buffer per peer. + cinfo = tunnel_get_neighbor_fc(t, &id); + i = (cinfo->send_buffer_start + cinfo->send_buffer_n) % t->fwd_queue_max; + if (NULL != cinfo->send_buffer[i]) + { + GNUNET_break (cinfo->send_buffer_n == t->fwd_queue_max); // aka i == start + queue_destroy (cinfo->send_buffer[cinfo->send_buffer_start], GNUNET_YES); + cinfo->send_buffer_start++; + cinfo->send_buffer_start %= t->fwd_queue_max; + } + else + { + cinfo->send_buffer_n++; + } + cinfo->send_buffer[i] = queue; + if (cinfo->send_buffer_n > t->fwd_queue_max) + { + GNUNET_break (0); + cinfo->send_buffer_n = t->fwd_queue_max; } } @@ -4864,6 +5112,9 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_YES : GNUNET_NO; t->nobuffer = (0 != (opt & MESH_TUNNEL_OPT_NOBUFFER)) ? GNUNET_YES : GNUNET_NO; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " speed_min: %d, nobuffer:%d\n", + t->speed_min, t->nobuffer); if (GNUNET_YES == t->nobuffer) { @@ -4937,13 +5188,11 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, if (own_pos == size - 1) { /* It is for us! Send ack. */ - struct MeshTransmissionDescriptor *info; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " It's for us!\n"); peer_info_add_path_to_origin (orig_peer_info, path, GNUNET_NO); if (NULL == t->peers) { - /* New tunnel! Notify clients on data. */ + /* New tunnel! Notify clients on first payload message. */ t->peers = GNUNET_CONTAINER_multihashmap_create (4); } GNUNET_break (GNUNET_SYSERR != @@ -4952,15 +5201,7 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, peer_info_get (&my_full_id), GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE)); - info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); - info->origin = &t->id; - info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); - GNUNET_assert (NULL != info->peer); - queue_add(info, - GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, - sizeof (struct GNUNET_MESH_PathACK), - info->peer, - t); + send_path_ack (t); } else { @@ -5052,7 +5293,7 @@ handle_mesh_path_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Own position: %u\n", own_pos); if (own_pos < path->length - 1) - send_message (message, &pi[own_pos + 1], t); + send_prebuilt_message (message, &pi[own_pos + 1], t); else send_client_tunnel_disconnect(t, NULL); @@ -5270,7 +5511,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break_op (0); return GNUNET_OK; } - send_message (message, neighbor, t); + send_prebuilt_message (message, neighbor, t); GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO); return GNUNET_OK; } @@ -5355,7 +5596,7 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } GNUNET_STATISTICS_update (stats, "# multicast forwarded", 1, GNUNET_NO); - tunnel_send_multicast (t, message, GNUNET_NO); + tunnel_send_multicast (t, message); return GNUNET_OK; } @@ -5435,7 +5676,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id); - send_message (message, &id, t); + send_prebuilt_message (message, &id, t); GNUNET_STATISTICS_update (stats, "# to origin forwarded", 1, GNUNET_NO); return GNUNET_OK; @@ -5484,6 +5725,7 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer, { struct MeshTunnelChildInfo *cinfo; + debug_bck_ack++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n"); cinfo = tunnel_get_neighbor_fc (t, peer); cinfo->fwd_ack = ack; @@ -5598,11 +5840,56 @@ handle_mesh_path_ack (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break (0); return GNUNET_OK; } - send_message (message, &id, t); + send_prebuilt_message (message, &id, t); return GNUNET_OK; } +/** + * Core handler for mesh keepalives. + * + * @param cls closure + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * @param atsi_count number of records in 'atsi' + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + * + * TODO: Check who we got this from, to validate route. + */ +static int +handle_mesh_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + struct GNUNET_MESH_TunnelKeepAlive *msg; + struct MeshTunnel *t; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got a keepalive packet from %s\n", + GNUNET_i2s (peer)); + + msg = (struct GNUNET_MESH_TunnelKeepAlive *) message; + t = tunnel_get (&msg->oid, ntohl (msg->tid)); + + if (NULL == t) + { + /* TODO notify that we dont know that tunnel */ + GNUNET_STATISTICS_update (stats, "# keepalive on unknown tunnel", 1, GNUNET_NO); + GNUNET_break_op (0); + return GNUNET_OK; + } + + tunnel_reset_timeout (t); + + GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO); + tunnel_send_multicast (t, message); + return GNUNET_OK; + } + + + /** * Functions to handle messages from core */ @@ -5615,6 +5902,8 @@ static struct GNUNET_CORE_MessageHandler core_handlers[] = { sizeof (struct GNUNET_MESH_TunnelDestroy)}, {&handle_mesh_data_unicast, GNUNET_MESSAGE_TYPE_MESH_UNICAST, 0}, {&handle_mesh_data_multicast, GNUNET_MESSAGE_TYPE_MESH_MULTICAST, 0}, + {&handle_mesh_keepalive, GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE, + sizeof (struct GNUNET_MESH_TunnelKeepAlive)}, {&handle_mesh_data_to_orig, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN, 0}, {&handle_mesh_ack, GNUNET_MESSAGE_TYPE_MESH_ACK, sizeof (struct GNUNET_MESH_ACK)}, @@ -5692,46 +5981,34 @@ notify_client_connection_failure (void *cls, size_t size, void *buf) * * @param cls Closure (tunnel for which to send the keepalive). * @param tc Notification context. - * - * TODO: implement explicit multicast keepalive? */ static void path_refresh (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct MeshTunnel *t = cls; - struct GNUNET_MessageHeader *payload; - struct GNUNET_MESH_Multicast *msg; - size_t size = - sizeof (struct GNUNET_MESH_Multicast) + - sizeof (struct GNUNET_MessageHeader); + struct GNUNET_MESH_TunnelKeepAlive *msg; + size_t size = sizeof (struct GNUNET_MESH_TunnelKeepAlive); char cbuf[size]; + t->path_refresh_task = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) { return; } - t->path_refresh_task = GNUNET_SCHEDULER_NO_TASK; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending keepalive for tunnel %d\n", t->id.tid); - msg = (struct GNUNET_MESH_Multicast *) cbuf; + msg = (struct GNUNET_MESH_TunnelKeepAlive *) cbuf; msg->header.size = htons (size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_MULTICAST); - // FIXME: change type to != MULTICAST + msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE); msg->oid = my_full_id; msg->tid = htonl (t->id.tid); - msg->ttl = htonl (default_ttl); - msg->pid = htonl (t->fwd_pid + 1); - t->fwd_pid++; - payload = (struct GNUNET_MessageHeader *) &msg[1]; - payload->size = htons (sizeof (struct GNUNET_MessageHeader)); - payload->type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE); - tunnel_send_multicast (t, &msg->header, GNUNET_YES); + tunnel_send_multicast (t, &msg->header); t->path_refresh_task = GNUNET_SCHEDULER_add_delayed (refresh_path_time, &path_refresh, t); - return; + tunnel_reset_timeout(t); } @@ -5924,7 +6201,8 @@ dht_get_string_handler (void *cls, struct GNUNET_TIME_Absolute exp, const struct GNUNET_PeerIdentity *get_path, unsigned int get_path_length, const struct GNUNET_PeerIdentity *put_path, - unsigned int put_path_length, enum GNUNET_BLOCK_Type type, + unsigned int put_path_length, + enum GNUNET_BLOCK_Type type, size_t size, const void *data) { const struct MeshRegexBlock *block = data; @@ -5969,9 +6247,9 @@ dht_get_string_handler (void *cls, struct GNUNET_TIME_Absolute exp, } return; } - GNUNET_break (GNUNET_OK == - GNUNET_MESH_regex_block_iterate (block, size, - ®ex_edge_iterator, ctx)); + + regex_next_edge (block, size, ctx); + return; } @@ -7279,7 +7557,7 @@ handle_local_multicast (void *cls, struct GNUNET_SERVER_Client *client, handle_mesh_data_multicast (client, &my_full_id, ©->header, NULL, 0); } - /* receive done gets called when last copy is sent to a neighbor */ + GNUNET_SERVER_receive_done (t->owner->handle, GNUNET_OK); return; } @@ -7319,7 +7597,7 @@ handle_local_ack (void *cls, struct GNUNET_SERVER_Client *client, t = tunnel_get_by_local_id (c, tid); if (NULL == t) { - GNUNET_break (0); // FIXME fc + GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Tunnel %X unknown.\n", tid); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " for client %u.\n", c->id); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); @@ -7494,13 +7772,15 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) while (NULL != q) { n = q->next; - if (q->peer == pi) - { - /* try to reroute this traffic instead */ - queue_destroy(q, GNUNET_YES); - } + /* TODO try to reroute this traffic instead */ + queue_destroy(q, GNUNET_YES); q = n; } + if (NULL != pi->core_transmit) + { + GNUNET_CORE_notify_transmit_ready_cancel(pi->core_transmit); + pi->core_transmit = NULL; + } peer_info_remove_path (pi, pi->id, myid); if (myid == pi->id) { @@ -7563,6 +7843,7 @@ shutdown_peer (void *cls, const struct GNUNET_HashCode * key, void *value) return GNUNET_YES; } + /** * Task run during shutdown. * @@ -7579,6 +7860,11 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_CORE_disconnect (core_handle); core_handle = NULL; } + if (NULL != keygen) + { + GNUNET_CRYPTO_rsa_key_create_stop (keygen); + keygen = NULL; + } GNUNET_CONTAINER_multihashmap_iterate (tunnels, &shutdown_tunnel, NULL); GNUNET_CONTAINER_multihashmap_iterate (peers, &shutdown_peer, NULL); if (dht_handle != NULL) @@ -7599,6 +7885,76 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "shut down\n"); } + +/** + * Callback for hostkey read/generation + * + * @param cls NULL + * @param pk the private key + * @param emsg error message + */ +static void +key_generation_cb (void *cls, + struct GNUNET_CRYPTO_RsaPrivateKey *pk, + const char *emsg) +{ + struct MeshPeerInfo *peer; + struct MeshPeerPath *p; + + keygen = NULL; + if (NULL == pk) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Mesh service could not access hostkey. Exiting.\n")); + GNUNET_SCHEDULER_shutdown (); + return; + } + my_private_key = pk; + GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key); + GNUNET_CRYPTO_hash (&my_public_key, sizeof (my_public_key), + &my_full_id.hashPubKey); + myid = GNUNET_PEER_intern (&my_full_id); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Mesh for peer [%s] starting\n", + GNUNET_i2s(&my_full_id)); + +// transport_handle = GNUNET_TRANSPORT_connect(c, +// &my_full_id, +// NULL, +// NULL, +// NULL, +// NULL); + + + + next_tid = 0; + next_local_tid = GNUNET_MESH_LOCAL_TUNNEL_ID_SERV; + + + GNUNET_SERVER_add_handlers (server_handle, client_handlers); + nc = GNUNET_SERVER_notification_context_create (server_handle, 1); + GNUNET_SERVER_disconnect_notify (server_handle, + &handle_local_client_disconnect, NULL); + + + clients = NULL; + clients_tail = NULL; + next_client_id = 0; + + announce_applications_task = GNUNET_SCHEDULER_NO_TASK; + announce_id_task = GNUNET_SCHEDULER_add_now (&announce_id, cls); + + /* Create a peer_info for the local peer */ + peer = peer_info_get (&my_full_id); + p = path_new (1); + p->peers[0] = myid; + GNUNET_PEER_change_rc (myid, 1); + peer_info_add_path (peer, p, GNUNET_YES); + GNUNET_SERVER_resume (server_handle); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Mesh service running\n"); +} + + /** * Process mesh requests. * @@ -7610,8 +7966,6 @@ static void run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { - struct MeshPeerInfo *peer; - struct MeshPeerPath *p; char *keyfile; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "starting to run\n"); @@ -7752,76 +8106,28 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, dht_replication_level = 10; } - - my_private_key = GNUNET_CRYPTO_rsa_key_create_from_file (keyfile); - GNUNET_free (keyfile); - if (my_private_key == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Mesh service could not access hostkey. Exiting.\n")); - GNUNET_SCHEDULER_shutdown (); - return; - } - GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key); - GNUNET_CRYPTO_hash (&my_public_key, sizeof (my_public_key), - &my_full_id.hashPubKey); - myid = GNUNET_PEER_intern (&my_full_id); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Mesh for peer [%s] starting\n", - GNUNET_i2s(&my_full_id)); - -// transport_handle = GNUNET_TRANSPORT_connect(c, -// &my_full_id, -// NULL, -// NULL, -// NULL, -// NULL); - - dht_handle = GNUNET_DHT_connect (c, 64); - if (dht_handle == NULL) - { - GNUNET_break (0); - } - - stats = GNUNET_STATISTICS_create ("mesh", c); - - - next_tid = 0; - next_local_tid = GNUNET_MESH_LOCAL_TUNNEL_ID_SERV; - tunnels = GNUNET_CONTAINER_multihashmap_create (32); incoming_tunnels = GNUNET_CONTAINER_multihashmap_create (32); peers = GNUNET_CONTAINER_multihashmap_create (32); applications = GNUNET_CONTAINER_multihashmap_create (32); types = GNUNET_CONTAINER_multihashmap_create (32); - GNUNET_SERVER_add_handlers (server_handle, client_handlers); - nc = GNUNET_SERVER_notification_context_create (server_handle, 1); - GNUNET_SERVER_disconnect_notify (server_handle, - &handle_local_client_disconnect, NULL); - - - clients = NULL; - clients_tail = NULL; - next_client_id = 0; - - announce_applications_task = GNUNET_SCHEDULER_NO_TASK; - announce_id_task = GNUNET_SCHEDULER_add_now (&announce_id, cls); - - /* Create a peer_info for the local peer */ - peer = peer_info_get (&my_full_id); - p = path_new (1); - p->peers[0] = myid; - GNUNET_PEER_change_rc (myid, 1); - peer_info_add_path (peer, p, GNUNET_YES); + dht_handle = GNUNET_DHT_connect (c, 64); + if (NULL == dht_handle) + { + GNUNET_break (0); + } + stats = GNUNET_STATISTICS_create ("mesh", c); + GNUNET_SERVER_suspend (server_handle); /* Scheduled the task to clean up when shutdown is called */ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "end of run()\n"); + keygen = GNUNET_CRYPTO_rsa_key_create_start (keyfile, &key_generation_cb, NULL); + GNUNET_free (keyfile); } + /** * The main function for the mesh service. * @@ -7843,5 +8149,9 @@ main (int argc, char *const *argv) INTERVAL_SHOW; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Mesh for peer [%s] FWD ACKs %u, BCK ACKs %u\n", + GNUNET_i2s(&my_full_id), debug_fwd_ack, debug_bck_ack); + return ret; }