X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmesh%2Fgnunet-service-mesh.c;h=dba4116cf4df036e27ea3c88b6a720c15fba64f5;hb=f4d040c0f0dd2fef3d73b1f4532c76219f760f75;hp=be12e7d17fcc5fba257840898769225adb071dd4;hpb=c5fc7ffdb33524f2758b1ec4566348eb886239c7;p=oweals%2Fgnunet.git diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index be12e7d17..dba4116cf 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -59,7 +59,7 @@ #define MESH_DEBUG_DHT GNUNET_NO #define MESH_DEBUG_CONNECTION GNUNET_NO -#define MESH_DEBUG_TIMING __LINUX__ && GNUNET_YES +#define MESH_DEBUG_TIMING __LINUX__ && GNUNET_NO #if MESH_DEBUG_CONNECTION #define DEBUG_CONN(...) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__) @@ -170,6 +170,22 @@ struct MeshPeerQueue }; +/** + * Struct to store regex information announced by clients. + */ +struct MeshRegexDescriptor +{ + /** + * Regular expression itself. + */ + char *regex; + + /** + * How many characters per edge can we squeeze? + */ + uint16_t compression; +}; + /** * Struct containing all info possibly needed to build a package when called * back by core. @@ -205,6 +221,11 @@ struct MeshPeerInfo */ struct GNUNET_TIME_Absolute last_contact; + /** + * Task handler for delayed connect task; + */ + GNUNET_SCHEDULER_TaskIdentifier connect_task; + /** * Number of attempts to reconnect so far */ @@ -373,6 +394,11 @@ struct MeshTunnel */ unsigned int bck_queue_max; + /** + * Task to poll peer in case of a stall. + */ + GNUNET_SCHEDULER_TaskIdentifier fc_poll_bck; + /** * Last time the tunnel was used */ @@ -499,7 +525,12 @@ struct MeshTunnelChildInfo /** * Last sent PID. */ - uint32_t pid; + uint32_t fwd_pid; + + /** + * Last received PID. + */ + uint32_t bck_pid; /** * Maximum PID allowed (FWD ACK received). @@ -527,6 +558,16 @@ struct MeshTunnelChildInfo * How many elements are already in the buffer. */ unsigned int send_buffer_n; + + /** + * Tunnel this info is about + */ + struct MeshTunnel *t; + + /** + * Task to poll peer in case of a stall. + */ + GNUNET_SCHEDULER_TaskIdentifier fc_poll; }; @@ -666,7 +707,7 @@ struct MeshClient /** * Regular expressions describing the services offered by this client. */ - char **regexes; // FIXME add timeout? API to remove a regex? + struct MeshRegexDescriptor *regexes; // FIXME regex add timeout? API to remove a regex? /** * Number of regular expressions in regexes. @@ -954,8 +995,6 @@ unsigned int next_client_id; /*********************** DECLARATIONS **************************/ /******************************************************************************/ -/* FIXME move declarations here */ - /** * Function to process paths received for a new peer addition. The recorded * paths form the initial tunnel, which can be optimized later. @@ -1264,6 +1303,8 @@ regex_result_iterator (void *cls, } regex_next_edge(block, SIZE_MAX, ctx); + GNUNET_STATISTICS_update (stats, "# regex mesh blocks iterated", 1, GNUNET_NO); + return GNUNET_YES; } @@ -1289,6 +1330,8 @@ regex_edge_iterator (void *cls, char *current; size_t current_len; + GNUNET_STATISTICS_update (stats, "# regex edges iterated", 1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Start of regex edge iterator\n"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* descr : %s\n", info->description); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* posit : %u\n", ctx->position); @@ -1374,6 +1417,8 @@ regex_next_edge (const struct MeshRegexBlock *block, return; // We are already looking for it } + GNUNET_STATISTICS_update (stats, "# regex nodes traversed", 1, GNUNET_NO); + /* Start search in DHT */ get_h = GNUNET_DHT_get_start (dht_handle, /* handle */ @@ -1557,15 +1602,17 @@ regex_iterator (void *cls, * @param regex The regular expresion. */ static void -regex_put (const char *regex) +regex_put (const struct MeshRegexDescriptor *regex) { struct GNUNET_REGEX_Automaton *dfa; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "regex_put (%s) start\n", regex); - dfa = GNUNET_REGEX_construct_dfa (regex, strlen(regex)); + DEBUG_DHT (" regex_put (%s) start\n", regex->regex); + dfa = GNUNET_REGEX_construct_dfa (regex->regex, + strlen(regex->regex), + regex->compression); GNUNET_REGEX_iterate_all_edges (dfa, ®ex_iterator, NULL); GNUNET_REGEX_automaton_destroy (dfa); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "regex_put (%s) end\n", regex); + DEBUG_DHT (" regex_put (%s) end\n", regex); } @@ -1746,9 +1793,9 @@ announce_regex (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct MeshClient *c = cls; unsigned int i; + c->regex_announce_task = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) { - c->regex_announce_task = GNUNET_SCHEDULER_NO_TASK; return; } @@ -1756,10 +1803,11 @@ announce_regex (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) for (i = 0; i < c->n_regex; i++) { - regex_put (c->regexes[i]); + regex_put (&c->regexes[i]); } - c->regex_announce_task = - GNUNET_SCHEDULER_add_delayed (app_announce_time, &announce_regex, cls); + c->regex_announce_task = GNUNET_SCHEDULER_add_delayed (app_announce_time, + &announce_regex, + cls); DEBUG_DHT ("Finished PUT for regex\n"); return; @@ -2085,6 +2133,11 @@ send_subscribed_clients (const struct GNUNET_MessageHeader *msg, tmsg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_TUNNEL_CREATE); GNUNET_PEER_resolve (t->id.oid, &tmsg.peer); tmsg.tunnel_id = htonl (t->local_tid_dest); + tmsg.opt = 0; + if (GNUNET_YES == t->speed_min) + tmsg.opt |= MESH_TUNNEL_OPT_SPEED_MIN; + if (GNUNET_YES == t->nobuffer) + tmsg.opt |= MESH_TUNNEL_OPT_NOBUFFER; GNUNET_SERVER_notification_context_unicast (nc, c->handle, &tmsg.header, GNUNET_NO); tunnel_add_client (t, c); @@ -2330,12 +2383,19 @@ send_prebuilt_message (const struct GNUNET_MessageHeader *message, info->mesh_data->data = GNUNET_malloc (size); memcpy (info->mesh_data->data, message, size); type = ntohs(message->type); - if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) + switch (type) { struct GNUNET_MESH_Unicast *m; + struct GNUNET_MESH_ToOrigin *to; - m = (struct GNUNET_MESH_Unicast *) info->mesh_data->data; - m->ttl = htonl (ntohl (m->ttl) - 1); + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + m = (struct GNUNET_MESH_Unicast *) info->mesh_data->data; + m->ttl = htonl (ntohl (m->ttl) - 1); + break; + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + to = (struct GNUNET_MESH_ToOrigin *) info->mesh_data->data; + t->bck_pid++; + to->pid = htonl(t->bck_pid); } info->mesh_data->data_len = size; info->mesh_data->reference_counter = 1; @@ -2598,6 +2658,8 @@ peer_info_connect_task (void *cls, { struct MeshPathInfo *path_info = cls; + path_info->peer->connect_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) { GNUNET_free (cls); @@ -2645,6 +2707,10 @@ peer_info_destroy (struct MeshPeerInfo *pi) path_destroy (p); p = nextp; } + if (GNUNET_SCHEDULER_NO_TASK != pi->connect_task) + { + GNUNET_free (GNUNET_SCHEDULER_cancel (pi->connect_task)); + } GNUNET_free (pi); return GNUNET_OK; } @@ -2850,6 +2916,42 @@ peer_info_add_path_to_origin (struct MeshPeerInfo *peer_info, } +/** + * Function called if the connection to the peer has been stalled for a while, + * possibly due to a missed ACK. Poll the peer about its ACK status. + * + * @param cls Closure (info about regex search). + * @param tc TaskContext. + */ +static void +tunnel_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct MeshTunnelChildInfo *cinfo = cls; + struct GNUNET_MESH_Poll msg; + struct GNUNET_PeerIdentity id; + struct MeshTunnel *t; + + return; // FIXME fc activate + cinfo->fc_poll = GNUNET_SCHEDULER_NO_TASK; + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + { + return; + } + + t = cinfo->t; + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL); + msg.header.size = htons (sizeof (msg)); + msg.tid = htonl (t->id.tid); + GNUNET_PEER_resolve (t->id.oid, &msg.oid); + msg.last_ack = htonl (cinfo->fwd_ack); + + GNUNET_PEER_resolve (tree_get_predecessor(cinfo->t->tree), &id); + send_prebuilt_message (&msg.header, &id, cinfo->t); + cinfo->fc_poll = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_SECONDS, + &tunnel_poll, cinfo); +} + + /** * Build a PeerPath from the paths returned from the DHT, reversing the paths * to obtain a local peer -> destination path and interning the peer ids. @@ -3138,14 +3240,17 @@ tunnel_destroy_child (void *cls, { struct MeshTunnelChildInfo *cinfo = value; struct MeshTunnel *t = cls; + struct MeshPeerQueue *q; unsigned int c; unsigned int i; for (c = 0; c < cinfo->send_buffer_n; c++) { i = (cinfo->send_buffer_start + c) % t->fwd_queue_max; - if (NULL != cinfo->send_buffer[i]) - queue_destroy (cinfo->send_buffer[i], GNUNET_YES); + q = cinfo->send_buffer[i]; + cinfo->send_buffer[i] = NULL; + if (NULL != q) + queue_destroy (q, GNUNET_YES); else GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%u %u\n", c, cinfo->send_buffer_n); @@ -3185,7 +3290,8 @@ tunnel_notify_client_peer_disconnected (void *cls, GNUNET_PEER_Id peer_id) path_info = GNUNET_malloc (sizeof (struct MeshPathInfo)); path_info->peer = peer; path_info->t = t; - GNUNET_SCHEDULER_add_now (&peer_info_connect_task, path_info); + peer->connect_task = GNUNET_SCHEDULER_add_now (&peer_info_connect_task, + path_info); } @@ -3488,10 +3594,14 @@ tunnel_add_skip (void *cls, * @return Neighbor's Flow Control info. */ static struct MeshTunnelChildInfo * -tunnel_get_neighbor_fc (const struct MeshTunnel *t, +tunnel_get_neighbor_fc (struct MeshTunnel *t, const struct GNUNET_PeerIdentity *peer) { struct MeshTunnelChildInfo *cinfo; + + if (NULL == t->children_fc) + return NULL; + cinfo = GNUNET_CONTAINER_multihashmap_get (t->children_fc, &peer->hashPubKey); if (NULL == cinfo) @@ -3501,10 +3611,12 @@ tunnel_get_neighbor_fc (const struct MeshTunnel *t, cinfo = GNUNET_malloc (sizeof (struct MeshTunnelChildInfo)); cinfo->id = GNUNET_PEER_intern (peer); cinfo->skip = t->fwd_pid; + cinfo->t = t; delta = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE; cinfo->fwd_ack = t->fwd_pid + delta; cinfo->bck_ack = delta; + cinfo->bck_pid = -1; cinfo->send_buffer = GNUNET_malloc (sizeof(struct MeshPeerQueue *) * t->fwd_queue_max); @@ -3701,9 +3813,23 @@ tunnel_get_fwd_ack (struct MeshTunnel *t) count = t->fwd_pid - t->skip; buffer_free = t->fwd_queue_max - t->fwd_queue_n; - ack = count + buffer_free; child_ack = tunnel_get_children_fwd_ack (t); client_ack = tunnel_get_clients_fwd_ack (t); + if (GNUNET_YES == t->nobuffer) + { + ack = count; + if (-1LL == child_ack) + child_ack = client_ack; + if (-1LL == child_ack) + { + GNUNET_break (0); + client_ack = child_ack = ack; + } + } + else + { + ack = count + buffer_free; // Overflow? OK! + } if (-1LL == child_ack) { // Node has no children, child_ack AND core buffer are irrelevant. @@ -3712,20 +3838,18 @@ tunnel_get_fwd_ack (struct MeshTunnel *t) } if (-1LL == client_ack) { - client_ack = ack; // Might overflow 32 bits, it's ok! + client_ack = ack; } if (GNUNET_YES == t->speed_min) { - ack = GMC_min_pid ((uint32_t) child_ack, ack); // Might overflow 32 bits, it's ok!; + ack = GMC_min_pid ((uint32_t) child_ack, ack); ack = GMC_min_pid ((uint32_t) client_ack, ack); } else { - ack = GMC_max_pid ((uint32_t) child_ack, ack); // Might overflow 32 bits, it's ok!; + ack = GMC_max_pid ((uint32_t) child_ack, ack); ack = GMC_max_pid ((uint32_t) client_ack, ack); } - if (GNUNET_YES == t->nobuffer && GMC_is_pid_bigger(ack, t->fwd_pid)) - ack = t->fwd_pid + 1; // Might overflow 32 bits, it's ok! GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "c %u, bf %u, ch %lld, cl %lld, ACK: %u\n", count, buffer_free, child_ack, client_ack, ack); @@ -3796,7 +3920,10 @@ tunnel_send_client_fwd_ack (struct MeshTunnel *t) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ack); if (t->last_fwd_ack == ack) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " same as last, not sending!\n"); return; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending!\n"); t->last_fwd_ack = ack; @@ -3895,11 +4022,18 @@ tunnel_send_child_bck_ack (void *cls, GNUNET_PEER_resolve (id, &peer); cinfo = tunnel_get_neighbor_fc (t, &peer); - if (cinfo->bck_ack != cinfo->pid && - GNUNET_NO == GMC_is_pid_bigger (cinfo->bck_ack, cinfo->pid)) + if (cinfo->bck_ack != cinfo->bck_pid && + GNUNET_NO == GMC_is_pid_bigger (cinfo->bck_ack, cinfo->bck_pid)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " Not sending ACK, not needed\n"); return; + } - cinfo->bck_ack++; + cinfo->bck_ack = t->bck_queue_max - t->bck_queue_n + cinfo->bck_pid; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " Sending BCK ACK %u\n", + cinfo->bck_ack); send_ack (t, &peer, cinfo->bck_ack); } @@ -3908,6 +4042,14 @@ tunnel_send_child_bck_ack (void *cls, * @brief Send BCK ACKs to clients to allow them more to_origin traffic * * Iterates over all clients and sends BCK ACKs to the ones that need it. + * + * FIXME fc: what happens if we have 2 clients but q_size is 1? + * - implement a size 1 buffer in each client_fc AND children_fc + * to hold at least 1 message per "child". + * problem: violates no buffer policy + * - ack 0 and make "children" poll for transmission slots + * problem: big overhead, extra latency even in low traffic + * settings * * @param t Tunnel on which to send the BCK ACKs. */ @@ -3919,7 +4061,7 @@ tunnel_send_clients_bck_ack (struct MeshTunnel *t) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Sending BCK ACK to clients\n"); - tunnel_delta = t->bck_ack - t->bck_pid; + tunnel_delta = t->bck_queue_max - t->bck_queue_n; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " tunnel delta: %u\n", tunnel_delta); /* Find client whom to allow to send to origin (with lowest buffer space) */ @@ -3946,6 +4088,12 @@ tunnel_send_clients_bck_ack (struct MeshTunnel *t) send_local_ack (t, t->clients[i], ack); clinfo->bck_ack = ack; } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " not sending ack to client %u (td %u, d %u)\n", + t->clients[i]->id, tunnel_delta, delta); + } } } @@ -3980,6 +4128,7 @@ tunnel_send_bck_ack (struct MeshTunnel *t, uint16_t type) break; case GNUNET_MESSAGE_TYPE_MESH_ACK: case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK: + case GNUNET_MESSAGE_TYPE_MESH_POLL: break; default: GNUNET_break (0); @@ -4210,6 +4359,7 @@ tunnel_destroy (struct MeshTunnel *t) r = GNUNET_SYSERR; } GNUNET_free (t->clients); + GNUNET_free (t->clients_fc); } if (NULL != t->peers) @@ -4223,9 +4373,9 @@ tunnel_destroy (struct MeshTunnel *t) &tunnel_destroy_child, t); GNUNET_CONTAINER_multihashmap_destroy (t->children_fc); + t->children_fc = NULL; tree_iterate_children (t->tree, &tunnel_cancel_queues, t); - tree_destroy (t->tree); if (NULL != t->regex_ctx) @@ -4239,7 +4389,6 @@ tunnel_destroy (struct MeshTunnel *t) n_tunnels--; GNUNET_STATISTICS_update (stats, "# tunnels", -1, GNUNET_NO); - GNUNET_assert (0 <= n_tunnels); GNUNET_free (t); return r; } @@ -4279,7 +4428,7 @@ tunnel_new (GNUNET_PEER_Id owner, t->bck_ack = INITIAL_WINDOW_SIZE - 1; t->last_fwd_ack = INITIAL_WINDOW_SIZE - 1; t->local_tid = local; - t->children_fc = GNUNET_CONTAINER_multihashmap_create (8); + t->children_fc = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO); n_tunnels++; GNUNET_STATISTICS_update (stats, "# tunnels", 1, GNUNET_NO); @@ -4386,6 +4535,7 @@ tunnel_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Tunnel %s [%X] timed out. Destroying.\n", GNUNET_i2s(&id), t->id.tid); + send_clients_tunnel_destroy (t); tunnel_destroy (t); } @@ -4585,12 +4735,19 @@ queue_destroy (struct MeshPeerQueue *queue, int clear_cls) { case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " cancelling TUNNEL_DESTROY\n"); + GNUNET_assert (GNUNET_YES == queue->tunnel->destroy); + /* FIXME: don't cancel, send and destroy tunnel in queue_send */ /* fall through */ case GNUNET_MESSAGE_TYPE_MESH_UNICAST: case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + case GNUNET_MESSAGE_TYPE_MESH_ACK: + case GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " prebuilt message\n"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " type prebuilt (payload, tunnel destroy)\n"); + " type %s\n", + GNUNET_MESH_DEBUG_M2S(queue->type)); dd = queue->cls; data_descriptor_decrement_rc (dd->mesh_data); break; @@ -4615,30 +4772,31 @@ queue_destroy (struct MeshPeerQueue *queue, int clear_cls) max = queue->tunnel->fwd_queue_max; GNUNET_PEER_resolve (queue->peer->id, &id); cinfo = tunnel_get_neighbor_fc (queue->tunnel, &id); - for (i = 0; i < cinfo->send_buffer_n; i++) + if (NULL != cinfo) { - unsigned int i2; - i2 = (cinfo->send_buffer_start + i) % max; - if (cinfo->send_buffer[i2] == queue) + for (i = 0; i < cinfo->send_buffer_n; i++) { - /* Found corresponding entry in the send_buffer. Move all others back. */ - unsigned int j; - unsigned int j2; - unsigned int j3; - - for (j = i, j2 = 0, j3 = 0; j < cinfo->send_buffer_n - 1; j++) + unsigned int i2; + i2 = (cinfo->send_buffer_start + i) % max; + if (cinfo->send_buffer[i2] == queue) { - j2 = (cinfo->send_buffer_start + j) % max; - j3 = (cinfo->send_buffer_start + j + 1) % max; - cinfo->send_buffer[j2] = cinfo->send_buffer[j3]; - } + /* Found corresponding entry in the send_buffer. Move all others back. */ + unsigned int j; + unsigned int j2; + unsigned int j3; - cinfo->send_buffer[j3] = NULL; + for (j = i, j2 = 0, j3 = 0; j < cinfo->send_buffer_n - 1; j++) + { + j2 = (cinfo->send_buffer_start + j) % max; + j3 = (cinfo->send_buffer_start + j + 1) % max; + cinfo->send_buffer[j2] = cinfo->send_buffer[j3]; + } - cinfo->send_buffer_n--; + cinfo->send_buffer[j3] = NULL; + cinfo->send_buffer_n--; + } } } - //queue-> GNUNET_free (queue); } @@ -4750,6 +4908,7 @@ queue_send (void *cls, size_t size, void *buf) size_t data_size; peer->core_transmit = NULL; + cinfo = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* Queue send\n"); queue = queue_get_next (peer); @@ -4765,6 +4924,9 @@ queue_send (void *cls, size_t size, void *buf) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n"); GNUNET_PEER_resolve (peer->id, &dst_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* towards %s\n", + GNUNET_i2s(&dst_id)); /* Check if buffer size is enough for the message */ if (queue->size > size) { @@ -4858,7 +5020,7 @@ queue_send (void *cls, size_t size, void *buf) break; default: GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "********* type unknown: %u\n", queue->type); data_size = 0; @@ -4897,8 +5059,9 @@ queue_send (void *cls, size_t size, void *buf) /* Free queue, but cls was freed by send_core_* */ queue_destroy (queue, GNUNET_NO); - if (GNUNET_YES == t->destroy && 0 == t->fwd_queue_n) + if (GNUNET_YES == t->destroy) { + // FIXME fc tunnel destroy all pending traffic? wait for it? GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* destroying tunnel!\n"); tunnel_destroy (t); } @@ -4924,9 +5087,15 @@ queue_send (void *cls, size_t size, void *buf) else { if (NULL != peer->queue_head) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "********* %s stalled\n", GNUNET_i2s(&my_full_id)); + if (NULL == cinfo) + cinfo = tunnel_get_neighbor_fc (t, &dst_id); + cinfo->fc_poll = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_SECONDS, + &tunnel_poll, cinfo); + } } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* return %d\n", data_size); return data_size; @@ -4973,13 +5142,18 @@ queue_add (void *cls, uint16_t type, size_t size, n = &t->bck_queue_n; max = &t->bck_queue_max; } - if (NULL != n) { + if (NULL != n) + { if (*n >= *max) { - if (NULL == t->owner) - GNUNET_break_op(0); // TODO: kill connection? - else - GNUNET_break(0); + struct MeshTransmissionDescriptor *td = cls; + struct GNUNET_MESH_ToOrigin *to; + + to = td->mesh_data->data; + GNUNET_break(0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "bck pid %u, bck ack %u, msg pid %u\n", + t->bck_pid, t->bck_ack, ntohl(to->pid)); GNUNET_STATISTICS_update(stats, "# messages dropped (buffer full)", 1, GNUNET_NO); return; // Drop message @@ -5111,8 +5285,11 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, opt = ntohl (msg->opt); t->speed_min = (0 != (opt & MESH_TUNNEL_OPT_SPEED_MIN)) ? GNUNET_YES : GNUNET_NO; - t->nobuffer = (0 != (opt & MESH_TUNNEL_OPT_NOBUFFER)) ? - GNUNET_YES : GNUNET_NO; + if (0 != (opt & MESH_TUNNEL_OPT_NOBUFFER)) + { + t->nobuffer = GNUNET_YES; + t->last_fwd_ack = t->fwd_pid + 1; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " speed_min: %d, nobuffer:%d\n", t->speed_min, t->nobuffer); @@ -5194,7 +5371,7 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, if (NULL == t->peers) { /* New tunnel! Notify clients on first payload message. */ - t->peers = GNUNET_CONTAINER_multihashmap_create (4); + t->peers = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); } GNUNET_break (GNUNET_SYSERR != GNUNET_CONTAINER_multihashmap_put (t->peers, @@ -5501,7 +5678,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, neighbor = tree_get_first_hop (t->tree, dest_id); cinfo = tunnel_get_neighbor_fc (t, neighbor); - cinfo->pid = pid; + cinfo->fwd_pid = pid; GNUNET_CONTAINER_multihashmap_iterate (t->children_fc, &tunnel_add_skip, &neighbor); @@ -5509,6 +5686,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_YES == GMC_is_pid_bigger (pid, cinfo->fwd_ack)) { GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, " %u > %u\n", pid, cinfo->fwd_ack); GNUNET_break_op (0); return GNUNET_OK; } @@ -5624,8 +5802,9 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, struct GNUNET_PeerIdentity id; struct MeshPeerInfo *peer_info; struct MeshTunnel *t; + struct MeshTunnelChildInfo *cinfo; size_t size; - + uint32_t pid; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got a ToOrigin packet from %s\n", GNUNET_i2s (peer)); @@ -5640,15 +5819,40 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n", GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type))); t = tunnel_get (&msg->oid, ntohl (msg->tid)); + pid = ntohl (msg->pid); if (NULL == t) { /* TODO notify that we dont know this tunnel (whom)? */ GNUNET_STATISTICS_update (stats, "# data on unknown tunnel", 1, GNUNET_NO); GNUNET_break_op (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received to_origin with PID %u on unknown tunnel\n", + pid); return GNUNET_OK; } + cinfo = tunnel_get_neighbor_fc(t, peer); + if (NULL == cinfo) + { + GNUNET_break (0); + return GNUNET_OK; + } + + if (cinfo->bck_pid == pid) + { + /* already seen this packet, drop */ + GNUNET_STATISTICS_update (stats, "# duplicate PID drops BCK", 1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " Already seen pid %u, DROPPING!\n", pid); + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); + return GNUNET_OK; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " pid %u not seen yet, forwarding\n", pid); + cinfo->bck_pid = pid; + if (NULL != t->owner) { char cbuf[size]; @@ -5660,6 +5864,8 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, memcpy (cbuf, message, size); copy = (struct GNUNET_MESH_ToOrigin *) cbuf; copy->tid = htonl (t->local_tid); + t->bck_pid++; + copy->pid = htonl (t->bck_pid); GNUNET_STATISTICS_update (stats, "# to origin received", 1, GNUNET_NO); GNUNET_SERVER_notification_context_unicast (nc, t->owner->handle, ©->header, GNUNET_NO); @@ -5720,6 +5926,7 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } ack = ntohl (msg->pid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack); /* Is this a forward or backward ACK? */ if (tree_get_predecessor(t->tree) != GNUNET_PEER_search(peer)) @@ -5744,6 +5951,62 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer, } +/** + * Core handler for mesh network traffic point-to-point ack polls. + * + * @param cls closure + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * @param atsi_count number of records in 'atsi' + * + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_mesh_poll (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + struct GNUNET_MESH_Poll *msg; + struct MeshTunnel *t; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got an POLL packet from %s!\n", + GNUNET_i2s (peer)); + + msg = (struct GNUNET_MESH_Poll *) message; + + t = tunnel_get (&msg->oid, ntohl (msg->tid)); + + if (NULL == t) + { + /* TODO notify that we dont know this tunnel (whom)? */ + GNUNET_STATISTICS_update (stats, "# poll on unknown tunnel", 1, GNUNET_NO); + GNUNET_break_op (0); + return GNUNET_OK; + } + + /* Is this a forward or backward ACK? */ + if (tree_get_predecessor(t->tree) != GNUNET_PEER_search(peer)) + { + struct MeshTunnelChildInfo *cinfo; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " from FWD\n"); + cinfo = tunnel_get_neighbor_fc (t, peer); + cinfo->bck_ack = cinfo->fwd_pid; // mark as ready to send + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_POLL); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " from BCK\n"); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_POLL); + } + + return GNUNET_OK; +} + + /** * Core handler for path ACKs * @@ -5908,6 +6171,8 @@ static struct GNUNET_CORE_MessageHandler core_handlers[] = { {&handle_mesh_data_to_orig, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN, 0}, {&handle_mesh_ack, GNUNET_MESSAGE_TYPE_MESH_ACK, sizeof (struct GNUNET_MESH_ACK)}, + {&handle_mesh_poll, GNUNET_MESSAGE_TYPE_MESH_POLL, + sizeof (struct GNUNET_MESH_Poll)}, {&handle_mesh_path_ack, GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, sizeof (struct GNUNET_MESH_PathACK)}, {NULL, 0, 0} @@ -6279,6 +6544,8 @@ handle_local_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " (SERVER DOWN)\n"); return; } + + return; c = clients; while (NULL != c) { @@ -6320,7 +6587,7 @@ handle_local_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) GNUNET_CONTAINER_multihashmap_destroy (c->types); for (i = 0; i < c->n_regex; i++) { - GNUNET_free (c->regexes[i]); + GNUNET_free (c->regexes[i].regex); } GNUNET_free_non_null (c->regexes); if (GNUNET_SCHEDULER_NO_TASK != c->regex_announce_task) @@ -6383,7 +6650,7 @@ handle_local_new_client (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_MESH_ApplicationType at; struct GNUNET_HashCode hc; - c->apps = GNUNET_CONTAINER_multihashmap_create (napps); + c->apps = GNUNET_CONTAINER_multihashmap_create (napps, GNUNET_NO); for (i = 0; i < napps; i++) { at = ntohl (a[i]); @@ -6407,7 +6674,7 @@ handle_local_new_client (void *cls, struct GNUNET_SERVER_Client *client, struct GNUNET_HashCode hc; t = (uint16_t *) & a[napps]; - c->types = GNUNET_CONTAINER_multihashmap_create (ntypes); + c->types = GNUNET_CONTAINER_multihashmap_create (ntypes, GNUNET_NO); for (i = 0; i < ntypes; i++) { u16 = ntohs (t[i]); @@ -6426,9 +6693,9 @@ handle_local_new_client (void *cls, struct GNUNET_SERVER_Client *client, " client has %u+%u subscriptions\n", napps, ntypes); GNUNET_CONTAINER_DLL_insert (clients, clients_tail, c); - c->own_tunnels = GNUNET_CONTAINER_multihashmap_create (32); - c->incoming_tunnels = GNUNET_CONTAINER_multihashmap_create (32); - c->ignore_tunnels = GNUNET_CONTAINER_multihashmap_create (32); + c->own_tunnels = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + c->incoming_tunnels = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + c->ignore_tunnels = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); GNUNET_SERVER_notification_context_add (nc, client); GNUNET_STATISTICS_update (stats, "# clients", 1, GNUNET_NO); @@ -6448,6 +6715,8 @@ static void handle_local_announce_regex (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { + struct GNUNET_MESH_RegexAnnounce *msg; + struct MeshRegexDescriptor rd; struct MeshClient *c; char *regex; size_t len; @@ -6463,18 +6732,24 @@ handle_local_announce_regex (void *cls, struct GNUNET_SERVER_Client *client, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); - len = ntohs (message->size) - sizeof(struct GNUNET_MessageHeader); + msg = (struct GNUNET_MESH_RegexAnnounce *) message; + len = ntohs (message->size) - sizeof(struct GNUNET_MESH_RegexAnnounce); regex = GNUNET_malloc (len + 1); - memcpy (regex, &message[1], len); + memcpy (regex, &msg[1], len); regex[len] = '\0'; - GNUNET_array_append (c->regexes, c->n_regex, regex); + rd.regex = regex; + rd.compression = ntohs (msg->compression_characters); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " length %u\n", len); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " regex %s\n", regex); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " cm %u\n", ntohs(rd.compression)); + GNUNET_array_append (c->regexes, c->n_regex, rd); if (GNUNET_SCHEDULER_NO_TASK == c->regex_announce_task) { c->regex_announce_task = GNUNET_SCHEDULER_add_now(&announce_regex, c); } else { - regex_put(regex); + regex_put(&rd); } GNUNET_SERVER_receive_done (client, GNUNET_OK); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "announce regex processed\n"); @@ -6546,7 +6821,7 @@ handle_local_tunnel_create (void *cls, struct GNUNET_SERVER_Client *client, next_tid = next_tid & ~GNUNET_MESH_LOCAL_TUNNEL_ID_CLI; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CREATED TUNNEL %s [%x] (%x)\n", GNUNET_i2s (&my_full_id), t->id.tid, t->local_tid); - t->peers = GNUNET_CONTAINER_multihashmap_create (32); + t->peers = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new tunnel created\n"); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -7219,8 +7494,8 @@ handle_local_connect_by_string (void *cls, struct GNUNET_SERVER_Client *client, info->description = GNUNET_malloc (len + 1); memcpy (info->description, string, len); info->description[len] = '\0'; - info->dht_get_handles = GNUNET_CONTAINER_multihashmap_create(32); - info->dht_get_results = GNUNET_CONTAINER_multihashmap_create(32); + info->dht_get_handles = GNUNET_CONTAINER_multihashmap_create(32, GNUNET_NO); + info->dht_get_results = GNUNET_CONTAINER_multihashmap_create(32, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " string: %s\n", info->description); ctx = GNUNET_malloc (sizeof (struct MeshRegexSearchContext)); @@ -7436,11 +7711,11 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + clinfo->bck_pid++; /* Ok, everything is correct, send the message * (pretend we got it from a mesh peer) */ - clinfo->bck_pid++; { char buf[ntohs (message->size)] GNUNET_ALIGN; struct GNUNET_MESH_ToOrigin *copy; @@ -7451,16 +7726,8 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_PEER_resolve (t->id.oid, ©->oid); copy->tid = htonl (t->id.tid); copy->ttl = htonl (default_ttl); - if (ntohl (copy->pid) != (t->bck_pid + 1)) - { - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "To Origin PID, expected %u, got %u\n", - t->bck_pid + 1, - ntohl (copy->pid)); - return; - } - t->bck_pid++; + copy->pid = htonl (++(t->bck_pid)); + copy->sender = my_full_id; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " calling generic handler...\n"); @@ -8024,6 +8291,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, GNUNET_SCHEDULER_shutdown (); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "APP_ANNOUNCE_TIME %llu ms\n", app_announce_time.rel_value); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (c, "MESH", "ID_ANNOUNCE_TIME", @@ -8110,11 +8378,11 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, dht_replication_level = 10; } - tunnels = GNUNET_CONTAINER_multihashmap_create (32); - incoming_tunnels = GNUNET_CONTAINER_multihashmap_create (32); - peers = GNUNET_CONTAINER_multihashmap_create (32); - applications = GNUNET_CONTAINER_multihashmap_create (32); - types = GNUNET_CONTAINER_multihashmap_create (32); + tunnels = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + incoming_tunnels = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + peers = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + applications = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); + types = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); dht_handle = GNUNET_DHT_connect (c, 64); if (NULL == dht_handle)