X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmesh%2Fgnunet-service-mesh.c;h=7ebe7d199d188696f36268b002a872fa3a1b1ae1;hb=b552dea05cbfacacf1c65c6eb1f54220f4e4beb5;hp=1e2e31d6d8db03411ac4621501b4ac6e5d8f62f4;hpb=374a3ad71d63c8576f83c508a52e3588df3a2523;p=oweals%2Fgnunet.git diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index 1e2e31d6d..7ebe7d199 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -512,8 +512,9 @@ struct MeshTunnelChildInfo uint32_t bck_ack; /** - * Circular buffer pointing to MeshPeerQueue elements. - * Size determined by the tunnel queue size. + * Circular buffer pointing to MeshPeerQueue elements for all + * payload traffic going to this child. + * Size determined by the tunnel queue size (@c t->fwd_queue_max). */ struct MeshPeerQueue **send_buffer; @@ -1140,20 +1141,18 @@ tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c); /** - * Iterator over edges in a regex block retrieved from the DHT. + * Jump to the next edge, with the longest matching token. * - * @param cls Closure. - * @param token Token that follows to next state. - * @param len Lenght of token. - * @param key Hash of next state. + * @param block Block found in the DHT. + * @param size Size of the block. + * @param ctx Context of the search. * * @return GNUNET_YES if should keep iterating, GNUNET_NO otherwise. */ -static int -regex_edge_iterator (void *cls, - const char *token, - size_t len, - const struct GNUNET_HashCode *key); +static void +regex_next_edge (const struct MeshRegexBlock *block, + size_t size, + struct MeshRegexSearchContext *ctx); /** @@ -1169,9 +1168,17 @@ regex_find_path (const struct GNUNET_HashCode *key, /** - * Queue and pass message to core when possible. + * @brief Queue and pass message to core when possible. + * + * If type is payload (UNICAST, TO_ORIGIN, MULTICAST) checks for queue status + * and accounts for it. In case the queue is full, the message is dropped and + * a break issued. + * + * Otherwise, message is treated as internal and allowed to go regardless of + * queue status. * - * @param cls Closure (type dependant). + * @param cls Closure (@c type dependant). It will be used by queue_send to + * build the message to be sent if not already prebuilt. * @param type Type of the message, 0 for a raw message. * @param size Size of the message. * @param dst Neighbor to send message to. @@ -1255,8 +1262,7 @@ regex_result_iterator (void *cls, ntohl(block->accepting)); } - (void) GNUNET_MESH_regex_block_iterate (block, SIZE_MAX, - ®ex_edge_iterator, ctx); + regex_next_edge(block, SIZE_MAX, ctx); return GNUNET_YES; } @@ -1310,6 +1316,10 @@ regex_edge_iterator (void *cls, ctx->longest_match = len; ctx->hash = *key; } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Token is not longer, IGNORE\n"); + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* End of regex edge iterator\n"); return GNUNET_YES; @@ -1319,14 +1329,13 @@ regex_edge_iterator (void *cls, /** * Jump to the next edge, with the longest matching token. * - * @param cls Closure (context of the search). - * @param token Token that follows to next state. - * @param len Lenght of token. - * @param key Hash of next state. + * @param block Block found in the DHT. + * @param size Size of the block. + * @param ctx Context of the search. * * @return GNUNET_YES if should keep iterating, GNUNET_NO otherwise. */ -static int +static void regex_next_edge (const struct MeshRegexBlock *block, size_t size, struct MeshRegexSearchContext *ctx) @@ -1335,14 +1344,25 @@ regex_next_edge (const struct MeshRegexBlock *block, struct MeshRegexSearchInfo *info = ctx->info; struct GNUNET_DHT_GetHandle *get_h; - GNUNET_break (GNUNET_OK == - GNUNET_MESH_regex_block_iterate (block, size, - ®ex_edge_iterator, ctx)); + int result; + + /* Find the longest match for the current string position, + * among tokens in the given block */ + ctx->longest_match = 0; + result = GNUNET_MESH_regex_block_iterate (block, size, + ®ex_edge_iterator, ctx); + GNUNET_break (GNUNET_OK == result || SIZE_MAX == size); + + /* Did anything match? */ + if (0 == ctx->longest_match) + return; new_ctx = GNUNET_malloc (sizeof (struct MeshRegexSearchContext)); new_ctx->info = info; new_ctx->position = ctx->position + ctx->longest_match; GNUNET_array_append (info->contexts, info->n_contexts, new_ctx); + + /* Check whether we already have a DHT GET running for it */ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains(info->dht_get_handles, &ctx->hash)) { @@ -1351,8 +1371,9 @@ regex_next_edge (const struct MeshRegexBlock *block, &ctx->hash, ®ex_result_iterator, new_ctx); - return GNUNET_YES; // We are already looking for it + return; // We are already looking for it } + /* Start search in DHT */ get_h = GNUNET_DHT_get_start (dht_handle, /* handle */ @@ -1370,10 +1391,8 @@ regex_next_edge (const struct MeshRegexBlock *block, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)) { GNUNET_break (0); - return GNUNET_YES; + return; } - - return GNUNET_YES; } @@ -2293,9 +2312,9 @@ send_core_data_raw (void *cls, size_t size, void *buf) * @param t Tunnel on which this message is transmitted. */ static void -send_message (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - struct MeshTunnel *t) +send_prebuilt_message (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + struct MeshTunnel *t) { struct MeshTransmissionDescriptor *info; struct MeshPeerInfo *neighbor; @@ -2363,16 +2382,7 @@ send_message (const struct GNUNET_MessageHeader *message, } info->peer = neighbor; if (GNUNET_MESSAGE_TYPE_MESH_PATH_ACK == type) - { - /* - * TODO: in this case we only need the service to retransmit - * the message down the path. If we pass the real type to queue_add, - * queue_send will try to build the message from scratch. This can - * probably be done by some other way instead of deleteing the type - * info. - */ type = 0; - } queue_add (info, type, size, @@ -2470,12 +2480,39 @@ send_destroy_path (struct MeshTunnel *t, GNUNET_PEER_Id destination) { GNUNET_PEER_resolve (p->peers[i], &pi[i]); } - send_message (&msg->header, tree_get_first_hop (t->tree, destination), t); + send_prebuilt_message (&msg->header, tree_get_first_hop (t->tree, destination), t); } path_destroy (p); } +/** + * Sends a PATH ACK message in reponse to a received PATH_CREATE directed to us. + * + * @param t Tunnel which to confirm. + */ +static void +send_path_ack (struct MeshTunnel *t) +{ + struct MeshTransmissionDescriptor *info; + struct GNUNET_PeerIdentity id; + GNUNET_PEER_Id peer; + + peer = tree_get_predecessor (t->tree); + GNUNET_PEER_resolve (peer, &id); + info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); + info->origin = &t->id; + info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &id.hashPubKey); + GNUNET_assert (NULL != info->peer); + + queue_add (info, + GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, + sizeof (struct GNUNET_MESH_PathACK), + info->peer, + t); +} + + /** * Try to establish a new connection to this peer. * Use the best path for the given tunnel. @@ -3083,7 +3120,10 @@ tunnel_delete_client (struct MeshTunnel *t, const struct MeshClient *c) /** - * Iterator to free MeshTunnelChildInfo of tunnel children. + * @brief Iterator to destroy MeshTunnelChildInfo of tunnel children. + * + * Destroys queue elements of all waiting transmissions and frees all memory + * used by the struct and its elements. * * @param cls Closure (tunnel info). * @param key Hash of GNUNET_PEER_Id (unused). @@ -3105,7 +3145,7 @@ tunnel_destroy_child (void *cls, { i = (cinfo->send_buffer_start + c) % t->fwd_queue_max; if (NULL != cinfo->send_buffer[i]) - queue_destroy(cinfo->send_buffer[i], GNUNET_YES); + queue_destroy (cinfo->send_buffer[i], GNUNET_YES); else GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u %u\n", c, cinfo->send_buffer_n); @@ -3296,7 +3336,7 @@ tunnel_notify_connection_broken (struct MeshTunnel *t, GNUNET_PEER_Id p1, msg.peer1 = my_full_id; GNUNET_PEER_resolve (pid, &msg.peer2); GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &neighbor); - send_message (&msg.header, &neighbor, t); + send_prebuilt_message (&msg.header, &neighbor, t); } } return pid; @@ -3315,6 +3355,7 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) struct MeshData *mdata = cls; struct MeshTransmissionDescriptor *info; struct GNUNET_PeerIdentity neighbor; + struct GNUNET_MessageHeader *msg; info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); @@ -3326,8 +3367,9 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) GNUNET_i2s (&neighbor)); info->peer = peer_info_get (&neighbor); GNUNET_assert (NULL != info->peer); + msg = (struct GNUNET_MessageHeader *) mdata->data; queue_add(info, - GNUNET_MESSAGE_TYPE_MESH_MULTICAST, + ntohs (msg->type), info->mesh_data->data_len, info->peer, mdata->t); @@ -3340,14 +3382,10 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) * * @param t Tunnel in which to send the data. * @param msg Message to be sent. - * @param internal DEPRECATED Has the service generated this message? - * - * FIXME remove internal if no use comes up */ static void tunnel_send_multicast (struct MeshTunnel *t, - const struct GNUNET_MessageHeader *msg, - int internal) + const struct GNUNET_MessageHeader *msg) { struct MeshData *mdata; @@ -3561,6 +3599,7 @@ tunnel_get_children_fwd_ack (struct MeshTunnel *t) ctx.t = t; ctx.max_child_ack = 0; ctx.nchildren = 0; + ctx.init = GNUNET_NO; tree_iterate_children (t->tree, tunnel_get_child_fwd_ack, &ctx); if (0 == ctx.nchildren) @@ -3662,7 +3701,7 @@ tunnel_get_fwd_ack (struct MeshTunnel *t) count = t->fwd_pid - t->skip; buffer_free = t->fwd_queue_max - t->fwd_queue_n; - ack = count + buffer_free; // Might overflow 32 bits, it's ok! + ack = count; child_ack = tunnel_get_children_fwd_ack (t); client_ack = tunnel_get_clients_fwd_ack (t); if (-1LL == child_ack) @@ -3673,16 +3712,16 @@ tunnel_get_fwd_ack (struct MeshTunnel *t) } if (-1LL == client_ack) { - client_ack = ack; + client_ack = ack + buffer_free; // Might overflow 32 bits, it's ok! } if (GNUNET_YES == t->speed_min) { - ack = GMC_min_pid ((uint32_t) child_ack, ack); + ack = GMC_min_pid ((uint32_t) child_ack, ack) + buffer_free; // Might overflow 32 bits, it's ok!; ack = GMC_min_pid ((uint32_t) client_ack, ack); } else { - ack = GMC_max_pid ((uint32_t) child_ack, ack); + ack = GMC_max_pid ((uint32_t) child_ack, ack) + buffer_free; // Might overflow 32 bits, it's ok!; ack = GMC_max_pid ((uint32_t) client_ack, ack); } if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ack, t->fwd_pid)) @@ -3734,7 +3773,7 @@ send_ack (struct MeshTunnel *t, struct GNUNET_PeerIdentity *peer, uint32_t ack) msg.pid = htonl (ack); msg.tid = htonl (t->id.tid); - send_message (&msg.header, peer, t); + send_prebuilt_message (&msg.header, peer, t); } @@ -3808,7 +3847,7 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) } /* Check if we need no retransmit the ACK */ - if (t->fwd_queue_max > t->fwd_queue_n * 2 && + if (t->fwd_queue_max > t->fwd_queue_n * 4 && GMC_is_pid_bigger(t->last_fwd_ack, t->fwd_pid)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer free\n"); @@ -4049,7 +4088,7 @@ tunnel_send_destroy (struct MeshTunnel *t) msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY); GNUNET_PEER_resolve (t->id.oid, &msg.oid); msg.tid = htonl (t->id.tid); - tunnel_send_multicast (t, &msg.header, GNUNET_NO); + tunnel_send_multicast (t, &msg.header); } @@ -4125,16 +4164,21 @@ tunnel_destroy (struct MeshTunnel *t) GNUNET_CRYPTO_hash (&t->id, sizeof (struct MESH_TunnelID), &hash); if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } - GNUNET_CRYPTO_hash (&t->local_tid, sizeof (MESH_TunnelNumber), &hash); - if (NULL != c && - GNUNET_YES != - GNUNET_CONTAINER_multihashmap_remove (c->own_tunnels, &hash, t)) + if (NULL != c) { - r = GNUNET_SYSERR; + GNUNET_CRYPTO_hash (&t->local_tid, sizeof (MESH_TunnelNumber), &hash); + if (GNUNET_YES != + GNUNET_CONTAINER_multihashmap_remove (c->own_tunnels, &hash, t)) + { + GNUNET_break (0); + r = GNUNET_SYSERR; + } } + GNUNET_CRYPTO_hash (&t->local_tid_dest, sizeof (MESH_TunnelNumber), &hash); for (i = 0; i < t->nclients; i++) { @@ -4142,6 +4186,7 @@ tunnel_destroy (struct MeshTunnel *t) if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (c->incoming_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } } @@ -4151,18 +4196,22 @@ tunnel_destroy (struct MeshTunnel *t) if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (c->ignore_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } } + if (t->nclients > 0) { if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (incoming_tunnels, &hash, t)) { + GNUNET_break (0); r = GNUNET_SYSERR; } GNUNET_free (t->clients); } + if (NULL != t->peers) { GNUNET_CONTAINER_multihashmap_iterate (t->peers, &peer_info_delete_tunnel, @@ -4293,14 +4342,13 @@ tunnel_delete_peer (struct MeshTunnel *t, GNUNET_PEER_Id peer) * @param key the hash of the local tunnel id (used to access the hashmap) * @param value the value stored at the key (tunnel to destroy) * - * @return GNUNET_OK on success + * @return GNUNET_OK, keep iterating. */ static int tunnel_destroy_iterator (void *cls, const struct GNUNET_HashCode * key, void *value) { struct MeshTunnel *t = value; struct MeshClient *c = cls; - int r; send_client_tunnel_disconnect(t, c); if (c != t->owner) @@ -4312,8 +4360,10 @@ tunnel_destroy_iterator (void *cls, const struct GNUNET_HashCode * key, void *va return GNUNET_OK; } tunnel_send_destroy(t); - r = tunnel_destroy (t); - return r; + t->owner = NULL; + t->destroy = GNUNET_YES; + + return GNUNET_OK; } @@ -4327,10 +4377,15 @@ static void tunnel_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct MeshTunnel *t = cls; + struct GNUNET_PeerIdentity id; + t->timeout_task = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - t->timeout_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_PEER_resolve(t->id.oid, &id); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Tunnel %s [%X] timed out. Destroying.\n", + GNUNET_i2s(&id), t->id.tid); tunnel_destroy (t); } @@ -4397,6 +4452,7 @@ send_core_path_create (void *cls, size_t size, void *buf) if (GNUNET_YES == t->nobuffer) opt |= MESH_TUNNEL_OPT_NOBUFFER; msg->opt = htonl(opt); + msg->reserved = 0; peer_ptr = (struct GNUNET_PeerIdentity *) &msg[1]; for (i = 0; i < p->length; i++) @@ -4518,24 +4574,32 @@ queue_destroy (struct MeshPeerQueue *queue, int clear_cls) { struct MeshTransmissionDescriptor *dd; struct MeshPathInfo *path_info; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity id; + unsigned int i; + unsigned int max; if (GNUNET_YES == clear_cls) { switch (queue->type) { - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type payload\n"); + case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " cancelling TUNNEL_DESTROY\n"); + /* fall through */ + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " type prebuilt (payload, tunnel destroy)\n"); dd = queue->cls; data_descriptor_decrement_rc (dd->mesh_data); break; - case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type create path\n"); path_info = queue->cls; path_destroy (path_info->path); break; - default: + default: GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " type %s unknown!\n", @@ -4546,6 +4610,36 @@ queue_destroy (struct MeshPeerQueue *queue, int clear_cls) GNUNET_CONTAINER_DLL_remove (queue->peer->queue_head, queue->peer->queue_tail, queue); + + /* Delete from child_fc in the appropiate tunnel */ + max = queue->tunnel->fwd_queue_max; + GNUNET_PEER_resolve (queue->peer->id, &id); + cinfo = tunnel_get_neighbor_fc (queue->tunnel, &id); + for (i = 0; i < cinfo->send_buffer_n; i++) + { + unsigned int i2; + i2 = (cinfo->send_buffer_start + i) % max; + if (cinfo->send_buffer[i2] == queue) + { + /* Found corresponding entry in the send_buffer. Move all others back. */ + unsigned int j; + unsigned int j2; + unsigned int j3; + + for (j = i, j2 = 0, j3 = 0; j < cinfo->send_buffer_n - 1; j++) + { + j2 = (cinfo->send_buffer_start + j) % max; + j3 = (cinfo->send_buffer_start + j + 1) % max; + cinfo->send_buffer[j2] = cinfo->send_buffer[j3]; + } + + cinfo->send_buffer[j3] = NULL; + + cinfo->send_buffer_n--; + } + } + //queue-> + GNUNET_free (queue); } @@ -4656,16 +4750,16 @@ queue_send (void *cls, size_t size, void *buf) size_t data_size; peer->core_transmit = NULL; - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* Queue send\n"); - queue = queue_get_next(peer); + queue = queue_get_next (peer); - /* Queue has no internal mesh traffic not sendable payload */ + /* Queue has no internal mesh traffic nor sendable payload */ if (NULL == queue) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not ready, return\n"); if (NULL == peer->queue_head) - GNUNET_break(0); // Should've been canceled + GNUNET_break (0); // Should've been canceled return 0; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n"); @@ -4674,19 +4768,17 @@ queue_send (void *cls, size_t size, void *buf) /* Check if buffer size is enough for the message */ if (queue->size > size) { - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not enough room, reissue\n"); peer->core_transmit = - GNUNET_CORE_notify_transmit_ready(core_handle, - 0, - 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &dst_id, - queue->size, - &queue_send, - peer); + GNUNET_CORE_notify_transmit_ready (core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &dst_id, + queue->size, + &queue_send, + peer); return 0; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* size ok\n"); @@ -4726,7 +4818,7 @@ queue_send (void *cls, size_t size, void *buf) tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST); break; case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - tunnel_send_bck_ack(t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); break; default: break; @@ -4753,11 +4845,15 @@ queue_send (void *cls, size_t size, void *buf) break; case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path create\n"); - data_size = send_core_path_create(queue->cls, size, buf); + data_size = send_core_path_create (queue->cls, size, buf); break; case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path ack\n"); - data_size = send_core_path_ack(queue->cls, size, buf); + data_size = send_core_path_ack (queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path keepalive\n"); + data_size = send_core_data_multicast (queue->cls, size, buf); break; default: GNUNET_break (0); @@ -4771,21 +4867,27 @@ queue_send (void *cls, size_t size, void *buf) case GNUNET_MESSAGE_TYPE_MESH_UNICAST: case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - cinfo = tunnel_get_neighbor_fc(t, &dst_id); + cinfo = tunnel_get_neighbor_fc (t, &dst_id); if (cinfo->send_buffer[cinfo->send_buffer_start] != queue) { - GNUNET_break(0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "at pos %u (%p) != %p\n", cinfo->send_buffer_start, cinfo->send_buffer[cinfo->send_buffer_start], queue); } - GNUNET_break(cinfo->send_buffer_n > 0); - cinfo->send_buffer[cinfo->send_buffer_start] = NULL; - cinfo->send_buffer_n--; - cinfo->send_buffer_start++; - cinfo->send_buffer_start %= t->fwd_queue_max; + if (cinfo->send_buffer_n > 0) + { + cinfo->send_buffer[cinfo->send_buffer_start] = NULL; + cinfo->send_buffer_n--; + cinfo->send_buffer_start++; + cinfo->send_buffer_start %= t->fwd_queue_max; + } + else + { + GNUNET_break (0); + } break; default: break; @@ -4831,9 +4933,17 @@ queue_send (void *cls, size_t size, void *buf) /** - * Queue and pass message to core when possible. + * @brief Queue and pass message to core when possible. + * + * If type is payload (UNICAST, TO_ORIGIN, MULTICAST) checks for queue status + * and accounts for it. In case the queue is full, the message is dropped and + * a break issued. + * + * Otherwise, message is treated as internal and allowed to go regardless of + * queue status. * - * @param cls Closure (type dependant). + * @param cls Closure (@c type dependant). It will be used by queue_send to + * build the message to be sent if not already prebuilt. * @param type Type of the message, 0 for a raw message. * @param size Size of the message. * @param dst Neighbor to send message to. @@ -4886,14 +4996,14 @@ queue_add (void *cls, uint16_t type, size_t size, if (NULL == dst->core_transmit) { dst->core_transmit = - GNUNET_CORE_notify_transmit_ready(core_handle, - 0, - 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &id, - size, - &queue_send, - dst); + GNUNET_CORE_notify_transmit_ready (core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + size, + &queue_send, + dst); } if (NULL == n) // Is this internal mesh traffic? return; @@ -4904,13 +5014,15 @@ queue_add (void *cls, uint16_t type, size_t size, if (NULL != cinfo->send_buffer[i]) { GNUNET_break (cinfo->send_buffer_n == t->fwd_queue_max); // aka i == start - queue_destroy(cinfo->send_buffer[cinfo->send_buffer_start], GNUNET_YES); + queue_destroy (cinfo->send_buffer[cinfo->send_buffer_start], GNUNET_YES); cinfo->send_buffer_start++; cinfo->send_buffer_start %= t->fwd_queue_max; - cinfo->send_buffer_n--; + } + else + { + cinfo->send_buffer_n++; } cinfo->send_buffer[i] = queue; - cinfo->send_buffer_n++; if (cinfo->send_buffer_n > t->fwd_queue_max) { GNUNET_break (0); @@ -5076,13 +5188,11 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, if (own_pos == size - 1) { /* It is for us! Send ack. */ - struct MeshTransmissionDescriptor *info; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " It's for us!\n"); peer_info_add_path_to_origin (orig_peer_info, path, GNUNET_NO); if (NULL == t->peers) { - /* New tunnel! Notify clients on data. */ + /* New tunnel! Notify clients on first payload message. */ t->peers = GNUNET_CONTAINER_multihashmap_create (4); } GNUNET_break (GNUNET_SYSERR != @@ -5091,15 +5201,7 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, peer_info_get (&my_full_id), GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE)); - info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); - info->origin = &t->id; - info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); - GNUNET_assert (NULL != info->peer); - queue_add(info, - GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, - sizeof (struct GNUNET_MESH_PathACK), - info->peer, - t); + send_path_ack (t); } else { @@ -5191,7 +5293,7 @@ handle_mesh_path_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Own position: %u\n", own_pos); if (own_pos < path->length - 1) - send_message (message, &pi[own_pos + 1], t); + send_prebuilt_message (message, &pi[own_pos + 1], t); else send_client_tunnel_disconnect(t, NULL); @@ -5409,7 +5511,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break_op (0); return GNUNET_OK; } - send_message (message, neighbor, t); + send_prebuilt_message (message, neighbor, t); GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO); return GNUNET_OK; } @@ -5494,7 +5596,7 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } GNUNET_STATISTICS_update (stats, "# multicast forwarded", 1, GNUNET_NO); - tunnel_send_multicast (t, message, GNUNET_NO); + tunnel_send_multicast (t, message); return GNUNET_OK; } @@ -5574,7 +5676,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id); - send_message (message, &id, t); + send_prebuilt_message (message, &id, t); GNUNET_STATISTICS_update (stats, "# to origin forwarded", 1, GNUNET_NO); return GNUNET_OK; @@ -5738,7 +5840,7 @@ handle_mesh_path_ack (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break (0); return GNUNET_OK; } - send_message (message, &id, t); + send_prebuilt_message (message, &id, t); return GNUNET_OK; } @@ -5782,7 +5884,7 @@ handle_mesh_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, tunnel_reset_timeout (t); GNUNET_STATISTICS_update (stats, "# keepalives forwarded", 1, GNUNET_NO); - tunnel_send_multicast (t, message, GNUNET_NO); + tunnel_send_multicast (t, message); return GNUNET_OK; } @@ -5902,11 +6004,11 @@ path_refresh (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE); msg->oid = my_full_id; msg->tid = htonl (t->id.tid); - tunnel_send_multicast (t, &msg->header, GNUNET_YES); + tunnel_send_multicast (t, &msg->header); t->path_refresh_task = GNUNET_SCHEDULER_add_delayed (refresh_path_time, &path_refresh, t); - return; + tunnel_reset_timeout(t); } @@ -6099,7 +6201,8 @@ dht_get_string_handler (void *cls, struct GNUNET_TIME_Absolute exp, const struct GNUNET_PeerIdentity *get_path, unsigned int get_path_length, const struct GNUNET_PeerIdentity *put_path, - unsigned int put_path_length, enum GNUNET_BLOCK_Type type, + unsigned int put_path_length, + enum GNUNET_BLOCK_Type type, size_t size, const void *data) { const struct MeshRegexBlock *block = data; @@ -7669,13 +7772,15 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) while (NULL != q) { n = q->next; - if (q->peer == pi) - { - /* try to reroute this traffic instead */ - queue_destroy(q, GNUNET_YES); - } + /* TODO try to reroute this traffic instead */ + queue_destroy(q, GNUNET_YES); q = n; } + if (NULL != pi->core_transmit) + { + GNUNET_CORE_notify_transmit_ready_cancel(pi->core_transmit); + pi->core_transmit = NULL; + } peer_info_remove_path (pi, pi->id, myid); if (myid == pi->id) { @@ -8047,7 +8152,6 @@ main (int argc, char *const *argv) GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Mesh for peer [%s] FWD ACKs %u, BCK ACKs %u\n", GNUNET_i2s(&my_full_id), debug_fwd_ack, debug_bck_ack); - return ret; }