From: Bart Polot Date: Sat, 13 Jul 2013 01:13:07 +0000 (+0000) Subject: - use message ID for end-to-end ACKs X-Git-Tag: initial-import-from-subversion-38251~8333 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=1d53ec8da14a39e436cbddebba9cd92537870b6a;hp=581fa8973a09190bc59f62e5da7eb69983f39228;p=oweals%2Fgnunet.git - use message ID for end-to-end ACKs --- diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index 169b30087..7504fa223 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -51,9 +51,7 @@ #define MESH_MAX_POLL_TIME GNUNET_TIME_relative_multiply (\ GNUNET_TIME_UNIT_MINUTES,\ 10) -#define MESH_RETRANSMIT_TIME GNUNET_TIME_relative_multiply (\ - GNUNET_TIME_UNIT_SECONDS,\ - 5) +#define MESH_RETRANSMIT_TIME GNUNET_TIME_UNIT_SECONDS #define MESH_RETRANSMIT_MARGIN 4 #if MESH_DEBUG_CONNECTION @@ -285,6 +283,12 @@ struct MESH_TunnelID }; +/** + * Data needed for reliable tunnel endpoint retransmission management. + */ +struct MeshTunnelReliability; + + /** * Info needed to retry a message in case it gets lost. */ @@ -304,7 +308,7 @@ struct MeshReliableMessage /** * ID of the message (ACK needed to free) */ - uint32_t id; + uint64_t mid; /** * When was this message issued (to calculate ACK delay) FIXME update with traffic @@ -315,9 +319,6 @@ struct MeshReliableMessage }; -/** - * Data needed for reliable tunnel endpoint retransmission management. - */ struct MeshTunnelReliability { /** @@ -336,12 +337,19 @@ struct MeshTunnelReliability */ unsigned int n_sent; + /** + * Next MID to use + */ + uint64_t mid_sent; + /** * DLL of messages received out of order. */ struct MeshReliableMessage *head_recv; struct MeshReliableMessage *tail_recv; + uint64_t mid_recv; + /** * Task to resend/poll in case no ACK is received. */ @@ -2057,7 +2065,7 @@ tunnel_send_fwd_data_ack (struct MeshTunnel *t) msg.header.size = htons (sizeof (msg)); msg.tid = htonl (t->id.tid); GNUNET_PEER_resolve (t->id.oid, &msg.oid); - msg.pid = htonl (t->prev_fc.last_pid_recv); + msg.mid = GNUNET_htonll (t->bck_rel->mid_recv); msg.futures = 0; // FIXME set bits of other newer messages received send_prebuilt_message (&msg.header, t->prev_hop, t); @@ -2078,7 +2086,7 @@ tunnel_send_bck_data_ack (struct MeshTunnel *t) msg.header.size = htons (sizeof (msg)); msg.tid = htonl (t->id.tid); GNUNET_PEER_resolve (t->id.oid, &msg.oid); - msg.pid = htonl (t->next_fc.last_pid_recv); + msg.mid = GNUNET_htonll (t->fwd_rel->mid_recv); msg.futures = 0; // FIXME set bits of other newer messages received send_prebuilt_message (&msg.header, t->next_hop, t); @@ -2100,6 +2108,7 @@ static void tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) { uint32_t ack; + int delta; /* Is it after unicast retransmission? */ switch (type) @@ -2114,18 +2123,10 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) } if (GNUNET_YES == t->reliable && NULL != t->client) tunnel_send_fwd_data_ack (t); - if (GNUNET_YES == t->reliable && NULL != t->owner) - if (t->fwd_rel->n_sent > 10) - return; break; case GNUNET_MESSAGE_TYPE_MESH_ACK: - if (NULL != t->owner && GNUNET_YES == t->reliable) - return; case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK: break; - case GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK: - tunnel_send_fwd_data_ack (t); - break; case GNUNET_MESSAGE_TYPE_MESH_POLL: case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK: t->force_ack = GNUNET_YES; @@ -2151,7 +2152,10 @@ tunnel_send_fwd_ack (struct MeshTunnel *t, uint16_t type) } /* Ok, ACK might be necessary, what PID to ACK? */ - ack = t->prev_fc.last_pid_recv + t->queue_max - t->next_fc.queue_n; + delta = t->queue_max - t->next_fc.queue_n; + if (0 > delta) + delta = 0; + ack = t->prev_fc.last_pid_recv + delta; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK %u\n", ack); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " last %u, qmax %u, q %u\n", @@ -2202,18 +2206,12 @@ tunnel_send_bck_ack (struct MeshTunnel *t, uint16_t type) " Not sending ACK, nobuffer + traffic\n"); return; } + if (GNUNET_YES == t->reliable && NULL != t->owner) + tunnel_send_bck_data_ack (t); break; case GNUNET_MESSAGE_TYPE_MESH_ACK: - /* Why was this here?! - * This prevents the destination from starting traffic to the origin - */ -// if (NULL != t->client && GNUNET_YES == t->reliable) -// return; case GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK: break; - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK: - tunnel_send_bck_data_ack (t); - /* fall through */ case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK: case GNUNET_MESSAGE_TYPE_MESH_POLL: t->force_ack = GNUNET_YES; @@ -2324,6 +2322,7 @@ tunnel_retransmit_message (void *cls, { struct MeshTunnelReliability *rel = cls; struct MeshReliableMessage *copy; + struct MeshFlowControl *fc; struct MeshPeerQueue *q; struct MeshPeerInfo *pi; struct MeshTunnel *t; @@ -2345,6 +2344,7 @@ tunnel_retransmit_message (void *cls, /* Search the message to be retransmitted in the outgoing queue */ payload = (struct GNUNET_MESH_Data *) ©[1]; hop = rel == t->fwd_rel ? t->next_hop : t->prev_hop; + fc = rel == t->fwd_rel ? &t->next_fc : &t->prev_fc; pi = peer_get_short (hop); for (q = pi->queue_head; NULL != q; q = q->next) { @@ -2352,7 +2352,7 @@ tunnel_retransmit_message (void *cls, { struct GNUNET_MESH_Data *queued_data = q->cls; - if (queued_data->pid == payload->pid) + if (queued_data->mid == payload->mid) break; } } @@ -2360,14 +2360,18 @@ tunnel_retransmit_message (void *cls, /* Message not found in the queue */ if (NULL == q) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %u\n", copy->id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! RETRANSMIT %llu\n", copy->mid); copy->timestamp = GNUNET_TIME_absolute_get (); + + fc->last_ack_sent++; + payload->pid = htonl (fc->last_pid_recv + 1); + fc->last_pid_recv++; send_prebuilt_message (&payload->header, hop, t); GNUNET_STATISTICS_update (stats, "# data retransmitted", 1, GNUNET_NO); } else { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! QUEUED %u\n", copy->id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! STILL IN QUEUE %llu\n", copy->mid); } rel->retry_timer = GNUNET_TIME_STD_BACKOFF (rel->retry_timer); @@ -3161,13 +3165,11 @@ queue_send (void *cls, size_t size, void *buf) switch (type) { case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - if (GMC_is_pid_bigger(pid, t->next_fc.last_pid_sent)) - t->next_fc.last_pid_sent = pid; + t->next_fc.last_pid_sent = pid; tunnel_send_fwd_ack (t, GNUNET_MESSAGE_TYPE_MESH_UNICAST); break; case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - if (GMC_is_pid_bigger(pid, t->prev_fc.last_pid_sent)) - t->prev_fc.last_pid_sent = pid; + t->prev_fc.last_pid_sent = pid; tunnel_send_bck_ack (t, GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); break; default: @@ -3248,8 +3250,6 @@ queue_add (void *cls, uint16_t type, size_t size, struct MeshPeerQueue *queue; struct GNUNET_PeerIdentity id; struct MeshFlowControl *fc; - uint32_t pid; - uint32_t pid_q; int priority; fc = NULL; @@ -3257,56 +3257,30 @@ queue_add (void *cls, uint16_t type, size_t size, if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) { fc = &t->next_fc; - pid = ntohl (((struct GNUNET_MESH_Data *)cls)->pid); } else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type) { fc = &t->prev_fc; - pid = ntohl (((struct GNUNET_MESH_Data *)cls)->pid); } if (NULL != fc) { if (fc->queue_n >= t->queue_max) { GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "queue full: %u/%u\n", fc->queue_n, t->queue_max); - GNUNET_STATISTICS_update (stats, - "# messages dropped (buffer full)", - 1, GNUNET_NO); - /* Get the PID of the oldest message in the queue */ - for (queue = dst->queue_head; queue != NULL; queue = queue->next) - if (queue->type == type && queue->tunnel == t) - { - pid_q = ntohl (((struct GNUNET_MESH_Data *)(queue->cls))->pid); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "pid: %u, pid_q: %u\n", pid, pid_q); - break; - } - GNUNET_assert (NULL != queue); - - /* If this is an earlier message that that, give it priority: - * - drop the newest message in the queue - * - instert current one at the end of the queue (first to get out) - */ - if (GNUNET_YES == t->reliable && GMC_is_pid_bigger(pid_q, pid)) + + /* If this isn't a retransmission, drop the message */ + if (GNUNET_NO == t->reliable || + (NULL == t->owner && GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) || + (NULL == t->client && GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)) { - for (queue = dst->queue_tail; queue != NULL; queue = queue->prev) - if (queue->type == type && queue->tunnel == t) - { - /* Drop message from queue */ - pid_q = ntohl (((struct GNUNET_MESH_Data *)(queue->cls))->pid); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "dropping pid: %u\n", pid_q); - queue_destroy (queue, GNUNET_YES); - t->pending_messages--; - priority = GNUNET_YES; - break; - } - } - else + GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)", + 1, GNUNET_NO); return; /* Drop this message */ + } + priority = GNUNET_YES; } fc->queue_n++; } @@ -3773,15 +3747,15 @@ handle_mesh_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, " it's for us! sending to clients...\n"); GNUNET_STATISTICS_update (stats, "# unicast received", 1, GNUNET_NO); // if (GMC_is_pid_bigger(pid, t->prev_fc.last_pid_recv)) FIXME use - if ( (GNUNET_NO == t->reliable && - GMC_is_pid_bigger(pid, t->prev_fc.last_pid_recv)) - || - (GNUNET_YES == t->reliable && - pid == t->prev_fc.last_pid_recv + 1) ) + if (GMC_is_pid_bigger (pid, t->prev_fc.last_pid_recv) + && + (GNUNET_NO == t->reliable || + GNUNET_ntohll (msg->mid) == (t->bck_rel->mid_recv + 1)) ) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " pid %u not seen yet, forwarding\n", pid); t->prev_fc.last_pid_recv = pid; + t->bck_rel->mid_recv++; tunnel_send_client_ucast (t, msg); } else @@ -3958,7 +3932,7 @@ handle_mesh_data_ack (void *cls, const struct GNUNET_PeerIdentity *peer, struct MeshReliableMessage *next; struct MeshTunnel *t; GNUNET_PEER_Id id; - uint32_t ack; + uint64_t ack; uint16_t type; int work; @@ -3974,8 +3948,8 @@ handle_mesh_data_ack (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_STATISTICS_update (stats, "# ack on unknown tunnel", 1, GNUNET_NO); return GNUNET_OK; } - ack = ntohl (msg->pid); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack); + ack = GNUNET_ntohll (msg->mid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ACK %llu\n", ack); /* Is this a forward or backward ACK? */ id = GNUNET_PEER_search (peer); @@ -4007,18 +3981,18 @@ handle_mesh_data_ack (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! ACK %u\n", ack); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! ACK %llu\n", ack); for (work = GNUNET_NO, copy = rel->head_sent; copy != NULL; copy = next) { struct GNUNET_TIME_Relative time; - if (copy->id > ack) + if (copy->mid > ack) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! head %u, out!\n", copy->id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! head %llu, out!\n", copy->mid); break; } work = GNUNET_YES; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! id %u\n", copy->id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! id %llu\n", copy->mid); next = copy->next; time = GNUNET_TIME_absolute_get_duration (copy->timestamp); rel->expected_delay.rel_value += time.rel_value; @@ -4783,13 +4757,13 @@ handle_local_data (void *cls, struct GNUNET_SERVER_Client *client, struct MeshTunnelReliability *rel; struct MeshReliableMessage *copy; + rel = (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV) ? t->fwd_rel : t->bck_rel; copy = GNUNET_malloc (sizeof (struct MeshReliableMessage) + sizeof(struct GNUNET_MESH_Data) + size); - copy->id = fc->last_pid_recv + 1; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! DATA %u\n", copy->id); + copy->mid = rel->mid_sent++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! DATA %llu\n", copy->mid); copy->timestamp = GNUNET_TIME_absolute_get (); - rel = (tid < GNUNET_MESH_LOCAL_TUNNEL_ID_SERV) ? t->fwd_rel : t->bck_rel; copy->rel = rel; rel->n_sent++; GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy); @@ -4804,10 +4778,14 @@ handle_local_data (void *cls, struct GNUNET_SERVER_Client *client, rel); } payload = (struct GNUNET_MESH_Data *) ©[1]; + payload->mid = GNUNET_htonll (copy->mid); } else { payload = (struct GNUNET_MESH_Data *) cbuf; + payload->mid = 0; + // FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME + // use different struct for unreliable traffic, save 8 bytes } memcpy (&payload[1], &data_msg[1], size); payload->header.size = htons (sizeof (struct GNUNET_MESH_Data) + size); diff --git a/src/mesh/mesh_protocol.h b/src/mesh/mesh_protocol.h index 28cb1fa93..6067da908 100644 --- a/src/mesh/mesh_protocol.h +++ b/src/mesh/mesh_protocol.h @@ -121,7 +121,7 @@ struct GNUNET_MESH_Data uint32_t ttl GNUNET_PACKED; /** - * Unique ID of the packet + * ID of the packet */ uint32_t pid GNUNET_PACKED; @@ -130,6 +130,11 @@ struct GNUNET_MESH_Data */ struct GNUNET_PeerIdentity oid; + /** + * Unique ID of the payload message + */ + uint64_t mid GNUNET_PACKED; + /** * Payload follows */ @@ -157,16 +162,16 @@ struct GNUNET_MESH_DataACK struct GNUNET_PeerIdentity oid; /** - * Maximum packet ID acknowledged. + * Last message ID received. */ - uint32_t pid; + uint64_t mid GNUNET_PACKED; /** * Bitfield of already-received newer messages // TODO implement and use * pid + 1 @ LSB * pid + 32 @ MSB */ - uint32_t futures; + uint32_t futures GNUNET_PACKED; }; @@ -193,7 +198,7 @@ struct GNUNET_MESH_ACK /** * Maximum packet ID authorized. */ - uint32_t pid; + uint32_t pid GNUNET_PACKED; };