X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmesh%2Fgnunet-service-mesh.c;h=dba4116cf4df036e27ea3c88b6a720c15fba64f5;hb=f4d040c0f0dd2fef3d73b1f4532c76219f760f75;hp=006cc3b38a8109016590d078f1cf9b9d5cd8cf6b;hpb=1d116671d2464dcf0bdbab9671be02bde339068a;p=oweals%2Fgnunet.git diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index 006cc3b38..dba4116cf 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -170,6 +170,22 @@ struct MeshPeerQueue }; +/** + * Struct to store regex information announced by clients. + */ +struct MeshRegexDescriptor +{ + /** + * Regular expression itself. + */ + char *regex; + + /** + * How many characters per edge can we squeeze? + */ + uint16_t compression; +}; + /** * Struct containing all info possibly needed to build a package when called * back by core. @@ -205,6 +221,11 @@ struct MeshPeerInfo */ struct GNUNET_TIME_Absolute last_contact; + /** + * Task handler for delayed connect task; + */ + GNUNET_SCHEDULER_TaskIdentifier connect_task; + /** * Number of attempts to reconnect so far */ @@ -504,7 +525,12 @@ struct MeshTunnelChildInfo /** * Last sent PID. */ - uint32_t pid; + uint32_t fwd_pid; + + /** + * Last received PID. + */ + uint32_t bck_pid; /** * Maximum PID allowed (FWD ACK received). @@ -681,7 +707,7 @@ struct MeshClient /** * Regular expressions describing the services offered by this client. */ - char **regexes; // FIXME add timeout? API to remove a regex? + struct MeshRegexDescriptor *regexes; // FIXME regex add timeout? API to remove a regex? /** * Number of regular expressions in regexes. @@ -1277,6 +1303,8 @@ regex_result_iterator (void *cls, } regex_next_edge(block, SIZE_MAX, ctx); + GNUNET_STATISTICS_update (stats, "# regex mesh blocks iterated", 1, GNUNET_NO); + return GNUNET_YES; } @@ -1302,6 +1330,8 @@ regex_edge_iterator (void *cls, char *current; size_t current_len; + GNUNET_STATISTICS_update (stats, "# regex edges iterated", 1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Start of regex edge iterator\n"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* descr : %s\n", info->description); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* posit : %u\n", ctx->position); @@ -1387,6 +1417,8 @@ regex_next_edge (const struct MeshRegexBlock *block, return; // We are already looking for it } + GNUNET_STATISTICS_update (stats, "# regex nodes traversed", 1, GNUNET_NO); + /* Start search in DHT */ get_h = GNUNET_DHT_get_start (dht_handle, /* handle */ @@ -1570,15 +1602,17 @@ regex_iterator (void *cls, * @param regex The regular expresion. */ static void -regex_put (const char *regex) +regex_put (const struct MeshRegexDescriptor *regex) { struct GNUNET_REGEX_Automaton *dfa; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "regex_put (%s) start\n", regex); - dfa = GNUNET_REGEX_construct_dfa (regex, strlen(regex)); + DEBUG_DHT (" regex_put (%s) start\n", regex->regex); + dfa = GNUNET_REGEX_construct_dfa (regex->regex, + strlen(regex->regex), + regex->compression); GNUNET_REGEX_iterate_all_edges (dfa, ®ex_iterator, NULL); GNUNET_REGEX_automaton_destroy (dfa); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "regex_put (%s) end\n", regex); + DEBUG_DHT (" regex_put (%s) end\n", regex); } @@ -1759,9 +1793,9 @@ announce_regex (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct MeshClient *c = cls; unsigned int i; + c->regex_announce_task = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) { - c->regex_announce_task = GNUNET_SCHEDULER_NO_TASK; return; } @@ -1769,10 +1803,11 @@ announce_regex (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) for (i = 0; i < c->n_regex; i++) { - regex_put (c->regexes[i]); + regex_put (&c->regexes[i]); } - c->regex_announce_task = - GNUNET_SCHEDULER_add_delayed (app_announce_time, &announce_regex, cls); + c->regex_announce_task = GNUNET_SCHEDULER_add_delayed (app_announce_time, + &announce_regex, + cls); DEBUG_DHT ("Finished PUT for regex\n"); return; @@ -2348,12 +2383,19 @@ send_prebuilt_message (const struct GNUNET_MessageHeader *message, info->mesh_data->data = GNUNET_malloc (size); memcpy (info->mesh_data->data, message, size); type = ntohs(message->type); - if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) + switch (type) { struct GNUNET_MESH_Unicast *m; + struct GNUNET_MESH_ToOrigin *to; - m = (struct GNUNET_MESH_Unicast *) info->mesh_data->data; - m->ttl = htonl (ntohl (m->ttl) - 1); + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + m = (struct GNUNET_MESH_Unicast *) info->mesh_data->data; + m->ttl = htonl (ntohl (m->ttl) - 1); + break; + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + to = (struct GNUNET_MESH_ToOrigin *) info->mesh_data->data; + t->bck_pid++; + to->pid = htonl(t->bck_pid); } info->mesh_data->data_len = size; info->mesh_data->reference_counter = 1; @@ -2616,6 +2658,8 @@ peer_info_connect_task (void *cls, { struct MeshPathInfo *path_info = cls; + path_info->peer->connect_task = GNUNET_SCHEDULER_NO_TASK; + if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) { GNUNET_free (cls); @@ -2663,6 +2707,10 @@ peer_info_destroy (struct MeshPeerInfo *pi) path_destroy (p); p = nextp; } + if (GNUNET_SCHEDULER_NO_TASK != pi->connect_task) + { + GNUNET_free (GNUNET_SCHEDULER_cancel (pi->connect_task)); + } GNUNET_free (pi); return GNUNET_OK; } @@ -3242,7 +3290,8 @@ tunnel_notify_client_peer_disconnected (void *cls, GNUNET_PEER_Id peer_id) path_info = GNUNET_malloc (sizeof (struct MeshPathInfo)); path_info->peer = peer; path_info->t = t; - GNUNET_SCHEDULER_add_now (&peer_info_connect_task, path_info); + peer->connect_task = GNUNET_SCHEDULER_add_now (&peer_info_connect_task, + path_info); } @@ -3567,6 +3616,7 @@ tunnel_get_neighbor_fc (struct MeshTunnel *t, delta = t->nobuffer ? 1 : INITIAL_WINDOW_SIZE; cinfo->fwd_ack = t->fwd_pid + delta; cinfo->bck_ack = delta; + cinfo->bck_pid = -1; cinfo->send_buffer = GNUNET_malloc (sizeof(struct MeshPeerQueue *) * t->fwd_queue_max); @@ -3972,11 +4022,18 @@ tunnel_send_child_bck_ack (void *cls, GNUNET_PEER_resolve (id, &peer); cinfo = tunnel_get_neighbor_fc (t, &peer); - if (cinfo->bck_ack != cinfo->pid && - GNUNET_NO == GMC_is_pid_bigger (cinfo->bck_ack, cinfo->pid)) + if (cinfo->bck_ack != cinfo->bck_pid && + GNUNET_NO == GMC_is_pid_bigger (cinfo->bck_ack, cinfo->bck_pid)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " Not sending ACK, not needed\n"); return; + } - cinfo->bck_ack++; // FIXME window size? + cinfo->bck_ack = t->bck_queue_max - t->bck_queue_n + cinfo->bck_pid; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " Sending BCK ACK %u\n", + cinfo->bck_ack); send_ack (t, &peer, cinfo->bck_ack); } @@ -3985,6 +4042,14 @@ tunnel_send_child_bck_ack (void *cls, * @brief Send BCK ACKs to clients to allow them more to_origin traffic * * Iterates over all clients and sends BCK ACKs to the ones that need it. + * + * FIXME fc: what happens if we have 2 clients but q_size is 1? + * - implement a size 1 buffer in each client_fc AND children_fc + * to hold at least 1 message per "child". + * problem: violates no buffer policy + * - ack 0 and make "children" poll for transmission slots + * problem: big overhead, extra latency even in low traffic + * settings * * @param t Tunnel on which to send the BCK ACKs. */ @@ -3996,7 +4061,7 @@ tunnel_send_clients_bck_ack (struct MeshTunnel *t) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Sending BCK ACK to clients\n"); - tunnel_delta = t->bck_ack - t->bck_pid; + tunnel_delta = t->bck_queue_max - t->bck_queue_n; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " tunnel delta: %u\n", tunnel_delta); /* Find client whom to allow to send to origin (with lowest buffer space) */ @@ -4670,10 +4735,13 @@ queue_destroy (struct MeshPeerQueue *queue, int clear_cls) { case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " cancelling TUNNEL_DESTROY\n"); + GNUNET_assert (GNUNET_YES == queue->tunnel->destroy); + /* FIXME: don't cancel, send and destroy tunnel in queue_send */ /* fall through */ case GNUNET_MESSAGE_TYPE_MESH_UNICAST: case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + case GNUNET_MESSAGE_TYPE_MESH_ACK: case GNUNET_MESSAGE_TYPE_MESH_PATH_KEEPALIVE: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " prebuilt message\n"); @@ -4856,6 +4924,9 @@ queue_send (void *cls, size_t size, void *buf) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n"); GNUNET_PEER_resolve (peer->id, &dst_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "********* towards %s\n", + GNUNET_i2s(&dst_id)); /* Check if buffer size is enough for the message */ if (queue->size > size) { @@ -5071,13 +5142,18 @@ queue_add (void *cls, uint16_t type, size_t size, n = &t->bck_queue_n; max = &t->bck_queue_max; } - if (NULL != n) { + if (NULL != n) + { if (*n >= *max) { - if (NULL == t->owner) - GNUNET_break_op(0); // TODO: kill connection? - else - GNUNET_break(0); + struct MeshTransmissionDescriptor *td = cls; + struct GNUNET_MESH_ToOrigin *to; + + to = td->mesh_data->data; + GNUNET_break(0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "bck pid %u, bck ack %u, msg pid %u\n", + t->bck_pid, t->bck_ack, ntohl(to->pid)); GNUNET_STATISTICS_update(stats, "# messages dropped (buffer full)", 1, GNUNET_NO); return; // Drop message @@ -5602,7 +5678,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, neighbor = tree_get_first_hop (t->tree, dest_id); cinfo = tunnel_get_neighbor_fc (t, neighbor); - cinfo->pid = pid; + cinfo->fwd_pid = pid; GNUNET_CONTAINER_multihashmap_iterate (t->children_fc, &tunnel_add_skip, &neighbor); @@ -5726,6 +5802,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, struct GNUNET_PeerIdentity id; struct MeshPeerInfo *peer_info; struct MeshTunnel *t; + struct MeshTunnelChildInfo *cinfo; size_t size; uint32_t pid; @@ -5742,6 +5819,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %s\n", GNUNET_MESH_DEBUG_M2S (ntohs (msg[1].header.type))); t = tunnel_get (&msg->oid, ntohl (msg->tid)); + pid = ntohl (msg->pid); if (NULL == t) { @@ -5749,13 +5827,19 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_STATISTICS_update (stats, "# data on unknown tunnel", 1, GNUNET_NO); GNUNET_break_op (0); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received PID %u, ACK %u\n", - pid, t->bck_ack); + "Received to_origin with PID %u on unknown tunnel\n", + pid); return GNUNET_OK; } - pid = ntohl (msg->pid); - if (t->bck_pid == pid) + cinfo = tunnel_get_neighbor_fc(t, peer); + if (NULL == cinfo) + { + GNUNET_break (0); + return GNUNET_OK; + } + + if (cinfo->bck_pid == pid) { /* already seen this packet, drop */ GNUNET_STATISTICS_update (stats, "# duplicate PID drops BCK", 1, GNUNET_NO); @@ -5764,11 +5848,10 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); return GNUNET_OK; } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " pid %u not seen yet, forwarding\n", pid); - } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " pid %u not seen yet, forwarding\n", pid); + cinfo->bck_pid = pid; if (NULL != t->owner) { @@ -5781,6 +5864,8 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, memcpy (cbuf, message, size); copy = (struct GNUNET_MESH_ToOrigin *) cbuf; copy->tid = htonl (t->local_tid); + t->bck_pid++; + copy->pid = htonl (t->bck_pid); GNUNET_STATISTICS_update (stats, "# to origin received", 1, GNUNET_NO); GNUNET_SERVER_notification_context_unicast (nc, t->owner->handle, ©->header, GNUNET_NO); @@ -5841,6 +5926,7 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } ack = ntohl (msg->pid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack); /* Is this a forward or backward ACK? */ if (tree_get_predecessor(t->tree) != GNUNET_PEER_search(peer)) @@ -5908,7 +5994,7 @@ handle_mesh_poll (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " from FWD\n"); cinfo = tunnel_get_neighbor_fc (t, peer); - cinfo->bck_ack = cinfo->pid; // mark as ready to send + cinfo->bck_ack = cinfo->fwd_pid; // mark as ready to send tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_POLL); } else @@ -6458,6 +6544,8 @@ handle_local_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " (SERVER DOWN)\n"); return; } + + return; c = clients; while (NULL != c) { @@ -6499,7 +6587,7 @@ handle_local_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) GNUNET_CONTAINER_multihashmap_destroy (c->types); for (i = 0; i < c->n_regex; i++) { - GNUNET_free (c->regexes[i]); + GNUNET_free (c->regexes[i].regex); } GNUNET_free_non_null (c->regexes); if (GNUNET_SCHEDULER_NO_TASK != c->regex_announce_task) @@ -6627,6 +6715,8 @@ static void handle_local_announce_regex (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { + struct GNUNET_MESH_RegexAnnounce *msg; + struct MeshRegexDescriptor rd; struct MeshClient *c; char *regex; size_t len; @@ -6642,18 +6732,24 @@ handle_local_announce_regex (void *cls, struct GNUNET_SERVER_Client *client, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id); - len = ntohs (message->size) - sizeof(struct GNUNET_MessageHeader); + msg = (struct GNUNET_MESH_RegexAnnounce *) message; + len = ntohs (message->size) - sizeof(struct GNUNET_MESH_RegexAnnounce); regex = GNUNET_malloc (len + 1); - memcpy (regex, &message[1], len); + memcpy (regex, &msg[1], len); regex[len] = '\0'; - GNUNET_array_append (c->regexes, c->n_regex, regex); + rd.regex = regex; + rd.compression = ntohs (msg->compression_characters); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " length %u\n", len); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " regex %s\n", regex); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " cm %u\n", ntohs(rd.compression)); + GNUNET_array_append (c->regexes, c->n_regex, rd); if (GNUNET_SCHEDULER_NO_TASK == c->regex_announce_task) { c->regex_announce_task = GNUNET_SCHEDULER_add_now(&announce_regex, c); } else { - regex_put(regex); + regex_put(&rd); } GNUNET_SERVER_receive_done (client, GNUNET_OK); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "announce regex processed\n"); @@ -7615,11 +7711,11 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + clinfo->bck_pid++; /* Ok, everything is correct, send the message * (pretend we got it from a mesh peer) */ - clinfo->bck_pid++; { char buf[ntohs (message->size)] GNUNET_ALIGN; struct GNUNET_MESH_ToOrigin *copy; @@ -7630,16 +7726,8 @@ handle_local_to_origin (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_PEER_resolve (t->id.oid, ©->oid); copy->tid = htonl (t->id.tid); copy->ttl = htonl (default_ttl); - if (ntohl (copy->pid) != (t->bck_pid + 1)) - { - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "To Origin PID, expected %u, got %u\n", - t->bck_pid + 1, - ntohl (copy->pid)); - return; - } - t->bck_pid++; + copy->pid = htonl (++(t->bck_pid)); + copy->sender = my_full_id; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " calling generic handler...\n"); @@ -8203,6 +8291,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, GNUNET_SCHEDULER_shutdown (); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "APP_ANNOUNCE_TIME %llu ms\n", app_announce_time.rel_value); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (c, "MESH", "ID_ANNOUNCE_TIME",