X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmesh%2Fgnunet-service-mesh_connection.c;h=e966e7192f50417bc92583c84c84e3246c198bb8;hb=3546e3b90ad958eb9fd3d372c7bd53400853b698;hp=7c3cb08bb31ee913cd80f76c1bc337e700d16105;hpb=50f061e6ca2df0c9473b87f79beb5225b1400f03;p=oweals%2Fgnunet.git diff --git a/src/mesh/gnunet-service-mesh_connection.c b/src/mesh/gnunet-service-mesh_connection.c index 7c3cb08bb..e966e7192 100644 --- a/src/mesh/gnunet-service-mesh_connection.c +++ b/src/mesh/gnunet-service-mesh_connection.c @@ -29,78 +29,27 @@ #include "gnunet_statistics_service.h" +#include "mesh_path.h" +#include "mesh_protocol_enc.h" +#include "mesh_enc.h" #include "gnunet-service-mesh_connection.h" #include "gnunet-service-mesh_peer.h" #include "gnunet-service-mesh_tunnel.h" -#include "mesh_protocol_enc.h" -#include "mesh_path.h" +#include "gnunet-service-mesh_channel.h" + +#define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__) #define MESH_MAX_POLL_TIME GNUNET_TIME_relative_multiply (\ GNUNET_TIME_UNIT_MINUTES,\ 10) -#define MESH_RETRANSMIT_TIME GNUNET_TIME_UNIT_SECONDS -#define MESH_RETRANSMIT_MARGIN 4 - -#define LOG(level, ...) GNUNET_log_from (level,"mesh-con",__VA_ARGS__) +#define AVG_MSGS 32 /******************************************************************************/ /******************************** STRUCTS **********************************/ /******************************************************************************/ -/** - * Struct containing info about a queued transmission to this peer - */ -struct MeshPeerQueue -{ - /** - * DLL next - */ - struct MeshPeerQueue *next; - - /** - * DLL previous - */ - struct MeshPeerQueue *prev; - - /** - * Peer this transmission is directed to. - */ - struct MeshPeer *peer; - - /** - * Connection this message belongs to. - */ - struct MeshConnection *c; - - /** - * Is FWD in c? - */ - int fwd; - - /** - * Channel this message belongs to, if known. - */ - struct MeshChannel *ch; - - /** - * Pointer to info stucture used as cls. - */ - void *cls; - - /** - * Type of message - */ - uint16_t type; - - /** - * Size of the message - */ - size_t size; -}; - - /** * Struct to encapsulate all the Flow Control information to a peer to which * we are directly connected (on a core level). @@ -158,6 +107,32 @@ struct MeshFlowControl struct GNUNET_TIME_Relative poll_time; }; +/** + * Keep a record of the last messages sent on this connection. + */ +struct MeshConnectionPerformance +{ + /** + * Circular buffer for storing measurements. + */ + double usecsperbyte[AVG_MSGS]; + + /** + * Running average of @c usecsperbyte. + */ + double avg; + + /** + * How many values of @c usecsperbyte are valid. + */ + uint16_t size; + + /** + * Index of the next "free" position in @c usecsperbyte. + */ + uint16_t idx; +}; + /** * Struct containing all information regarding a connection to a peer. @@ -179,6 +154,11 @@ struct MeshConnection */ struct MeshFlowControl bck_fc; + /** + * Measure connection performance on the endpoint. + */ + struct MeshConnectionPerformance *perf; + /** * ID of the connection. */ @@ -231,6 +211,16 @@ struct MeshConnection */ extern struct GNUNET_STATISTICS_Handle *stats; +/** + * Local peer own ID (memory efficient handle). + */ +extern GNUNET_PEER_Id myid; + +/** + * Local peer own ID (full value). + */ +extern struct GNUNET_PeerIdentity my_full_id; + /** * Connections known, indexed by cid (MeshConnection). */ @@ -347,6 +337,158 @@ connection_get (const struct GNUNET_HashCode *cid) } +static void +connection_change_state (struct MeshConnection* c, + enum MeshConnectionState state) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connection %s state was %s\n", + GNUNET_h2s (&c->id), GMC_state2s (c->state)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connection %s state is now %s\n", + GNUNET_h2s (&c->id), GMC_state2s (state)); + c->state = state; +} + + +/** + * Callback called when a queued message is sent. + * + * Calculates the average time and connection packet tracking. + * + * @param cls Closure. + * @param c Connection this message was on. + * @param type Type of message sent. + * @param fwd Was this a FWD going message? + * @param size Size of the message. + * @param wait Time spent waiting for core (only the time for THIS message) + */ +static void +message_sent (void *cls, + struct MeshConnection *c, uint16_t type, + int fwd, size_t size, + struct GNUNET_TIME_Relative wait) +{ + struct MeshConnectionPerformance *p; + struct MeshFlowControl *fc; + double usecsperbyte; + + fc = fwd ? &c->fwd_fc : &c->bck_fc; + LOG (GNUNET_ERROR_TYPE_DEBUG, "! Q_N- %p %u\n", fc, fc->queue_n); + fc->queue_n--; + c->pending_messages--; + if (GNUNET_YES == c->destroy && 0 == c->pending_messages) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "! destroying connection!\n"); + GMC_destroy (c); + } + /* Send ACK if needed, after accounting for sent ID in fc->queue_n */ + switch (type) + { + case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED: + fc->last_pid_sent++; + LOG (GNUNET_ERROR_TYPE_DEBUG, "! accounting pid %u\n", fc->last_pid_sent); +// send_ack (c, ch, fwd); + break; + default: + break; + } + + if (NULL == c->perf) + return; /* Only endpoints are interested in timing. */ + + LOG (GNUNET_ERROR_TYPE_DEBUG, "! message sent!\n"); + p = c->perf; + usecsperbyte = ((double) wait.rel_value_us) / size; + if (p->size == AVG_MSGS) + { + /* Array is full. Substract oldest value, add new one and store. */ + p->avg -= (p->usecsperbyte[p->idx] / AVG_MSGS); + p->usecsperbyte[p->idx] = usecsperbyte; + p->avg += (p->usecsperbyte[p->idx] / AVG_MSGS); + } + else + { + /* Array not yet full. Add current value to avg and store. */ + p->usecsperbyte[p->idx] = usecsperbyte; + p->avg *= p->size; + p->avg += p->usecsperbyte[p->idx]; + p->size++; + p->avg /= p->size; + } + p->idx = (p->idx + 1) % AVG_MSGS; + +// if (NULL != c->t) +// { +// c->t->pending_messages--; +// if (GNUNET_YES == c->t->destroy && 0 == t->pending_messages) +// { +// LOG (GNUNET_ERROR_TYPE_DEBUG, "* destroying tunnel!\n"); +// GMT_destroy (c->t); +// } +// } +} + + +/** + * Get the previous hop in a connection + * + * @param c Connection. + * + * @return Previous peer in the connection. + */ +static struct MeshPeer * +get_prev_hop (const struct MeshConnection *c) +{ + GNUNET_PEER_Id id; + + if (0 == c->own_pos || c->path->length < 2) + id = c->path->peers[0]; + else + id = c->path->peers[c->own_pos - 1]; + + return GMP_get_short (id); +} + + +/** + * Get the next hop in a connection + * + * @param c Connection. + * + * @return Next peer in the connection. + */ +static struct MeshPeer * +get_next_hop (const struct MeshConnection *c) +{ + GNUNET_PEER_Id id; + + if ((c->path->length - 1) == c->own_pos || c->path->length < 2) + id = c->path->peers[c->path->length - 1]; + else + id = c->path->peers[c->own_pos + 1]; + + return GMP_get_short (id); +} + + +/** + * Get the hop in a connection. + * + * @param c Connection. + * @param fwd Next hop? + * + * @return Next peer in the connection. + */ +static struct MeshPeer * +get_hop (struct MeshConnection *c, int fwd) +{ + if (fwd) + return get_next_hop (c); + return get_prev_hop (c); +} + + /** * Send an ACK informing the predecessor about the available buffer space. * @@ -374,23 +516,23 @@ connection_send_ack (struct MeshConnection *c, unsigned int buffer, int fwd) fwd ? "FWD" : "BCK", GNUNET_h2s (&c->id)); /* Check if we need to transmit the ACK */ - if (prev_fc->last_ack_sent - prev_fc->last_pid_recv > 3) + delta = prev_fc->last_ack_sent - prev_fc->last_pid_recv; + if (3 < delta && buffer < delta) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending ACK, buffer > 3\n"); LOG (GNUNET_ERROR_TYPE_DEBUG, - " last pid recv: %u, last ack sent: %u\n", - prev_fc->last_pid_recv, prev_fc->last_ack_sent); + " last pid recv: %u, last ack sent: %u\n", + prev_fc->last_pid_recv, prev_fc->last_ack_sent); return; } /* Ok, ACK might be necessary, what PID to ACK? */ - delta = next_fc->queue_max - next_fc->queue_n; - ack = prev_fc->last_pid_recv + delta; + ack = prev_fc->last_pid_recv + buffer; LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK %u\n", ack); LOG (GNUNET_ERROR_TYPE_DEBUG, - " last pid %u, last ack %u, qmax %u, q %u\n", - prev_fc->last_pid_recv, prev_fc->last_ack_sent, - next_fc->queue_max, next_fc->queue_n); + " last pid %u, last ack %u, qmax %u, q %u\n", + prev_fc->last_pid_recv, prev_fc->last_ack_sent, + next_fc->queue_max, next_fc->queue_n); if (ack == prev_fc->last_ack_sent) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Not sending FWD ACK, not needed\n"); @@ -405,13 +547,13 @@ connection_send_ack (struct MeshConnection *c, unsigned int buffer, int fwd) msg.ack = htonl (ack); msg.cid = c->id; - send_prebuilt_message_connection (&msg.header, c, NULL, !fwd); + GMC_send_prebuilt_message (&msg.header, c, NULL, !fwd); } /** * Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE - * directed to us. + * or a first CONNECTION_ACK directed to us. * * @param connection Connection to confirm. * @param fwd Is this a fwd ACK? (First is bck (SYNACK), second is fwd (ACK)) @@ -423,63 +565,18 @@ send_connection_ack (struct MeshConnection *connection, int fwd) t = connection->t; LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection ack\n"); - queue_add (NULL, - GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK, - sizeof (struct GNUNET_MESH_ConnectionACK), - connection, - NULL, - fwd); - if (MESH_TUNNEL_NEW == t->state) - tunnel_change_state (t, MESH_TUNNEL_WAITING); + GMP_queue_add (get_hop (connection, fwd), NULL, + GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK, + sizeof (struct GNUNET_MESH_ConnectionACK), + connection, NULL, fwd, + &message_sent, NULL); + if (MESH_TUNNEL3_NEW == GMT_get_state (t)) + GMT_change_state (t, MESH_TUNNEL3_WAITING); if (MESH_CONNECTION_READY != connection->state) connection_change_state (connection, MESH_CONNECTION_SENT); } -/** - * Sends a CREATE CONNECTION message for a path to a peer. - * Changes the connection and tunnel states if necessary. - * - * @param connection Connection to create. - */ -static void -send_connection_create (struct MeshConnection *connection) -{ - struct MeshTunnel3 *t; - - t = connection->t; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n"); - queue_add (NULL, - GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE, - sizeof (struct GNUNET_MESH_ConnectionCreate) + - (connection->path->length * - sizeof (struct GNUNET_PeerIdentity)), - connection, - NULL, - GNUNET_YES); - if (NULL != t && - (MESH_TUNNEL_SEARCHING == t->state || MESH_TUNNEL_NEW == t->state)) - tunnel_change_state (t, MESH_TUNNEL_WAITING); - if (MESH_CONNECTION_NEW == connection->state) - connection_change_state (connection, MESH_CONNECTION_SENT); -} - - -static void -connection_change_state (struct MeshConnection* c, - enum MeshConnectionState state) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connection %s state was %s\n", - GNUNET_h2s (&c->id), GMC_state2s (c->state)); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connection %s state is now %s\n", - GNUNET_h2s (&c->id), GMC_state2s (state)); - c->state = state; -} - - - /** * Send keepalive packets for a connection. * @@ -498,17 +595,15 @@ connection_keepalive (struct MeshConnection *c, int fwd) GNUNET_MESSAGE_TYPE_MESH_BCK_KEEPALIVE; LOG (GNUNET_ERROR_TYPE_DEBUG, - "sending %s keepalive for connection %s[%d]\n", - fwd ? "FWD" : "BCK", - peer2s (c->t->peer), - c->id); + "sending %s keepalive for connection %s[%d]\n", + fwd ? "FWD" : "BCK", GMT_2s (c->t), c->id); msg = (struct GNUNET_MESH_ConnectionKeepAlive *) cbuf; msg->header.size = htons (size); msg->header.type = htons (type); msg->cid = c->id; - send_prebuilt_message_connection (&msg->header, c, NULL, fwd); + GMC_send_prebuilt_message (&msg->header, c, NULL, fwd); } @@ -523,7 +618,7 @@ connection_recreate (struct MeshConnection *c, int fwd) { LOG (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n"); if (fwd) - send_connection_create (c); + GMC_send_create (c); else send_connection_ack (c, GNUNET_NO); } @@ -540,7 +635,7 @@ connection_recreate (struct MeshConnection *c, int fwd) static void connection_maintain (struct MeshConnection *c, int fwd) { - if (MESH_TUNNEL_SEARCHING == c->t->state) + if (MESH_TUNNEL3_SEARCHING == GMT_get_state (c->t)) { /* TODO DHT GET with RO_BART */ return; @@ -593,35 +688,6 @@ connection_bck_keepalive (void *cls, const struct GNUNET_SCHEDULER_TaskContext * } - -/** - * Get the first transmittable message for a connection. - * - * @param c Connection. - * @param fwd Is this FWD? - * - * @return First transmittable message. - */ -static struct MeshPeerQueue * -connection_get_first_message (struct MeshConnection *c, int fwd) -{ - struct MeshPeerQueue *q; - struct MeshPeer *p; - - p = connection_get_hop (c, fwd); - - for (q = p->queue_head; NULL != q; q = q->next) - { - if (q->c != c) - continue; - if (queue_is_sendable (q)) - return q; - } - - return NULL; -} - - /** * @brief Re-initiate traffic on this connection if necessary. * @@ -636,8 +702,6 @@ static void connection_unlock_queue (struct MeshConnection *c, int fwd) { struct MeshPeer *peer; - struct MeshPeerQueue *q; - size_t size; LOG (GNUNET_ERROR_TYPE_DEBUG, "connection_unlock_queue %s on %s\n", @@ -649,31 +713,8 @@ connection_unlock_queue (struct MeshConnection *c, int fwd) return; } - peer = connection_get_hop (c, fwd); - - if (NULL != peer->core_transmit) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " already unlocked!\n"); - return; /* Already unlocked */ - } - - q = connection_get_first_message (c, fwd); - if (NULL == q) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " queue empty!\n"); - return; /* Nothing to transmit */ - } - - size = q->size; - peer->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, - GNUNET_NO, - 0, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_PEER_resolve2 (peer->id), - size, - &queue_send, - peer); + peer = get_hop (c, fwd); + GMP_queue_unlock (peer, c); } @@ -686,8 +727,7 @@ connection_unlock_queue (struct MeshConnection *c, int fwd) static void connection_cancel_queues (struct MeshConnection *c, int fwd) { - struct MeshPeerQueue *q; - struct MeshPeerQueue *next; + struct MeshFlowControl *fc; struct MeshPeer *peer; @@ -696,32 +736,15 @@ connection_cancel_queues (struct MeshConnection *c, int fwd) GNUNET_break (0); return; } - fc = fwd ? &c->fwd_fc : &c->bck_fc; - peer = connection_get_hop (c, fwd); - for (q = peer->queue_head; NULL != q; q = next) - { - next = q->next; - if (q->c == c) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "connection_cancel_queue %s\n", - GNUNET_MESH_DEBUG_M2S (q->type)); - queue_destroy (q, GNUNET_YES); - } - } - if (NULL == peer->queue_head) + peer = get_hop (c, fwd); + GMP_queue_cancel (peer, c); + + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task) { - if (NULL != peer->core_transmit) - { - GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit); - peer->core_transmit = NULL; - } - if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task) - { - GNUNET_SCHEDULER_cancel (fc->poll_task); - fc->poll_task = GNUNET_SCHEDULER_NO_TASK; - } + GNUNET_SCHEDULER_cancel (fc->poll_task); + fc->poll_task = GNUNET_SCHEDULER_NO_TASK; } } @@ -741,89 +764,26 @@ connection_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct MeshConnection *c; fc->poll_task = GNUNET_SCHEDULER_NO_TASK; - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - { - return; - } - - c = fc->c; - LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%X]\n", - GNUNET_h2s (&c->id)); - LOG (GNUNET_ERROR_TYPE_DEBUG, " *** %s\n", - fc == &c->fwd_fc ? "FWD" : "BCK"); - - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL); - msg.header.size = htons (sizeof (msg)); - LOG (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", fc->last_pid_sent); - send_prebuilt_message_connection (&msg.header, c, NULL, fc == &c->fwd_fc); - fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time); - fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time, - &connection_poll, fc); -} - - - - -/** - * Get the previous hop in a connection - * - * @param c Connection. - * - * @return Previous peer in the connection. - */ -static struct MeshPeer * -connection_get_prev_hop (struct MeshConnection *c) -{ - GNUNET_PEER_Id id; - - if (0 == c->own_pos || c->path->length < 2) - id = c->path->peers[0]; - else - id = c->path->peers[c->own_pos - 1]; - - return peer_get_short (id); -} - - -/** - * Get the next hop in a connection - * - * @param c Connection. - * - * @return Next peer in the connection. - */ -static struct MeshPeer * -connection_get_next_hop (struct MeshConnection *c) -{ - GNUNET_PEER_Id id; - - if ((c->path->length - 1) == c->own_pos || c->path->length < 2) - id = c->path->peers[c->path->length - 1]; - else - id = c->path->peers[c->own_pos + 1]; - - return peer_get_short (id); -} - - -/** - * Get the hop in a connection. - * - * @param c Connection. - * @param fwd Next hop? - * - * @return Next peer in the connection. - */ -static struct MeshPeer * -connection_get_hop (struct MeshConnection *c, int fwd) -{ - if (fwd) - return connection_get_next_hop (c); - return connection_get_prev_hop (c); -} + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + { + return; + } + c = fc->c; + LOG (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, " *** connection [%X]\n", + GNUNET_h2s (&c->id)); + LOG (GNUNET_ERROR_TYPE_DEBUG, " *** %s\n", + fc == &c->fwd_fc ? "FWD" : "BCK"); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL); + msg.header.size = htons (sizeof (msg)); + LOG (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", fc->last_pid_sent); + GMC_send_prebuilt_message (&msg.header, c, NULL, fc == &c->fwd_fc); + fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time); + fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time, + &connection_poll, fc); +} /** @@ -844,7 +804,7 @@ connection_fwd_timeout (void *cls, return; LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s[%X] FWD timed out. Destroying.\n", - peer2s (c->t->peer), + GMT_2s (c->t), c->id); if (GMC_is_origin (c, GNUNET_YES)) /* If local, leave. */ @@ -873,8 +833,7 @@ connection_bck_timeout (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Connection %s[%X] FWD timed out. Destroying.\n", - peer2s (c->t->peer), - c->id); + GMT_2s (c->t), c->id); if (GMC_is_origin (c, GNUNET_NO)) /* If local, leave. */ return; @@ -925,47 +884,68 @@ connection_reset_timeout (struct MeshConnection *c, int fwd) /** - * + * Add the connection to the list of both neighbors. + * + * @param c Connection. */ static void register_neighbors (struct MeshConnection *c) { struct MeshPeer *peer; - peer = connection_get_next_hop (c); + peer = get_next_hop (c); if (GNUNET_NO == GMP_is_neighbor (peer)) { GMC_destroy (c); - return NULL; + return; } GMP_add_connection (peer, c); - peer = connection_get_prev_hop (c); + peer = get_prev_hop (c); if (GNUNET_NO == GMP_is_neighbor (peer)) { GMC_destroy (c); - return NULL; + return; } GMP_add_connection (peer, c); } /** - * + * Remove the connection from the list of both neighbors. + * + * @param c Connection. */ static void unregister_neighbors (struct MeshConnection *c) { struct MeshPeer *peer; - peer = connection_get_next_hop (c); + peer = get_next_hop (c); GMP_remove_connection (peer, c); - peer = connection_get_prev_hop (c); + peer = get_prev_hop (c); GMP_remove_connection (peer, c); } +/** + * Bind the connection to the peer and the tunnel to that peer. + * + * If the peer has no tunnel, create one. Update tunnel and connection + * data structres to reflect new status. + * + * @param c Connection. + * @param peer Peer. + */ +static void +add_to_peer (struct MeshConnection *c, struct MeshPeer *peer) +{ + GMP_add_tunnel (peer); + c->t = GMP_get_tunnel (peer); + GMT_add_connection (c->t, c); +} + /******************************************************************************/ /******************************** API ***********************************/ /******************************************************************************/ @@ -1033,12 +1013,6 @@ GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer, c = connection_get (cid); if (NULL == c) { - LOG (GNUNET_ERROR_TYPE_DEBUG, " Creating connection\n"); - c = connection_new (cid); - if (NULL == c) - return GNUNET_OK; - connection_reset_timeout (c, GNUNET_YES); - /* Create path */ LOG (GNUNET_ERROR_TYPE_DEBUG, " Creating path...\n"); path = path_new (size); @@ -1056,13 +1030,15 @@ GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer, /* create path: self not found in path through self */ GNUNET_break_op (0); path_destroy (path); - connection_destroy (c); return GNUNET_OK; } LOG (GNUNET_ERROR_TYPE_DEBUG, " Own position: %u\n", own_pos); - path_add_to_peers (path, GNUNET_NO); - c->path = path_duplicate (path); - c->own_pos = own_pos; + GMP_add_path_to_all (path, GNUNET_NO); + LOG (GNUNET_ERROR_TYPE_DEBUG, " Creating connection\n"); + c = GMC_new (cid, NULL, path_duplicate (path), own_pos); + if (NULL == c) + return GNUNET_OK; + connection_reset_timeout (c, GNUNET_YES); } else { @@ -1072,23 +1048,18 @@ GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer, connection_change_state (c, MESH_CONNECTION_SENT); /* Remember peers */ - dest_peer = peer_get (&id[size - 1]); - orig_peer = peer_get (&id[0]); + dest_peer = GMP_get (&id[size - 1]); + orig_peer = GMP_get (&id[0]); /* Is it a connection to us? */ if (c->own_pos == size - 1) { LOG (GNUNET_ERROR_TYPE_DEBUG, " It's for us!\n"); - peer_add_path_to_origin (orig_peer, path, GNUNET_YES); + GMP_add_path_to_origin (orig_peer, path, GNUNET_YES); - if (NULL == orig_peer->tunnel) - { - orig_peer->tunnel = tunnel_new (); - orig_peer->tunnel->peer = orig_peer; - } - tunnel_add_connection (orig_peer->tunnel, c); - if (MESH_TUNNEL_NEW == c->t->state) - tunnel_change_state (c->t, MESH_TUNNEL_WAITING); + add_to_peer (c, orig_peer); + if (MESH_TUNNEL3_NEW == GMT_get_state (c->t)) + GMT_change_state (c->t, MESH_TUNNEL3_WAITING); send_connection_ack (c, GNUNET_NO); if (MESH_CONNECTION_SENT == c->state) @@ -1101,9 +1072,9 @@ GMC_handle_create (void *cls, const struct GNUNET_PeerIdentity *peer, { /* It's for somebody else! Retransmit. */ LOG (GNUNET_ERROR_TYPE_DEBUG, " Retransmitting.\n"); - peer_add_path (dest_peer, path_duplicate (path), GNUNET_NO); - peer_add_path_to_origin (orig_peer, path, GNUNET_NO); - send_prebuilt_message_connection (message, c, NULL, GNUNET_YES); + GMP_add_path (dest_peer, path_duplicate (path), GNUNET_NO); + GMP_add_path_to_origin (orig_peer, path, GNUNET_NO); + GMC_send_prebuilt_message (message, c, NULL, GNUNET_YES); } return GNUNET_OK; } @@ -1146,15 +1117,15 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer, LOG (GNUNET_ERROR_TYPE_DEBUG, " via peer %s\n", GNUNET_i2s (peer)); - pi = peer_get (peer); - if (connection_get_next_hop (c) == pi) + pi = GMP_get (peer); + if (get_next_hop (c) == pi) { LOG (GNUNET_ERROR_TYPE_DEBUG, " SYNACK\n"); fwd = GNUNET_NO; if (MESH_CONNECTION_SENT == c->state) connection_change_state (c, MESH_CONNECTION_ACK); } - else if (connection_get_prev_hop (c) == pi) + else if (get_prev_hop (c) == pi) { LOG (GNUNET_ERROR_TYPE_DEBUG, " ACK\n"); fwd = GNUNET_YES; @@ -1171,7 +1142,7 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer, p = c->path; if (NULL != p) { - path_add_to_peers (p, GNUNET_YES); + GMP_add_path_to_all (p, GNUNET_YES); } else { @@ -1179,19 +1150,13 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer, } /* Message for us as creator? */ - if (connection_is_origin (c, GNUNET_YES)) + if (GMC_is_origin (c, GNUNET_YES)) { LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection (SYN)ACK for us!\n"); connection_change_state (c, MESH_CONNECTION_READY); - if (MESH_TUNNEL_READY != c->t->state) - tunnel_change_state (c->t, MESH_TUNNEL_READY); + GMT_change_state (c->t, MESH_TUNNEL3_READY); send_connection_ack (c, GNUNET_YES); - tunnel_send_queued_data (c->t, GNUNET_YES); - if (3 <= tunnel_count_connections (c->t) && NULL != c->t->peer->dhtget) - { - GNUNET_DHT_get_stop (c->t->peer->dhtget); - c->t->peer->dhtget = NULL; - } + GMT_send_queued_data (c->t, GNUNET_YES); return GNUNET_OK; } @@ -1199,38 +1164,66 @@ GMC_handle_confirm (void *cls, const struct GNUNET_PeerIdentity *peer, if (GMC_is_terminal (c, GNUNET_YES)) { LOG (GNUNET_ERROR_TYPE_DEBUG, " Connection ACK for us!\n"); - if (MESH_TUNNEL_READY != c->t->state) - tunnel_change_state (c->t, MESH_TUNNEL_READY); connection_change_state (c, MESH_CONNECTION_READY); - tunnel_send_queued_data (c->t, GNUNET_NO); + GMT_change_state (c->t, MESH_TUNNEL3_READY); + GMT_send_queued_data (c->t, GNUNET_NO); return GNUNET_OK; } LOG (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n"); - send_prebuilt_message_connection (message, c, NULL, fwd); + GMC_send_prebuilt_message (message, c, NULL, fwd); return GNUNET_OK; } +/** + * Is traffic coming from this sender 'FWD' traffic? + * + * @param c Connection to check. + * @param sender Peer identity of neighbor. + * + * @return GNUNET_YES in case the sender is the 'prev' hop and therefore + * the traffic is 'FWD'. GNUNET_NO for BCK. GNUNET_SYSERR for errors. + */ +int +is_fwd (const struct MeshConnection *c, + const struct GNUNET_PeerIdentity *sender) +{ + GNUNET_PEER_Id id; + + id = GNUNET_PEER_search (sender); + if (GMP_get_short_id (get_prev_hop (c)) == id) + return GNUNET_YES; + + if (GMP_get_short_id (get_next_hop (c)) == id) + return GNUNET_NO; + + GNUNET_break (0); + return GNUNET_SYSERR; +} + + /** * Core handler for notifications of broken paths * * @param cls Closure (unused). - * @param peer Peer identity of sending neighbor. + * @param id Peer identity of sending neighbor. * @param message Message. * * @return GNUNET_OK to keep the connection open, * GNUNET_SYSERR to close it (signal serious error) */ int -GMC_handle_broken (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) +GMC_handle_broken (void* cls, + const struct GNUNET_PeerIdentity* id, + const struct GNUNET_MessageHeader* message) { struct GNUNET_MESH_ConnectionBroken *msg; struct MeshConnection *c; + int fwd; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received a CONNECTION BROKEN msg from %s\n", GNUNET_i2s (peer)); + "Received a CONNECTION BROKEN msg from %s\n", GNUNET_i2s (id)); msg = (struct GNUNET_MESH_ConnectionBroken *) message; LOG (GNUNET_ERROR_TYPE_DEBUG, " regarding %s\n", GNUNET_i2s (&msg->peer1)); @@ -1242,8 +1235,22 @@ GMC_handle_broken (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break_op (0); return GNUNET_OK; } - tunnel_notify_connection_broken (c->t, GNUNET_PEER_search (&msg->peer1), - GNUNET_PEER_search (&msg->peer2)); + + fwd = is_fwd (c, id); + connection_cancel_queues (c, !fwd); + if (GMC_is_terminal (c, fwd)) + { + if (0 < c->pending_messages) + c->destroy = GNUNET_YES; + else + GMC_destroy (c); + } + else + { + GMC_send_prebuilt_message (message, c, NULL, fwd); + c->destroy = GNUNET_YES; + } + return GNUNET_OK; } @@ -1265,7 +1272,6 @@ GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, { struct GNUNET_MESH_ConnectionDestroy *msg; struct MeshConnection *c; - GNUNET_PEER_Id id; int fwd; msg = (struct GNUNET_MESH_ConnectionDestroy *) message; @@ -1286,17 +1292,13 @@ GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, 1, GNUNET_NO); return GNUNET_OK; } - id = GNUNET_PEER_search (peer); - if (id == connection_get_prev_hop (c)->id) - fwd = GNUNET_YES; - else if (id == connection_get_next_hop (c)->id) - fwd = GNUNET_NO; - else + fwd = is_fwd (c, peer); + if (GNUNET_SYSERR == fwd) { GNUNET_break_op (0); return GNUNET_OK; } - send_prebuilt_message_connection (message, c, NULL, fwd); + GMC_send_prebuilt_message (message, c, NULL, fwd); c->destroy = GNUNET_YES; return GNUNET_OK; @@ -1307,24 +1309,23 @@ GMC_handle_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, * * @param peer Peer identity this notification is about. * @param message Encrypted message. - * @param fwd Is this FWD traffic? GNUNET_YES : GNUNET_NO; * * @return GNUNET_OK to keep the connection open, * GNUNET_SYSERR to close it (signal serious error) */ static int handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MESH_Encrypted *msg, - int fwd) + const struct GNUNET_MESH_Encrypted *msg) { struct MeshConnection *c; - struct MeshTunnel3 *t; struct MeshPeer *neighbor; struct MeshFlowControl *fc; + GNUNET_PEER_Id peer_id; uint32_t pid; uint32_t ttl; uint16_t type; size_t size; + int fwd; /* Check size */ size = ntohs (msg->header.size); @@ -1348,16 +1349,28 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, LOG (GNUNET_ERROR_TYPE_DEBUG, "WARNING connection unknown\n"); return GNUNET_OK; } - t = c->t; - fc = fwd ? &c->bck_fc : &c->fwd_fc; /* Check if origin is as expected */ - neighbor = connection_get_hop (c, !fwd); - if (peer_get (peer)->id != neighbor->id) + neighbor = get_prev_hop (c); + peer_id = GNUNET_PEER_search (peer); + if (peer_id == GMP_get_short_id (neighbor)) { - GNUNET_break_op (0); - return GNUNET_OK; + fwd = GNUNET_YES; + } + else + { + neighbor = get_next_hop (c); + if (peer_id == GMP_get_short_id (neighbor)) + { + fwd = GNUNET_NO; + } + else + { + GNUNET_break_op (0); + return GNUNET_OK; + } } + fc = fwd ? &c->bck_fc : &c->fwd_fc; /* Check PID */ pid = ntohl (msg->pid); @@ -1385,25 +1398,18 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, /* Is this message for us? */ if (GMC_is_terminal (c, fwd)) { - size_t dsize = size - sizeof (struct GNUNET_MESH_Encrypted); - char cbuf[dsize]; - struct GNUNET_MessageHeader *msgh; - unsigned int off; - /* TODO signature verification */ LOG (GNUNET_ERROR_TYPE_DEBUG, " message for us!\n"); GNUNET_STATISTICS_update (stats, "# messages received", 1, GNUNET_NO); - fc->last_pid_recv = pid; - tunnel_decrypt (t, cbuf, &msg[1], dsize, msg->iv, fwd); - off = 0; - while (off < dsize) + if (NULL == c->t) { - msgh = (struct GNUNET_MessageHeader *) &cbuf[off]; - handle_decrypted (t, msgh, fwd); - off += ntohs (msgh->size); + GNUNET_break (0); + return GNUNET_OK; } - send_ack (c, NULL, fwd); + fc->last_pid_recv = pid; + GMT_handle_encrypted (c->t, msg, fwd); + GMC_send_ack (c, NULL, fwd); return GNUNET_OK; } @@ -1415,38 +1421,19 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, { GNUNET_STATISTICS_update (stats, "# TTL drops", 1, GNUNET_NO); LOG (GNUNET_ERROR_TYPE_WARNING, " TTL is 0, DROPPING!\n"); - send_ack (c, NULL, fwd); + GMC_send_ack (c, NULL, fwd); return GNUNET_OK; } GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO); - send_prebuilt_message_connection (&msg->header, c, NULL, fwd); + GMC_send_prebuilt_message (&msg->header, c, NULL, fwd); return GNUNET_OK; } /** - * Core handler for mesh network traffic going orig->dest. - * - * @param cls Closure (unused). - * @param message Message received. - * @param peer Peer who sent the message. - * - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -int -GMC_handle_fwd (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) -{ - return handle_mesh_encrypted (peer, - (struct GNUNET_MESH_Encrypted *)message, - GNUNET_YES); -} - -/** - * Core handler for mesh network traffic going dest->orig. + * Core handler for encrypted mesh network traffic (channel mgmt, data). * * @param cls Closure (unused). * @param message Message received. @@ -1456,12 +1443,11 @@ GMC_handle_fwd (void *cls, const struct GNUNET_PeerIdentity *peer, * GNUNET_SYSERR to close it (signal serious error) */ int -GMC_handle_bck (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) +GMC_handle_encrypted (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message) { return handle_mesh_encrypted (peer, - (struct GNUNET_MESH_Encrypted *)message, - GNUNET_NO); + (struct GNUNET_MESH_Encrypted *)message); } @@ -1502,13 +1488,13 @@ GMC_handle_ack (void *cls, const struct GNUNET_PeerIdentity *peer, /* Is this a forward or backward ACK? */ id = GNUNET_PEER_search (peer); - if (connection_get_next_hop (c)->id == id) + if (GMP_get_short_id (get_next_hop (c)) == id) { LOG (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n"); fc = &c->fwd_fc; fwd = GNUNET_YES; } - else if (connection_get_prev_hop (c)->id == id) + else if (GMP_get_short_id (get_prev_hop (c)) == id) { LOG (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n"); fc = &c->bck_fc; @@ -1585,12 +1571,12 @@ GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer, * this way of discerining FWD/BCK should not be a problem. */ id = GNUNET_PEER_search (peer); - if (connection_get_next_hop (c)->id == id) + if (GMP_get_short_id (get_next_hop (c)) == id) { LOG (GNUNET_ERROR_TYPE_DEBUG, " FWD ACK\n"); fc = &c->fwd_fc; } - else if (connection_get_prev_hop (c)->id == id) + else if (GMP_get_short_id (get_prev_hop (c)) == id) { LOG (GNUNET_ERROR_TYPE_DEBUG, " BCK ACK\n"); fc = &c->bck_fc; @@ -1606,7 +1592,7 @@ GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer, pid, fc->last_pid_recv); fc->last_pid_recv = pid; fwd = fc == &c->fwd_fc; - send_ack (c, NULL, fwd); + GMC_send_ack (c, NULL, fwd); return GNUNET_OK; } @@ -1625,7 +1611,7 @@ GMC_handle_poll (void *cls, const struct GNUNET_PeerIdentity *peer, */ int GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_MessageHeader *message) { struct GNUNET_MESH_ConnectionKeepAlive *msg; struct MeshConnection *c; @@ -1648,8 +1634,8 @@ GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_YES : GNUNET_NO; /* Check if origin is as expected */ - neighbor = connection_get_hop (c, fwd); - if (peer_get (peer)->id != neighbor->id) + neighbor = get_hop (c, fwd); + if (GNUNET_PEER_search (peer) != GMP_get_short_id (neighbor)) { GNUNET_break_op (0); return GNUNET_OK; @@ -1676,18 +1662,22 @@ GMC_handle_keepalive (void *cls, const struct GNUNET_PeerIdentity *peer, * @param ch Channel, if any. * @param fwd Is this a fwd ACK? (will go dest->root) */ -static void -send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd) +void +GMC_send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd) { unsigned int buffer; LOG (GNUNET_ERROR_TYPE_DEBUG, "send ack %s on %p %p\n", fwd ? "FWD" : "BCK", c, ch); + + /* Get available bufffer space */ if (NULL == c || GMC_is_terminal (c, fwd)) { + struct MeshTunnel3 *t; LOG (GNUNET_ERROR_TYPE_DEBUG, " getting from all connections\n"); - buffer = GMT_get_buffer (NULL == c ? ch->t : c->t, fwd); + t = (NULL == c) ? GMCH_get_tunnel (ch) : GMC_get_tunnel (c); + buffer = GMT_get_buffer (t, fwd); } else { @@ -1696,6 +1686,7 @@ send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd) } LOG (GNUNET_ERROR_TYPE_DEBUG, " buffer available: %u\n", buffer); + /* Send available buffer space */ if ( (NULL != ch && GMCH_is_origin (ch, fwd)) || (NULL != c && GMC_is_origin (c, fwd)) ) { @@ -1704,14 +1695,14 @@ send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd) { GNUNET_assert (NULL != ch); LOG (GNUNET_ERROR_TYPE_DEBUG, " really sending!\n"); - send_local_ack (ch, fwd); + GMCH_send_data_ack (ch, fwd); } } else if (NULL == c) { LOG (GNUNET_ERROR_TYPE_DEBUG, " sending on all connections\n"); GNUNET_assert (NULL != ch); - channel_send_connections_ack (ch, buffer, fwd); + GMT_send_acks (GMCH_get_tunnel (ch), buffer, fwd); } else { @@ -1721,7 +1712,6 @@ send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd) } - /** * Initialize the connections subsystem * @@ -1730,11 +1720,12 @@ send_ack (struct MeshConnection *c, struct MeshChannel *ch, int fwd) void GMC_init (const struct GNUNET_CONFIGURATION_Handle *c) { + LOG (GNUNET_ERROR_TYPE_DEBUG, "init\n"); if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_MSGS_QUEUE", &max_msgs_queue)) { - LOG_config_invalid (GNUNET_ERROR_TYPE_ERROR, + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, "MESH", "MAX_MSGS_QUEUE", "MISSING"); GNUNET_SCHEDULER_shutdown (); return; @@ -1744,7 +1735,7 @@ GMC_init (const struct GNUNET_CONFIGURATION_Handle *c) GNUNET_CONFIGURATION_get_value_number (c, "MESH", "MAX_CONNECTIONS", &max_connections)) { - LOG_config_invalid (GNUNET_ERROR_TYPE_ERROR, + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, "MESH", "MAX_CONNECTIONS", "MISSING"); GNUNET_SCHEDULER_shutdown (); return; @@ -1754,7 +1745,7 @@ GMC_init (const struct GNUNET_CONFIGURATION_Handle *c) GNUNET_CONFIGURATION_get_value_time (c, "MESH", "REFRESH_CONNECTION_TIME", &refresh_connection_time)) { - LOG_config_invalid (GNUNET_ERROR_TYPE_ERROR, + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, "MESH", "REFRESH_CONNECTION_TIME", "MISSING"); GNUNET_SCHEDULER_shutdown (); return; @@ -1768,6 +1759,7 @@ GMC_init (const struct GNUNET_CONFIGURATION_Handle *c) void GMC_shutdown (void) { + GNUNET_CONTAINER_multihashmap_destroy (connections); } @@ -1812,8 +1804,6 @@ GMC_new (const struct GNUNET_HashCode *cid, void GMC_destroy (struct MeshConnection *c) { - struct MeshPeer *peer; - if (NULL == c) return; @@ -1835,7 +1825,8 @@ GMC_destroy (struct MeshConnection *c) /* Delete */ GNUNET_STATISTICS_update (stats, "# connections", -1, GNUNET_NO); - GMT_remove_connection (c->t, c); + if (NULL != c->t) + GMT_remove_connection (c->t, c); GNUNET_free (c); } @@ -1853,6 +1844,20 @@ GMC_get_id (const struct MeshConnection *c) } +/** + * Get the connection path. + * + * @param c Connection to get the path from. + * + * @return path used by the connection. + */ +const struct MeshPeerPath * +GMC_get_path (const struct MeshConnection *c) +{ + return c->path; +} + + /** * Get the connection state. * @@ -1866,6 +1871,19 @@ GMC_get_state (const struct MeshConnection *c) return c->state; } +/** + * Get the connection tunnel. + * + * @param c Connection to get the tunnel from. + * + * @return tunnel of the connection. + */ +struct MeshTunnel3 * +GMC_get_tunnel (const struct MeshConnection *c) +{ + return c->t; +} + /** * Get free buffer space in a connection. @@ -1885,6 +1903,27 @@ GMC_get_buffer (struct MeshConnection *c, int fwd) return (fc->queue_max - fc->queue_n); } +/** + * Get how many messages have we allowed to send to us from a direction.. + * + * @param c Connection. + * @param fwd Are we asking about traffic from FWD (BCK messages)? + * + * @return last_ack_sent - last_pid_recv + */ +unsigned int +GMC_get_allowed (struct MeshConnection *c, int fwd) +{ + struct MeshFlowControl *fc; + + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (GMC_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent)) + { + return 0; + } + return (fc->last_ack_sent - fc->last_pid_recv); +} + /** * Get messages queued in a connection. * @@ -1904,23 +1943,61 @@ GMC_get_qn (struct MeshConnection *c, int fwd) } +/** + * Allow the connection to advertise a buffer of the given size. + * + * The connection will send an @c fwd ACK message (so: in direction !fwd) + * allowing up to last_pid_recv + buffer. + * + * @param c Connection. + * @param buffer How many more messages the connection can accept. + * @param fwd Is this about FWD traffic? (The ack will go dest->root). + */ +void +GMC_allow (struct MeshConnection *c, unsigned int buffer, int fwd) +{ + connection_send_ack (c, buffer, fwd); +} + + +/** + * Send a notification that a connection is broken. + * + * @param c Connection that is broken. + * @param id1 Peer that has disconnected. + * @param id2 Peer that has disconnected. + * @param fwd Direction towards which to send it. + */ +static void +send_broken (struct MeshConnection *c, + const struct GNUNET_PeerIdentity *id1, + const struct GNUNET_PeerIdentity *id2, + int fwd) +{ + struct GNUNET_MESH_ConnectionBroken msg; + + msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken)); + msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN); + msg.cid = c->id; + msg.peer1 = *id1; + msg.peer2 = *id2; + GMC_send_prebuilt_message (&msg.header, c, NULL, fwd); +} + /** * Notify other peers on a connection of a broken link. Mark connections * to destroy after all traffic has been sent. * * @param c Connection on which there has been a disconnection. * @param peer Peer that disconnected. - * @param my_full_id My ID (to send to other peers). */ void GMC_notify_broken (struct MeshConnection *c, - struct MeshPeer *peer, - struct GNUNET_PeerIdentity *my_full_id) + struct MeshPeer *peer) { - struct GNUNET_MESH_ConnectionBroken msg; int fwd; - fwd = peer == connection_get_prev_hop (c); + fwd = peer == get_prev_hop (c); connection_cancel_queues (c, !fwd); if (GMC_is_terminal (c, fwd)) @@ -1930,12 +2007,11 @@ GMC_notify_broken (struct MeshConnection *c, return; } - msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN); - msg.cid = c->id; - msg.peer1 = *my_full_id; - msg.peer2 = *GMP_get_id (peer); - GMC_send_prebuilt_message (&msg.header, c, NULL, fwd); + send_broken (c, &my_full_id, GMP_get_id (peer), fwd); + + /* Connection will have at least one pending message + * (the one we just scheduled), so no point in checking whether to + * destroy immediately. */ c->destroy = GNUNET_YES; return; @@ -1977,6 +2053,25 @@ GMC_is_terminal (struct MeshConnection *c, int fwd) } +/** + * See if we are allowed to send by the next hop in the given direction. + * + * @param c Connection. + * @param fwd Is this about fwd traffic? + * + * @return GNUNET_YES in case it's OK. + */ +int +GMC_is_sendable (struct MeshConnection *c, int fwd) +{ + struct MeshFlowControl *fc; + + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent)) + return GNUNET_YES; + return GNUNET_NO; +} + /** * Sends an already built message on a connection, properly registering * all used resources. @@ -1993,9 +2088,11 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, struct MeshChannel *ch, int fwd) { + struct MeshFlowControl *fc; void *data; size_t size; uint16_t type; + int droppable; size = ntohs (message->size); data = GNUNET_malloc (size); @@ -2004,6 +2101,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, LOG (GNUNET_ERROR_TYPE_DEBUG, "Send %s (%u) on connection %s\n", GNUNET_MESH_DEBUG_M2S (type), size, GNUNET_h2s (&c->id)); + droppable = GNUNET_YES; switch (type) { struct GNUNET_MESH_Encrypted *emsg; @@ -2013,8 +2111,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, struct GNUNET_MESH_ConnectionBroken *bmsg; uint32_t ttl; - case GNUNET_MESSAGE_TYPE_MESH_FWD: - case GNUNET_MESSAGE_TYPE_MESH_BCK: + case GNUNET_MESSAGE_TYPE_MESH_ENCRYPTED: emsg = (struct GNUNET_MESH_Encrypted *) data; ttl = ntohl (emsg->ttl); if (0 == ttl) @@ -2032,6 +2129,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, amsg = (struct GNUNET_MESH_ACK *) data; amsg->cid = c->id; LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", ntohl (amsg->ack)); + droppable = GNUNET_NO; break; case GNUNET_MESSAGE_TYPE_MESH_POLL: @@ -2039,6 +2137,7 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, pmsg->cid = c->id; pmsg->pid = htonl (fwd ? c->fwd_fc.last_pid_sent : c->bck_fc.last_pid_sent); LOG (GNUNET_ERROR_TYPE_DEBUG, " poll %u\n", ntohl (pmsg->pid)); + droppable = GNUNET_NO; break; case GNUNET_MESSAGE_TYPE_MESH_TUNNEL_DESTROY: @@ -2061,12 +2160,58 @@ GMC_send_prebuilt_message (const struct GNUNET_MessageHeader *message, GNUNET_break (0); } - GMP_queue_add (data, - type, - size, - c, - ch, - fwd); + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (fc->queue_n >= fc->queue_max && droppable) + { + GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)", + 1, GNUNET_NO); + GNUNET_break (0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "queue full: %u/%u\n", + fc->queue_n, fc->queue_max); + return; /* Drop this message */ + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, "last pid %u\n", fc->last_pid_sent); + LOG (GNUNET_ERROR_TYPE_DEBUG, " ack %u\n", fc->last_ack_recv); + LOG (GNUNET_ERROR_TYPE_DEBUG, " Q_N+ %p %u\n", fc, fc->queue_n); + if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv)) + { + GMC_start_poll (c, fwd); + } + fc->queue_n++; + c->pending_messages++; + + GMP_queue_add (get_hop (c, fwd), data, type, size, c, ch, fwd, + &message_sent, NULL); +} + + +/** + * Sends a CREATE CONNECTION message for a path to a peer. + * Changes the connection and tunnel states if necessary. + * + * @param connection Connection to create. + */ +void +GMC_send_create (struct MeshConnection *connection) +{ + enum MeshTunnel3State state; + size_t size; + + size = sizeof (struct GNUNET_MESH_ConnectionCreate); + size += connection->path->length * sizeof (struct GNUNET_PeerIdentity); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n"); + GMP_queue_add (get_next_hop (connection), NULL, + GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE, + size, connection, NULL, + GNUNET_YES, &message_sent, NULL); + state = GMT_get_state (connection->t); + if (MESH_TUNNEL3_SEARCHING == state || MESH_TUNNEL3_NEW == state) + GMT_change_state (connection->t, MESH_TUNNEL3_WAITING); + if (MESH_CONNECTION_NEW == connection->state) + connection_change_state (connection, MESH_CONNECTION_SENT); + connection->fwd_fc.queue_n++; } @@ -2099,4 +2244,52 @@ GMC_send_destroy (struct MeshConnection *c) if (GNUNET_NO == GMC_is_terminal (c, GNUNET_NO)) GMC_send_prebuilt_message (&msg.header, c, NULL, GNUNET_NO); c->destroy = GNUNET_YES; +} + + +/** + * @brief Start a polling timer for the connection. + * + * When a neighbor does not accept more traffic on the connection it could be + * caused by a simple congestion or by a lost ACK. Polling enables to check + * for the lastest ACK status for a connection. + * + * @param c Connection. + * @param fwd Should we poll in the FWD direction? + */ +void +GMC_start_poll (struct MeshConnection *c, int fwd) +{ + struct MeshFlowControl *fc; + + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task) + { + return; + } + fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time, + &connection_poll, + fc); +} + + +/** + * @brief Stop polling a connection for ACKs. + * + * Once we have enough ACKs for future traffic, polls are no longer necessary. + * + * @param c Connection. + * @param fwd Should we stop the poll in the FWD direction? + */ +void +GMC_stop_poll (struct MeshConnection *c, int fwd) +{ + struct MeshFlowControl *fc; + + fc = fwd ? &c->fwd_fc : &c->bck_fc; + if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task) + { + GNUNET_SCHEDULER_cancel (fc->poll_task); + fc->poll_task = GNUNET_SCHEDULER_NO_TASK; + } } \ No newline at end of file