From e7d3e2a74f56cae5e5acca34a48c2a8509b3a9d2 Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Wed, 8 Aug 2012 18:49:10 +0000 Subject: [PATCH] - towards local packet numbering for flow control --- src/mesh/gnunet-service-mesh.c | 288 ++++++++++++++++++++------------- src/mesh/mesh_api.c | 4 +- 2 files changed, 179 insertions(+), 113 deletions(-) diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index 9f4fdbcf0..0905288f1 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -60,6 +60,9 @@ #define MESH_DEBUG_DHT GNUNET_YES #define MESH_DEBUG_CONNECTION GNUNET_NO +#define INITIAL_WINDOW_SIZE 2 +#define ACK_THRESHOLD INITIAL_WINDOW_SIZE / 2 + #if MESH_DEBUG_CONNECTION #define DEBUG_CONN(...) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__) #else @@ -323,11 +326,6 @@ struct MeshTunnel */ uint32_t last_fwd_ack; - /** - * Last ACK sent towards the next hop (for traffic towards root). - */ - uint32_t last_bck_ack; - /** * BCK ACK value received from the hop towards the owner of the tunnel, * (previous node / owner): up to what message PID can we sent back to him. @@ -528,6 +526,11 @@ struct MeshTunnelChildIteratorContext */ struct MeshTunnel *t; + /** + * Is this context initialized? Is the value in max_child_ack valid? + */ + int init; + /** * Maximum child ACK so far. */ @@ -3198,6 +3201,25 @@ tunnel_add_path (struct MeshTunnel *t, struct MeshPeerPath *p, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tunnel_add_path END\n"); } +/** + * Add a client to a tunnel, initializing all needed data structures. + * + * @param t Tunnel to which add the client. + * @param c Client which to add to the tunnel. + */ +static void +tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c) +{ + struct MeshTunnelClientInfo clinfo; + + GNUNET_array_append (t->clients, t->nclients, c); + t->nclients--; + clinfo.fwd_ack = t->fwd_pid + 1; + clinfo.bck_ack = t->bck_ack + 1; // FIXME fc review + clinfo.pid = t->fwd_pid; + GNUNET_array_append (t->clients_fc, t->nclients, clinfo); +} + /** * Notifies a tunnel that a connection has broken that affects at least @@ -3419,8 +3441,8 @@ tunnel_get_neighbor_fc (const struct MeshTunnel *t, * @param id Id of the child node. */ static void -tunnel_get_child_ack (void *cls, - GNUNET_PEER_Id id) +tunnel_get_child_fwd_ack (void *cls, + GNUNET_PEER_Id id) { struct GNUNET_PeerIdentity peer_id; struct MeshTunnelChildInfo *cinfo; @@ -3432,8 +3454,11 @@ tunnel_get_child_ack (void *cls, cinfo = tunnel_get_neighbor_fc (t, &peer_id); ack = cinfo->fwd_ack; - if (0 == ctx->max_child_ack) + if (GNUNET_NO == ctx->init) + { ctx->max_child_ack = ack; + ctx->init = GNUNET_YES; + } if (GNUNET_YES == t->speed_min) { @@ -3457,43 +3482,26 @@ tunnel_get_child_ack (void *cls, * @return Maximum PID allowed (uint32 MAX), -1 if node has no children. */ static int64_t -tunnel_get_children_ack (struct MeshTunnel *t) +tunnel_get_children_fwd_ack (struct MeshTunnel *t) { struct MeshTunnelChildIteratorContext ctx; ctx.t = t; ctx.max_child_ack = 0; ctx.nchildren = 0; - tree_iterate_children (t->tree, tunnel_get_child_ack, &ctx); + tree_iterate_children (t->tree, tunnel_get_child_fwd_ack, &ctx); if (0 == ctx.nchildren) return -1LL; - return (int64_t) ctx.max_child_ack; -} - - -/** - * Add a client to a tunnel, initializing all needed data structures. - * - * @param t Tunnel to which add the client. - * @param c Client which to add to the tunnel. - */ -static void -tunnel_add_client (struct MeshTunnel *t, struct MeshClient *c) -{ - struct MeshTunnelClientInfo clinfo; + if (GNUNET_YES == t->nobuffer && is_pid_bigger(ctx.max_child_ack, t->fwd_pid)) + ctx.max_child_ack = t->fwd_pid + 1; // Might overflow, it's ok. - GNUNET_array_append (t->clients, t->nclients, c); - t->nclients--; - clinfo.fwd_ack = t->fwd_pid + 1; - clinfo.bck_ack = t->bck_ack + 1; // FIXME fc review - clinfo.pid = t->fwd_pid; - GNUNET_array_append (t->clients_fc, t->nclients, clinfo); + return (int64_t) ctx.max_child_ack; } /** - * Set the ACK value of a client in a particular tunnel. + * Set the FWD ACK value of a client in a particular tunnel. * * @param t Tunnel affected. * @param c Client whose ACK to set. @@ -3525,8 +3533,8 @@ tunnel_set_client_fwd_ack (struct MeshTunnel *t, * * @return ACK value. */ -uint32_t // FIXME static when used!! -tunnel_get_client_ack (struct MeshTunnel *t, +static uint32_t +tunnel_get_client_fwd_ack (struct MeshTunnel *t, struct MeshClient *c) { unsigned int i; @@ -3552,7 +3560,7 @@ tunnel_get_client_ack (struct MeshTunnel *t, * If no clients are suscribed, -1. */ static int64_t -tunnel_get_clients_ack (struct MeshTunnel *t) +tunnel_get_clients_fwd_ack (struct MeshTunnel *t) { unsigned int i; int64_t ack; @@ -3599,8 +3607,8 @@ tunnel_get_fwd_ack (struct MeshTunnel *t) count = t->fwd_pid - t->skip; buffer_free = t->fwd_queue_max - t->fwd_queue_n; ack = count + buffer_free; // Might overflow 32 bits, it's ok! - child_ack = tunnel_get_children_ack (t); - client_ack = tunnel_get_clients_ack (t); + child_ack = tunnel_get_children_fwd_ack (t); + client_ack = tunnel_get_clients_fwd_ack (t); if (-1 == child_ack) { // Node has no children, child_ack AND core buffer are irrelevant. @@ -3655,6 +3663,38 @@ tunnel_get_bck_ack (struct MeshTunnel *t) return ack; } +static void +send_local_ack (struct MeshClient *c, struct MeshTunnel *t, uint32_t ack) +{ + struct GNUNET_MESH_LocalAck msg; + + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK); + msg.tunnel_id = htonl (t->local_tid_dest); + msg.max_pid = htonl (ack); + GNUNET_SERVER_notification_context_unicast(nc, + c->handle, + &msg.header, + GNUNET_NO); +} + +/** + * Build an ACK message and send it to the given peer. + */ +static void +send_ack (struct MeshTunnel *t, struct GNUNET_PeerIdentity *peer, uint32_t ack) +{ + struct GNUNET_MESH_ACK msg; + + GNUNET_PEER_resolve (t->id.oid, &msg.oid); + msg.header.size = htons (sizeof (msg)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); + msg.pid = htonl (ack); + msg.tid = htonl (t->id.tid); + + send_message (&msg.header, peer, t); +} + /** * Send an ACK informing the predecessor about the available buffer space. @@ -3670,7 +3710,6 @@ tunnel_get_bck_ack (struct MeshTunnel *t) static void tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) { - struct GNUNET_MESH_ACK msg; struct GNUNET_PeerIdentity id; uint32_t ack; @@ -3715,21 +3754,76 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) } t->last_fwd_ack = ack; - msg.pid = htonl (ack); - GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id); + send_ack (t, &id, ack); +} - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); - msg.tid = htonl (t->id.tid); - GNUNET_PEER_resolve(t->id.oid, &msg.oid); - send_message (&msg.header, &id, t); + +/** + * Iterator to send a child node a BCK ACK to allow him to send more + * to_origin data. + * + * @param cls Closure (tunnel). + * @param id Id of the child node. + */ +static void +tunnel_send_child_bck_ack (void *cls, + GNUNET_PEER_Id id) +{ + struct MeshTunnel *t = cls; + struct MeshTunnelChildInfo *cinfo; + struct GNUNET_PeerIdentity peer; + + GNUNET_PEER_resolve (id, &peer); + cinfo = tunnel_get_neighbor_fc (t, &peer); + + if (cinfo->bck_ack != cinfo->pid && + GNUNET_NO == is_pid_bigger (cinfo->bck_ack, cinfo->pid)) + return; + + cinfo->bck_ack++; + send_ack (t, &peer, cinfo->bck_ack); } /** - * Send an ACK informing the children nodes about the available buffer space. - * In case there is no child node, inform the destination clients. + * @brief Send BCK ACKs to clients to allow them more to_origin traffic + * + * Iterates over all clients and sends BCK ACKs to the ones that need + * + * @param t Tunnel on which to send the BCK ACKs. + */ +static void +tunnel_send_clients_bck_ack (struct MeshTunnel *t) +{ + unsigned int i; + + /* Find client whom to allow to send to origin (with lowest buffer space) */ + for (i = 0; i < t->nclients; i++) + { + struct MeshTunnelClientInfo *clinfo; + unsigned int delta; + + clinfo = &t->clients_fc[i]; + delta = clinfo->bck_ack - clinfo->pid; + + if ((GNUNET_NO == t->nobuffer && ACK_THRESHOLD > delta) || + (GNUNET_YES == t->nobuffer && 0 == delta)) + { + uint32_t ack; + + ack = clinfo->pid; + ack += t->nobuffer ? 1 : INITIAL_WINDOW_SIZE; + send_local_ack(t->clients[i], t, ack); + clinfo->bck_ack = ack; + } + } +} + + +/** + * Send an ACK informing the children nodes and destination clients about + * the available buffer space. * If buffering is off, send only on behalf of root (can be self). * If buffering is on, send when sent to predecessor and buffer space is free. * Note that although the name is bck_ack, the BCK mean backwards *traffic*, @@ -3741,13 +3835,6 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) static void tunnel_send_bck_ack (struct MeshTunnel *t, uint16_t type) { - struct GNUNET_MESH_ACK msg; - struct GNUNET_PeerIdentity id; - uint32_t ack; - unsigned int i; - unsigned int min_d; - unsigned int min_i; - if (NULL != t->owner) { send_client_tunnel_ack (t->owner, t); @@ -3777,51 +3864,8 @@ tunnel_send_bck_ack (struct MeshTunnel *t, uint16_t type) GNUNET_break (0); } - /* Ok, ACK might be necessary, what PID to ACK? */ - ack = tunnel_get_bck_ack (t); - - /* If speed_min and not all children have ack'd, dont send yet */ - if (ack == t->last_bck_ack) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not sending BCK ACK, not ready\n"); - return; - } - - /* Unlock local clients. */ - if (0 < t->nclients) - { - struct GNUNET_MESH_LocalAck msg; - - /* Find client who to allow to send to origin (with lowest buffer space) */ - /* FIXME fc Round robin? Priority? FIFO? */ - for (i = 0; i < t->nclients; i++) - { - unsigned int d; - - d = t->clients_fc[i].bck_ack - t->clients_fc[i].pid; - if (0 == i || d < min_d) - { - min_d = d; - min_i = i; - } - } - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK); - msg.tunnel_id = htonl (t->local_tid_dest); - msg.max_pid = t->bck_pid + 1; // FIXME fc - GNUNET_SERVER_notification_context_unicast(nc, - t->clients[min_i]->handle, - &msg.header, - GNUNET_NO); - } - - t->last_bck_ack = ack; - msg.pid = htonl (ack); - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK); - msg.tid = htonl (t->id.tid); - GNUNET_PEER_resolve(t->id.oid, &msg.oid); - send_message (&msg.header, &id, t); + tunnel_send_clients_bck_ack (t); + tree_iterate_children (t->tree, &tunnel_send_child_bck_ack, NULL); } @@ -3998,7 +4042,7 @@ tunnel_new (GNUNET_PEER_Id owner, { struct MeshTunnel *t; struct GNUNET_HashCode hash; - + if (n_tunnels >= max_tunnels && NULL == client) return NULL; @@ -4009,9 +4053,8 @@ tunnel_new (GNUNET_PEER_Id owner, t->bck_queue_max = t->fwd_queue_max; t->tree = tree_new (owner); t->owner = client; - t->bck_ack = 1; - t->last_bck_ack = 1; - t->last_fwd_ack = 1; + t->bck_ack = INITIAL_WINDOW_SIZE; + t->last_fwd_ack = INITIAL_WINDOW_SIZE; t->local_tid = local; t->children_fc = GNUNET_CONTAINER_multihashmap_create (8); n_tunnels++; @@ -4954,7 +4997,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, t->fwd_pid = pid; if (is_pid_bigger (pid, t->last_fwd_ack)) { - GNUNET_STATISTICS_update (stats, "# not allowed unicast", 1, GNUNET_NO); + GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO); GNUNET_break_op (0); return GNUNET_OK; } @@ -4966,7 +5009,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, " it's for us! sending to clients...\n"); GNUNET_STATISTICS_update (stats, "# unicast received", 1, GNUNET_NO); send_subscribed_clients (message, (struct GNUNET_MessageHeader *) &msg[1]); - // ACK is generated by client (api part), service only retransmits. + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST); return GNUNET_OK; } ttl = ntohl (msg->ttl); @@ -4975,11 +5018,11 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, { GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n"); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); return GNUNET_OK; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n"); - GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO); neighbor = tree_get_first_hop (t->tree, dest_id); cinfo = tunnel_get_neighbor_fc (t, neighbor); @@ -4987,12 +5030,15 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_CONTAINER_multihashmap_iterate (t->children_fc, &tunnel_add_skip, &neighbor); - if (is_pid_bigger(pid, cinfo->fwd_ack)) + if (is_pid_bigger (pid, cinfo->fwd_ack)) { + GNUNET_STATISTICS_update (stats, "# unsolicited unicast", 1, GNUNET_NO); GNUNET_break_op (0); return GNUNET_OK; } send_message (message, neighbor, t); + GNUNET_STATISTICS_update (stats, "# unicast forwarded", 1, GNUNET_NO); + return GNUNET_OK; } @@ -5047,6 +5093,7 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_STATISTICS_update (stats, "# duplicate PID drops", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Already seen pid %u, DROPPING!\n", pid); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); return GNUNET_OK; } else @@ -5064,12 +5111,14 @@ handle_mesh_data_multicast (void *cls, const struct GNUNET_PeerIdentity *peer, { GNUNET_STATISTICS_update (stats, "# multicast received", 1, GNUNET_NO); send_subscribed_clients (message, &msg[1].header); + tunnel_send_fwd_ack(t, GNUNET_MESSAGE_TYPE_MESH_MULTICAST); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ttl: %u\n", ntohl (msg->ttl)); if (ntohl (msg->ttl) == 0) { GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n"); + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); return GNUNET_OK; } GNUNET_STATISTICS_update (stats, "# multicast forwarded", 1, GNUNET_NO); @@ -5124,7 +5173,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } - if (t->id.oid == myid) + if (NULL != t->owner) { char cbuf[size]; struct GNUNET_MESH_ToOrigin *copy; @@ -5147,6 +5196,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_STATISTICS_update (stats, "# to origin received", 1, GNUNET_NO); GNUNET_SERVER_notification_context_unicast (nc, t->owner->handle, ©->header, GNUNET_NO); + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); return GNUNET_OK; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -5186,11 +5236,10 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer, unsigned int atsi_count) { struct GNUNET_MESH_ACK *msg; - struct MeshTunnelChildInfo *cinfo; struct MeshTunnel *t; uint32_t ack; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got an ACK packet from %s\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK packet from %s!\n", GNUNET_i2s (peer)); msg = (struct GNUNET_MESH_ACK *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " of type %u\n", @@ -5205,9 +5254,24 @@ handle_mesh_ack (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } ack = ntohl (msg->pid); - cinfo = tunnel_get_neighbor_fc (t, peer); - cinfo->fwd_ack = ack; - tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); + + /* Is this a forward or backward ACK? */ + if (tree_get_predecessor(t->tree) == GNUNET_PEER_search(peer)) + { + struct MeshTunnelChildInfo *cinfo; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n"); + cinfo = tunnel_get_neighbor_fc (t, peer); + cinfo->fwd_ack = ack; + tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n"); + t->bck_ack = ack; + tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_ACK); + } + // FIXME fc Unlock queues? return GNUNET_OK; } diff --git a/src/mesh/mesh_api.c b/src/mesh/mesh_api.c index 201d18ae6..cfabd4c0c 100644 --- a/src/mesh/mesh_api.c +++ b/src/mesh/mesh_api.c @@ -1275,7 +1275,8 @@ send_callback (void *cls, size_t size, void *buf) if (GNUNET_YES == th_is_payload (th)) { LOG (GNUNET_ERROR_TYPE_DEBUG, " payload\n"); - if (t->max_pid < t->pid && ! PID_OVERFLOW (t->pid, t->max_pid)) { + if (t->max_pid < t->pid && GNUNET_NO == PID_OVERFLOW (t->pid, t->max_pid)) + { /* This tunnel is not ready to transmit yet, try next message */ next = th->next; continue; @@ -2020,6 +2021,7 @@ GNUNET_MESH_notify_transmit_ready_cancel (struct GNUNET_MESH_TransmitHandle *th) { struct GNUNET_MESH_Handle *mesh; + th->tunnel->packet_size = 0; mesh = th->tunnel->mesh; if (th->timeout_task != GNUNET_SCHEDULER_NO_TASK) GNUNET_SCHEDULER_cancel (th->timeout_task); -- 2.25.1