From a78e015dd4764c54c013729cf58c55dbaa8af626 Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Sun, 11 Aug 2013 03:31:06 +0000 Subject: [PATCH] - major refactorization --- src/mesh/gnunet-service-mesh-enc.c | 631 +++++++++++++++++++++-------- 1 file changed, 458 insertions(+), 173 deletions(-) diff --git a/src/mesh/gnunet-service-mesh-enc.c b/src/mesh/gnunet-service-mesh-enc.c index e5ea26ae6..6005e22db 100644 --- a/src/mesh/gnunet-service-mesh-enc.c +++ b/src/mesh/gnunet-service-mesh-enc.c @@ -191,6 +191,11 @@ struct MeshPeerQueue */ struct MeshConnection *c; + /** + * Is FWD in c? + */ + int fwd; + /** * Channel this message belongs to, if known. */ @@ -225,17 +230,7 @@ struct MeshFlowControl struct MeshConnection *c; /** - * Transmission queue to core DLL head - */ - struct MeshPeerQueue *queue_head; - - /** - * Transmission queue to core DLL tail - */ - struct MeshPeerQueue *queue_tail; - - /** - * How many messages are in the queue to this peer. + * How many messages are in the queue on this connection. */ unsigned int queue_n; @@ -244,11 +239,6 @@ struct MeshFlowControl */ unsigned int queue_max; - /** - * Handle for queued transmissions - */ - struct GNUNET_CORE_TransmitHandle *core_transmit; - /** * ID of the last packet sent towards the peer. */ @@ -316,6 +306,30 @@ struct MeshPeer */ struct MeshTunnel2 *tunnel; + /** + * Connections that go through this peer, indexed by tid; + */ + struct GNUNET_CONTAINER_MultiHashMap *connections; + + /** + * Handle for queued transmissions + */ + struct GNUNET_CORE_TransmitHandle *core_transmit; + + /** + * Transmission queue to core DLL head + */ + struct MeshPeerQueue *queue_head; + + /** + * Transmission queue to core DLL tail + */ + struct MeshPeerQueue *queue_tail; + + /** + * How many messages are in the queue to this peer. + */ + unsigned int queue_n; }; @@ -363,7 +377,7 @@ struct MeshChannelReliability struct MeshReliableMessage *tail_sent; /** - * Messages pending + * Messages pending to send. */ unsigned int n_sent; @@ -373,6 +387,16 @@ struct MeshChannelReliability struct MeshReliableMessage *head_recv; struct MeshReliableMessage *tail_recv; + /** + * Messages received. + */ + unsigned int n_recv; + + /** + * Can we send data to the client? + */ + int client_ready; + /** * Task to resend/poll in case no ACK is received. */ @@ -1241,6 +1265,22 @@ connection_get_next_hop (struct MeshConnection *c) } +/** + * Get the hop in a connection. + * + * @param c Connection. + * @param fwd Next hop? + * + * @return Next peer in the connection. + */ +static struct MeshPeer * +connection_get_hop (struct MeshConnection *c, int fwd) +{ + if (fwd) + return connection_get_next_hop (c); + return connection_get_prev_hop (c); +} + /** * Check if client has registered with the service and has not disconnected * @@ -1421,6 +1461,29 @@ tunnel_get_connection (struct MeshTunnel2 *t, int fwd) } +/** + * Get the total buffer space for a tunnel + */ +static unsigned int +tunnel_get_buffer (struct MeshTunnel2 *t, int fwd) +{ + struct MeshConnection *c; + struct MeshFlowControl *fc; + unsigned int buffer; + + for (buffer = 0, c = t->connection_head; NULL != c; c = c->next) + { + if (c->state != MESH_CONNECTION_READY) + continue; + + fc = fwd ? &c->fwd_fc : &c->bck_fc; + buffer += fc->last_ack_recv - fc->last_pid_sent; + } + + return buffer; +} + + /** * FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME * Encrypt data with the tunnel key. @@ -1482,7 +1545,7 @@ send_prebuilt_message_connection (const struct GNUNET_MessageHeader *message, size_t size; uint16_t type; - neighbor = fwd ? connection_get_next_hop (c) : connection_get_prev_hop (c); + neighbor = connection_get_hop (c, fwd); if (NULL == neighbor) { GNUNET_break (0); @@ -1780,23 +1843,140 @@ send_core_connection_ack (void *cls, size_t size, void *buf) /** - * Iterator over all the peers to remove the oldest not-used entry. + * Destroy the peer_info and free any allocated resources linked to it + * + * @param peer The peer_info to destroy. + * + * @return GNUNET_OK on success + */ +static int +peer_destroy (struct MeshPeer *peer) +{ + struct GNUNET_PeerIdentity id; + struct MeshPeerPath *p; + struct MeshPeerPath *nextp; + + GNUNET_PEER_resolve (peer->id, &id); + GNUNET_PEER_change_rc (peer->id, -1); + + if (GNUNET_YES != + GNUNET_CONTAINER_multihashmap_remove (peers, &id.hashPubKey, peer)) + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "removing peer %s, not in hashmap\n", GNUNET_i2s (&id)); + } + if (NULL != peer->dhtget) + { + GNUNET_DHT_get_stop (peer->dhtget); + } + p = peer->path_head; + while (NULL != p) + { + nextp = p->next; + GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p); + path_destroy (p); + p = nextp; + } + tunnel_destroy_empty (peer->tunnel); + GNUNET_free (peer); + return GNUNET_OK; +} + + +/** + * Returns if peer is used (has a tunnel, is neighbor). + * + * @peer Peer to check. + * + * @return GNUNET_YES if peer is in use. + */ +static int +peer_is_used (struct MeshPeer *peer) +{ + struct MeshPeerPath *p; + + if (NULL != peer->tunnel) + return GNUNET_YES; + + for (p = peer->path_head; NULL != p; p = p->next) + { + if (p->length < 3) + return GNUNET_YES; + } + return GNUNET_NO; +} + +/** + * Iterator over all the peers to get the oldest timestamp. * * @param cls Closure (unsued). * @param key ID of the peer. * @param value Peer_Info of the peer. + */ +static int +peer_get_oldest (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct MeshPeer *p = value; + struct GNUNET_TIME_Absolute *abs = cls; + + /* Don't count active peers */ + if (GNUNET_YES == peer_is_used (p)) + return GNUNET_YES; + + if (abs->abs_value < p->last_contact.abs_value) + abs->abs_value = p->last_contact.abs_value; + + return GNUNET_YES; +} + + +/** + * Iterator over all the peers to remove the oldest entry. * - * FIXME implement + * @param cls Closure (unsued). + * @param key ID of the peer. + * @param value Peer_Info of the peer. */ static int peer_timeout (void *cls, const struct GNUNET_HashCode *key, void *value) { + struct MeshPeer *p = value; + struct GNUNET_TIME_Absolute *abs = cls; + + if (p->last_contact.abs_value == abs->abs_value && + GNUNET_NO == peer_is_used (p)) + { + peer_destroy (p); + return GNUNET_NO; + } return GNUNET_YES; } +/** + * Delete oldest unused peer. + */ +static void +peer_delete_oldest (void) +{ + struct GNUNET_TIME_Absolute abs; + + abs = GNUNET_TIME_UNIT_FOREVER_ABS; + + GNUNET_CONTAINER_multihashmap_iterate (peers, + &peer_get_oldest, + &abs); + GNUNET_CONTAINER_multihashmap_iterate (peers, + &peer_timeout, + &abs); +} + + /** * Retrieve the MeshPeer stucture associated with the peer, create one * and insert it in the appropriate structures if the peer is not known yet. @@ -1816,9 +1996,7 @@ peer_get (const struct GNUNET_PeerIdentity *peer_id) peer = GNUNET_new (struct MeshPeer); if (GNUNET_CONTAINER_multihashmap_size (peers) > max_peers) { - GNUNET_CONTAINER_multihashmap_iterate (peers, - &peer_timeout, - NULL); + peer_delete_oldest (); } GNUNET_CONTAINER_multihashmap_put (peers, &peer_id->hashPubKey, peer, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); @@ -1921,6 +2099,49 @@ peer_get_best_path (const struct MeshPeer *peer) return best_p; } +static int +queue_is_sendable (struct MeshPeerQueue *q) +{ + struct MeshFlowControl *fc; + + /* Is PID-independent? */ + switch (q->type) + { + case GNUNET_MESSAGE_TYPE_MESH_ACK: + case GNUNET_MESSAGE_TYPE_MESH_POLL: + return GNUNET_YES; + } + + /* Is PID allowed? */ + fc = q->fwd ? &q->c->fwd_fc : &q->c->bck_fc; + if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent)) + return GNUNET_YES; + + return GNUNET_NO; +} + + +/** + * Get first sendable message. + * + * @param peer The destination peer. + * + * @return Best current known path towards the peer, if any. + */ +static struct MeshPeerQueue * +peer_get_first_message (const struct MeshPeer *peer) +{ + struct MeshPeerQueue *q; + + for (q = peer->queue_head; NULL != q; q = q->next) + { + if (queue_is_sendable (q)) + return q; + } + + return NULL; +} + /** * Try to establish a new connection to this peer in the given tunnel. @@ -1973,6 +2194,35 @@ peer_connect (struct MeshPeer *peer) } +/** + * Get the first transmittable message for a connection. + * + * @param c Connection. + * @param fwd Is this FWD? + * + * @return First transmittable message. + */ +static struct MeshPeerQueue * +connection_get_first_message (struct MeshConnection *c, int fwd) +{ + struct MeshPeerQueue *q; + struct MeshPeer *p; + + p = connection_get_hop (c, fwd); + + for (q = p->queue_head; NULL != q; q = q->next) + { + if (q->c != c) + continue; + if (queue_is_sendable (q)) + return q; + } + + return NULL; +} + + + /** * @brief Re-initiate traffic on this connection if necessary. * @@ -1986,23 +2236,21 @@ peer_connect (struct MeshPeer *peer) static void connection_unlock_queue (struct MeshConnection *c, int fwd) { - struct MeshFlowControl *fc; struct MeshPeer *peer; struct MeshPeerQueue *q; size_t size; - peer = fwd ? connection_get_next_hop(c) : connection_get_prev_hop(c); - fc = fwd ? &c->fwd_fc : &c->bck_fc; + peer = connection_get_hop (c, fwd); - if (NULL != fc->core_transmit) + if (NULL != peer->core_transmit) return; /* Already unlocked */ - q = fc->queue_head; + q = connection_get_first_message (c, fwd); if (NULL == q) return; /* Nothing to transmit */ size = q->size; - fc->core_transmit = + peer->core_transmit = GNUNET_CORE_notify_transmit_ready (core_handle, GNUNET_NO, 0, @@ -2026,6 +2274,7 @@ connection_cancel_queues (struct MeshConnection *c, int fwd) struct MeshPeerQueue *q; struct MeshPeerQueue *next; struct MeshFlowControl *fc; + struct MeshPeer *peer; if (NULL == c) { @@ -2033,7 +2282,9 @@ connection_cancel_queues (struct MeshConnection *c, int fwd) return; } fc = fwd ? &c->fwd_fc : &c->bck_fc; - for (q = fc->queue_head; NULL != q; q = next) + peer = connection_get_hop (c, fwd); + + for (q = peer->queue_head; NULL != q; q = next) { next = q->next; if (q->c == c) @@ -2044,12 +2295,12 @@ connection_cancel_queues (struct MeshConnection *c, int fwd) queue_destroy (q, GNUNET_YES); } } - if (NULL == fc->queue_head) + if (NULL == peer->queue_head) { - if (NULL != fc->core_transmit) + if (NULL != peer->core_transmit) { - GNUNET_CORE_notify_transmit_ready_cancel (fc->core_transmit); - fc->core_transmit = NULL; + GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); + peer->core_transmit = NULL; } if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task) { @@ -2060,48 +2311,6 @@ connection_cancel_queues (struct MeshConnection *c, int fwd) } -/** - * Destroy the peer_info and free any allocated resources linked to it - * - * @param peer The peer_info to destroy. - * - * @return GNUNET_OK on success - */ -static int -peer_destroy (struct MeshPeer *peer) -{ - struct GNUNET_PeerIdentity id; - struct MeshPeerPath *p; - struct MeshPeerPath *nextp; - - GNUNET_PEER_resolve (peer->id, &id); - GNUNET_PEER_change_rc (peer->id, -1); - - if (GNUNET_YES != - GNUNET_CONTAINER_multihashmap_remove (peers, &id.hashPubKey, peer)) - { - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "removing peer %s, not in hashmap\n", GNUNET_i2s (&id)); - } - if (NULL != peer->dhtget) - { - GNUNET_DHT_get_stop (peer->dhtget); - } - p = peer->path_head; - while (NULL != p) - { - nextp = p->next; - GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p); - path_destroy (p); - p = nextp; - } - tunnel_destroy_empty (peer->tunnel); - GNUNET_free (peer); - return GNUNET_OK; -} - - /** * Remove all paths that rely on a direct connection between p1 and p2 * from the peer itself and notify all tunnels about it. @@ -2874,7 +3083,8 @@ channel_send_client_data (struct MeshChannel *ch, /** - * Send up to 64 buffered messages to the client for in order delivery. + * Send a buffered message to the client, for in order delivery or + * as result of client ACK. * * @param ch Channel on which to empty the message buffer. * @param c Client to send to. @@ -2887,22 +3097,21 @@ channel_send_client_buffered_data (struct MeshChannel *ch, struct MeshChannelReliability *rel) { struct MeshReliableMessage *copy; - struct MeshReliableMessage *next; uint32_t *mid; - if (GNUNET_NO == ch->reliable) + if (GNUNET_NO == rel->client_ready) { - GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n"); return; } mid = rel == ch->bck_rel ? &ch->mid_recv_fwd : &ch->mid_recv_bck; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n"); - for (copy = rel->head_recv; NULL != copy; copy = next) + copy = rel->head_recv; + if (NULL != copy) { - next = copy->next; - if (copy->mid == *mid) + if (copy->mid == *mid || GNUNET_NO == ch->reliable) { struct GNUNET_MESH_Data *msg = (struct GNUNET_MESH_Data *) ©[1]; @@ -2910,6 +3119,7 @@ channel_send_client_buffered_data (struct MeshChannel *ch, " have %u! now expecting %u\n", copy->mid, *mid + 1); channel_send_client_data (ch, msg, (rel == ch->bck_rel)); + rel->n_recv--; *mid = *mid + 1; GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy); GNUNET_free (copy); @@ -2917,7 +3127,7 @@ channel_send_client_buffered_data (struct MeshChannel *ch, else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " don't have %u, next is %u\n", + " reliable && don't have %u, next is %u\n", *mid, copy->mid); return; @@ -2928,8 +3138,9 @@ channel_send_client_buffered_data (struct MeshChannel *ch, /** - * We have received a message out of order, buffer it until we receive - * the missing one and we can feed the rest to the client. + * We have received a message out of order, or the client is not ready. + * Buffer it until we receive an ACK from the client or the missing + * message from the channel. * * @param msg Message to buffer. * @param rel Reliability data to the corresponding direction. @@ -2952,6 +3163,8 @@ channel_rel_add_buffered_data (const struct GNUNET_MESH_Data *msg, copy->rel = rel; memcpy (©[1], msg, size); + rel->n_recv++; + // FIXME do something better than O(n), although n < 64... // FIXME start from the end (most messages are the latest ones) for (prev = rel->head_recv; NULL != prev; prev = prev->next) @@ -3121,10 +3334,10 @@ channel_retransmit_message (void *cls, struct MeshChannelReliability *rel = cls; struct MeshReliableMessage *copy; struct MeshPeerQueue *q; - struct MeshFlowControl *fc; struct MeshChannel *ch; struct MeshConnection *c; struct GNUNET_MESH_Data *payload; + struct MeshPeer *hop; int fwd; rel->retry_task = GNUNET_SCHEDULER_NO_TASK; @@ -3152,8 +3365,8 @@ channel_retransmit_message (void *cls, payload = (struct GNUNET_MESH_Data *) ©[1]; fwd = (rel == ch->fwd_rel); c = tunnel_get_connection (ch->t, fwd); - fc = fwd ? &c->fwd_fc : &c->bck_fc; - for (q = fc->queue_head; NULL != q; q = q->next) + hop = connection_get_hop (c, fwd); + for (q = hop->queue_head; NULL != q; q = q->next) { if (ntohs (payload->header.type) == q->type && ch == q->ch) { @@ -3184,6 +3397,57 @@ channel_retransmit_message (void *cls, } + +/** + * Send ACK on one or more connections due to buffer space to the client. + */ +static void +channel_send_ack (struct MeshChannel *ch, uint32_t buffer, int fwd) +{ + struct MeshTunnel2 *t = ch->t; + struct MeshConnection *c; + struct MeshFlowControl *fc; + uint32_t allowed; + uint32_t to_allow; + unsigned int cs; + + /* Count connections, how many messages are already allowed */ + for (cs = 0, allowed = 0, c = t->connection_head; NULL != c; c = c->next) + { + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (GMC_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent)) + { + GNUNET_break (0); + continue; + } + allowed += fc->last_ack_sent - fc->last_pid_recv; + cs++; + } + + /* Make sure there is no overflow */ + if (allowed > buffer) + { + GNUNET_break (0); + return; + } + + /* Authorize connections to send more data */ + to_allow = buffer - allowed; + for (c = t->connection_head; NULL != c && to_allow > 0; c = c->next) + { + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (fc->last_ack_sent - fc->last_pid_recv > 64 / 3) + { + continue; + } + send_ack (c, fc->last_ack_sent + 1, fwd); + to_allow--; + } + + GNUNET_break (to_allow == 0); +} + + /** * Send keepalive packets for a connection. * @@ -3275,7 +3539,7 @@ connection_fwd_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext * if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - connection_keepalive (c, GNUNET_YES); + connection_maintain (c, GNUNET_YES); c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, &connection_fwd_keepalive, c); @@ -3291,7 +3555,7 @@ connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext * if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - connection_keepalive (c, GNUNET_NO); + connection_maintain (c, GNUNET_NO); c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, &connection_bck_keepalive, c); @@ -3536,7 +3800,7 @@ tunnel_destroy (struct MeshTunnel2 *t) return; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying tunnel %s\n", - GNUNET_i2s (GNUNET_PEER_resolve2 (c->t->peer->id))); + GNUNET_i2s (GNUNET_PEER_resolve2 (t->peer->id))); if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (tunnels, &t->id, t)) GNUNET_break (0); @@ -3867,6 +4131,42 @@ connection_reset_timeout (struct MeshConnection *c, int fwd) } +/** + * Iterator to notify all connections of a broken link. Mark connections + * to destroy after all traffic has been sent. + * + * @param cls Closure (peer disconnected). + * @param key Current key code (tid). + * @param value Value in the hash map (connection). + * + * @return GNUNET_YES if we should continue to iterate, + * GNUNET_NO if not. + */ +static int +connection_broken (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct MeshPeer *peer = cls; + struct MeshConnection *c = value; + struct GNUNET_MESH_ConnectionBroken msg; + int fwd; + + fwd = peer == connection_get_prev_hop (c); + connection_cancel_queues (c, !fwd); + + msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN); + msg.cid = htonl (c->id); + msg.tid = c->t->id; + msg.peer1 = my_full_id; + msg.peer2 = *GNUNET_PEER_resolve2 (peer->id); + send_prebuilt_message_connection (&msg.header, c, NULL, fwd); + c->destroy = GNUNET_YES; + + return GNUNET_YES; +} + /******************************************************************************/ /**************** MESH NETWORK HANDLER HELPERS ***********************/ /******************************************************************************/ @@ -3881,10 +4181,12 @@ connection_reset_timeout (struct MeshConnection *c, int fwd) static void queue_destroy (struct MeshPeerQueue *queue, int clear_cls) { + struct MeshPeer *peer; struct MeshFlowControl *fc; int fwd; - fwd = (queue->peer == connection_get_next_hop (queue->c)); + fwd = queue->fwd; + peer = queue->peer; fc = fwd ? &queue->c->fwd_fc : &queue->c->bck_fc; if (GNUNET_YES == clear_cls) @@ -3916,9 +4218,18 @@ queue_destroy (struct MeshPeerQueue *queue, int clear_cls) } GNUNET_free_non_null (queue->cls); } - GNUNET_CONTAINER_DLL_remove (fc->queue_head, fc->queue_tail, queue); + GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); fc->queue_n--; + peer->queue_n--; + if (NULL != queue->c) + { + queue->c->pending_messages--; + if (NULL != queue->c->t) + { + queue->c->t->pending_messages--; + } + } GNUNET_free (queue); } @@ -3928,28 +4239,17 @@ static size_t queue_send (void *cls, size_t size, void *buf) { struct MeshPeer *peer = cls; - const struct GNUNET_PeerIdentity *dst_id; + struct MeshFlowControl *fc; + struct MeshConnection *c; struct GNUNET_MessageHeader *msg; struct MeshPeerQueue *queue; struct MeshTunnel2 *t; - struct MeshFlowControl *fc; - struct MeshConnection *c; + const struct GNUNET_PeerIdentity *dst_id; size_t data_size; uint32_t pid; uint16_t type; int fwd; - c = queue->c; - fwd = (queue->peer == connection_get_next_hop (c)); - fc = fwd ? &c->fwd_fc : &c->bck_fc; - - if (NULL == fc) - { - GNUNET_break (0); - return 0; - } - fc->core_transmit = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Queue send\n"); if (NULL == buf || 0 == size) @@ -3957,14 +4257,19 @@ queue_send (void *cls, size_t size, void *buf) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Buffer size 0.\n"); return 0; } - queue = fc->queue_head; - /* Queue has no traffic */ + /* Initialize */ + queue = peer_get_first_message (peer); if (NULL == queue) { GNUNET_break (0); /* Core tmt_rdy should've been canceled */ return 0; } + queue->peer->core_transmit = NULL; + c = queue->c; + fwd = queue->fwd; + fc = fwd ? &c->fwd_fc : &c->bck_fc; + dst_id = GNUNET_PEER_resolve2 (peer->id); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* towards %s\n", GNUNET_i2s (dst_id)); @@ -3972,7 +4277,7 @@ queue_send (void *cls, size_t size, void *buf) if (queue->size > size) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* not enough room, reissue\n"); - fc->core_transmit = + peer->core_transmit = GNUNET_CORE_notify_transmit_ready (core_handle, GNUNET_NO, 0, @@ -4028,8 +4333,6 @@ queue_send (void *cls, size_t size, void *buf) data_size = 0; } - fc->queue_n--; - if (0 < drop_percent && GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent) { @@ -4055,12 +4358,12 @@ queue_send (void *cls, size_t size, void *buf) } /* If more data in queue, send next */ - queue = fc->queue_head; + queue = peer_get_first_message (peer); if (NULL != queue) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* more data!\n"); - if (NULL == fc->core_transmit) { - fc->core_transmit = + if (NULL == peer->core_transmit) { + peer->core_transmit = GNUNET_CORE_notify_transmit_ready(core_handle, 0, 0, @@ -4156,7 +4459,6 @@ queue_add (void *cls, uint16_t type, size_t size, return; /* Drop this message */ } - fc->queue_n++; if (GMC_is_pid_bigger(fc->last_pid_sent + 1, fc->last_ack_recv) && GNUNET_SCHEDULER_NO_TASK == fc->poll_task) fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time, @@ -4169,14 +4471,15 @@ queue_add (void *cls, uint16_t type, size_t size, queue->peer = dst; queue->c = c; queue->ch = ch; + queue->fwd = fwd; if (100 <= priority) - GNUNET_CONTAINER_DLL_insert (fc->queue_head, fc->queue_tail, queue); + GNUNET_CONTAINER_DLL_insert (dst->queue_head, dst->queue_tail, queue); else - GNUNET_CONTAINER_DLL_insert_tail (fc->queue_head, fc->queue_tail, queue); + GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue); - if (NULL == fc->core_transmit) + if (NULL == dst->core_transmit) { - fc->core_transmit = + dst->core_transmit = GNUNET_CORE_notify_transmit_ready (core_handle, 0, 0, @@ -4188,6 +4491,8 @@ queue_add (void *cls, uint16_t type, size_t size, } c->pending_messages++; c->t->pending_messages++; + fc->queue_n++; + dst->queue_n++; } @@ -4829,7 +5134,7 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, fc = fwd ? &c->fwd_fc : &c->bck_fc; /* Check if origin is as expected */ - neighbor = fwd ? connection_get_prev_hop (c) : connection_get_next_hop (c); + neighbor = connection_get_hop (c, fwd); if (peer_get (peer)->id != neighbor->id) { GNUNET_break_op (0); @@ -5145,7 +5450,7 @@ handle_mesh_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_YES : GNUNET_NO; /* Check if origin is as expected */ - neighbor = fwd ? connection_get_prev_hop (c) : connection_get_next_hop (c); + neighbor = connection_get_hop (c, fwd); if (peer_get (peer)->id != neighbor->id) { GNUNET_break_op (0); @@ -5651,7 +5956,6 @@ handle_local_data (void *cls, struct GNUNET_SERVER_Client *client, copy = GNUNET_malloc (sizeof (struct MeshReliableMessage) + sizeof(struct GNUNET_MESH_Data) + size); - copy->mid = *mid; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! DATA %u\n", copy->mid); copy->timestamp = GNUNET_TIME_absolute_get (); @@ -5685,11 +5989,10 @@ handle_local_data (void *cls, struct GNUNET_SERVER_Client *client, payload->chid = htonl (ch->gid); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " calling generic handler...\n"); - if (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV) - handle_data (ch->t, payload, GNUNET_YES); - else - handle_data (ch->t, payload, GNUNET_NO); + send_prebuilt_message_channel (&payload->header, ch, fwd); } + if (tunnel_get_buffer (ch->t, fwd) > 0) + send_local_ack (ch, c, fwd); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "receive done OK\n"); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -5709,11 +6012,14 @@ handle_local_ack (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct GNUNET_MESH_LocalAck *msg; + struct MeshChannelReliability *rel; struct MeshChannel *ch; struct MeshClient *c; MESH_ChannelNumber chid; + int fwd; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got a local ACK\n"); + /* Sanity check for client registration */ if (NULL == (c = client_get (client))) { @@ -5725,7 +6031,7 @@ handle_local_ack (void *cls, struct GNUNET_SERVER_Client *client, msg = (struct GNUNET_MESH_LocalAck *) message; - /* Tunnel exists? */ + /* Channel exists? */ chid = ntohl (msg->channel_id); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " on channel %X\n", chid); ch = channel_get_by_local_id (c, chid); @@ -5738,19 +6044,12 @@ handle_local_ack (void *cls, struct GNUNET_SERVER_Client *client, return; } - /* Does client own tunnel? I.E: Is this an ACK for BCK traffic? */ - if (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV) - { - /* The client owns the channel, ACK is for data to_origin, send BCK ACK. */ - ch->prev_fc.last_ack_recv++; - tunnel_send_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK, GNUNET_NO); - } - else - { - /* The client doesn't own the channel, this ACK is for FWD traffic. */ - t->next_fc.last_ack_recv++; - tunnel_send_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK, GNUNET_YES); - } + fwd = chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV; + rel = fwd ? ch->fwd_rel : ch->bck_rel; + + rel->client_ready = GNUNET_YES; + channel_send_client_buffered_data (ch, c, rel); + channel_send_ack (ch, 64 - rel->n_recv, fwd); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -5777,7 +6076,7 @@ monitor_all_tunnels_iterator (void *cls, struct GNUNET_MESH_LocalMonitor *msg; msg = GNUNET_malloc (sizeof(struct GNUNET_MESH_LocalMonitor)); - msg->channel_id = htonl (ch->id); + msg->channel_id = htonl (ch->gid); msg->header.size = htons (sizeof (struct GNUNET_MESH_LocalMonitor)); msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS); @@ -5855,7 +6154,8 @@ handle_local_show_tunnel (void *cls, struct GNUNET_SERVER_Client *client, c->id, &msg->owner, ntohl (msg->channel_id)); - ch = channel_get (&msg->owner, ntohl (msg->channel_id)); +// ch = channel_get (&msg->owner, ntohl (msg->channel_id)); + ch = NULL; // FIXME if (NULL == ch) { /* We don't know the tunnel */ @@ -5920,13 +6220,13 @@ static struct GNUNET_SERVER_MessageHandler client_handlers[] = { static void core_connect (void *cls, const struct GNUNET_PeerIdentity *peer) { - struct MeshPeer *peer_info; + struct MeshPeer *pi; struct MeshPeerPath *path; DEBUG_CONN ("Peer connected\n"); DEBUG_CONN (" %s\n", GNUNET_i2s (&my_full_id)); - peer_info = peer_get (peer); - if (myid == peer_info->id) + pi = peer_get (peer); + if (myid == pi->id) { DEBUG_CONN (" (self)\n"); path = path_new (1); @@ -5935,13 +6235,15 @@ core_connect (void *cls, const struct GNUNET_PeerIdentity *peer) { DEBUG_CONN (" %s\n", GNUNET_i2s (peer)); path = path_new (2); - path->peers[1] = peer_info->id; - GNUNET_PEER_change_rc (peer_info->id, 1); + path->peers[1] = pi->id; + GNUNET_PEER_change_rc (pi->id, 1); GNUNET_STATISTICS_update (stats, "# peers", 1, GNUNET_NO); } path->peers[0] = myid; GNUNET_PEER_change_rc (myid, 1); - peer_add_path (peer_info, path, GNUNET_YES); + peer_add_path (pi, path, GNUNET_YES); + + pi->connections = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_YES); return; } @@ -5956,9 +6258,6 @@ static void core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) { struct MeshPeer *pi; - struct MeshPeerQueue *q; - struct MeshPeerQueue *n; - struct MeshFlowControl *fc; DEBUG_CONN ("Peer disconnected\n"); pi = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); @@ -5967,33 +6266,19 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) GNUNET_break (0); return; } - fc = pi->fc; - if (NULL != fc) - { - GNUNET_break (0); - return; - } - pi->fc = NULL; - q = fc->queue_head; - while (NULL != q) - { - n = q->next; - queue_destroy (q, GNUNET_YES); - q = n; - } - if (NULL != fc->core_transmit) - GNUNET_CORE_notify_transmit_ready_cancel (fc->core_transmit); - if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task) - GNUNET_SCHEDULER_cancel (fc->poll_task); + peer_remove_path (pi, myid, pi->id); - peer_remove_path (pi, pi->id, myid); + GNUNET_CONTAINER_multihashmap_iterate (pi->connections, + connection_broken, + pi); + GNUNET_CONTAINER_multihashmap_destroy (pi->connections); + pi->connections = NULL; if (myid == pi->id) { DEBUG_CONN (" (self)\n"); } GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO); - GNUNET_free (fc); return; } -- 2.25.1