From 7ffd0a61175c047fdd525f7b4e9079441e7e95da Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Tue, 12 Jun 2012 12:58:35 +0000 Subject: [PATCH] Removed core queue requirements from mesh (backporting mesh_new) --- src/mesh/gnunet-service-mesh.c | 786 ++++++++++++++++++--------------- 1 file changed, 419 insertions(+), 367 deletions(-) diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index 0882dc44e..26984836e 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -116,6 +116,48 @@ struct MeshData }; +/** + * Struct containing info about a queued transmission to this peer + */ +struct MeshPeerQueue +{ + /** + * DLL next + */ + struct MeshPeerQueue *next; + + /** + * DLL previous + */ + struct MeshPeerQueue *prev; + + /** + * Peer this transmission is directed to. + */ + struct MeshPeerInfo *peer; + + /** + * Tunnel this message belongs to. + */ + struct MeshTunnel *tunnel; + + /** + * Pointer to info stucture used as cls. + */ + void *cls; + + /** + * Type of message + */ + uint16_t type; + + /** + * Size of the message + */ + size_t size; +}; + + /** * Struct containing all info possibly needed to build a package when called * back by core. @@ -131,9 +173,6 @@ struct MeshTransmissionDescriptor /** Ultimate destination of the packet */ GNUNET_PEER_Id destination; - /** Which handler was used to request the transmission */ - unsigned int handler_n; - /** Data descriptor */ struct MeshData* mesh_data; }; @@ -179,21 +218,6 @@ struct MeshPeerInfo */ struct MeshPathInfo *dhtgetcls; - /** - * Handles to stop queued transmissions for this peer - */ - struct GNUNET_CORE_TransmitHandle *core_transmit[CORE_QUEUE_SIZE]; - - /** - * Pointer to info stuctures used as cls for queued transmissions - */ - void *infos[CORE_QUEUE_SIZE]; - - /** - * Type of message being in each transmission - */ - uint16_t types[CORE_QUEUE_SIZE]; - /** * Array of tunnels this peer participates in * (most probably a small amount, therefore not a hashmap) @@ -206,46 +230,29 @@ struct MeshPeerInfo * Number of tunnels this peers participates in */ unsigned int ntunnels; -}; - - -/** - * Data scheduled to transmit (to local client or remote peer) - */ -struct MeshQueue -{ - /** - * Double linked list - */ - struct MeshQueue *next; - struct MeshQueue *prev; - /** - * Target of the data (NULL if target is client) - */ - struct MeshPeerInfo *peer; - - /** - * Client to send the data to (NULL if target is peer) - */ - struct MeshClient *client; + /** + * Transmission queue to core DLL head + */ + struct MeshPeerQueue *queue_head; - /** - * Size of the message to transmit - */ - unsigned int size; + /** + * Transmission queue to core DLL tail + */ + struct MeshPeerQueue *queue_tail; - /** - * How old is the data? - */ - struct GNUNET_TIME_Absolute timestamp; + /** + * How many messages are in the queue to this peer. + */ + unsigned int queue_n; - /** - * Data itself - */ - struct GNUNET_MessageHeader *data; + /** + * Handle to for queued transmissions + */ + struct GNUNET_CORE_TransmitHandle *core_transmit; }; + /** * Globally unique tunnel identification (owner + number) * DO NOT USE OVER THE NETWORK @@ -294,10 +301,30 @@ struct MeshTunnel MESH_TunnelNumber local_tid_dest; /** - * ID of the last multicast packet seen/sent. + * Global count ID of the last *multicast* packet seen/sent. */ uint32_t mid; + /** + * Local count ID of the last packet seen/sent. + */ + uint32_t pid; + + /** + * SKIP value for this tunnel. + */ + uint32_t skip; + + /** + * How many messages are in the queue. + */ + unsigned int queue_n; + + /** + * How many messages do we accept in the queue. + */ + unsigned int queue_max; + /** * Last time the tunnel was used */ @@ -344,12 +371,6 @@ struct MeshTunnel */ unsigned int nignore; - /** - * Messages ready to transmit - */ - struct MeshQueue *queue_head; - struct MeshQueue *queue_tail; - /** * Tunnel paths */ @@ -399,11 +420,6 @@ struct MeshPathInfo * Path itself */ struct MeshPeerPath *path; - - /** - * Position in peer's transmit queue - */ - unsigned int pos; }; @@ -519,15 +535,15 @@ static struct GNUNET_CONTAINER_MultiHashMap *incoming_tunnels; */ static struct GNUNET_CONTAINER_MultiHashMap *peers; -/** - * Handle to communicate with core +/* + * Handle to communicate with transport */ -static struct GNUNET_CORE_Handle *core_handle; +// static struct GNUNET_TRANSPORT_Handle *transport_handle; /** - * Handle to communicate with transport + * Handle to communicate with core */ -// static struct GNUNET_TRANSPORT_Handle *transport_handle; +static struct GNUNET_CORE_Handle *core_handle; /** * Handle to use DHT @@ -1128,37 +1144,6 @@ send_client_tunnel_disconnect (struct MeshTunnel *t, struct MeshClient *c) } -/** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -send_core_create_path (void *cls, size_t size, void *buf); - - -/** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. - * - * @param cls closure (data itself) - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * - * @return number of bytes written to buf - */ -static size_t -send_core_data_multicast (void *cls, size_t size, void *buf); - - /** * Decrements the reference counter and frees all resources if needed * @@ -1191,86 +1176,6 @@ data_descriptor_decrement_multicast (struct MeshData *mesh_data) } -/** - * Cancel a core transmission that was already requested and free all resources - * associated to the request. - * - * @param peer PeeInfo of the peer whose transmission is cancelled. - * @param i Position of the transmission to be cancelled. - */ -static void -peer_info_cancel_transmission (struct MeshPeerInfo *peer, unsigned int i) -{ - if (NULL != peer->core_transmit[i]) - { - struct MeshTransmissionDescriptor *dd; - struct MeshPathInfo *path_info; - -#if MESH_DEBUG - { - struct GNUNET_PeerIdentity id; - - GNUNET_PEER_resolve (peer->id, &id); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " Cancelling data transmission at %s [%u]\n", - GNUNET_i2s (&id), i); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " message type %u\n", - peer->types[i]); - } -#endif - /* TODO: notify that tranmission has failed */ - switch (peer->types[i]) - { - case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type payload\n"); - dd = peer->infos[i]; - data_descriptor_decrement_multicast (dd->mesh_data); - break; - case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type create path\n"); - path_info = peer->infos[i]; - path_destroy (path_info->path); - break; - default: - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type unknown!\n"); - } - GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit[i]); - peer->core_transmit[i] = NULL; - GNUNET_free (peer->infos[i]); - } -} - - -/** - * Get a unused CORE slot to transmit a message to a peer. If all the slots - * are used, cancel one and return it's position. - * - * @param peer PeerInfo of the neighbor we want to transmit to. - * - * @return The index of an available slot to transmit to the neighbor. - */ -static unsigned int -peer_info_transmit_slot (struct MeshPeerInfo *peer) -{ - unsigned int i; - - for (i = 0; peer->core_transmit[i]; i++) - { - if (i == (CORE_QUEUE_SIZE - 1)) - { - /* All positions are taken! Overwriting! */ - GNUNET_break (0); - peer_info_cancel_transmission (peer, 0); - return 0; - } - } - return i; -} - - /** * Retrieve the MeshPeerInfo stucture associated with the peer, create one * and insert it in the appropiate structures if the peer is not known yet. @@ -1349,7 +1254,20 @@ peer_info_delete_tunnel (void *cls, const GNUNET_HashCode * key, void *value) /** - * Core callback to write a + * Queue and pass message to core when possible. + * + * @param cls Closure (type dependant). + * @param type Type of the message. + * @param size Size of the message. + * @param dst Neighbor to send message to. + * @param t Tunnel this message belongs to. + */ +static void +queue_add (void *cls, uint16_t type, size_t size, + struct MeshPeerInfo *dst, struct MeshTunnel *t); + +/** + * Core callback to write a pre-constructed data packet to core buffer * * @param cls Closure (MeshTransmissionDescriptor with data in "data" member). * @param size Number of bytes available in buf. @@ -1371,16 +1289,9 @@ send_core_data_raw (void *cls, size_t size, void *buf) if (total_size > size) { - struct GNUNET_PeerIdentity id; - - GNUNET_PEER_resolve (info->peer->id, &id); - info->peer->core_transmit[info->handler_n] = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 100, - GNUNET_TIME_UNIT_FOREVER_REL, &id, - size, &send_core_data_raw, info); + GNUNET_break (0); return 0; } - info->peer->core_transmit[info->handler_n] = NULL; memcpy (buf, msg, total_size); GNUNET_free (info->mesh_data); GNUNET_free (info); @@ -1389,25 +1300,24 @@ send_core_data_raw (void *cls, size_t size, void *buf) /** - * Sends an already built message to a peer, properly registrating + * Sends an already built unicast message to a peer, properly registrating * all used resources. * - * @param message Message to send. Fucntion makes a copy of it. + * @param message Message to send. Function makes a copy of it. * @param peer Short ID of the neighbor whom to send the message. - * - * FIXME tunnel? + * @param t Tunnel on which this message is transmitted. */ static void send_message (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer) + const struct GNUNET_PeerIdentity *peer, + struct MeshTunnel *t) { struct MeshTransmissionDescriptor *info; struct MeshPeerInfo *neighbor; struct MeshPeerPath *p; - unsigned int i; size_t size; -// GNUNET_TRANSPORT_try_connect(); +// GNUNET_TRANSPORT_try_connect(); FIXME use? size = ntohs (message->size); info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); @@ -1431,16 +1341,12 @@ send_message (const struct GNUNET_MessageHeader *message, GNUNET_free (info); return; } - i = peer_info_transmit_slot (neighbor); - info->handler_n = i; info->peer = neighbor; - neighbor->types[i] = GNUNET_MESSAGE_TYPE_MESH_UNICAST; - neighbor->infos[i] = info; - neighbor->core_transmit[i] = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 100, - GNUNET_TIME_UNIT_FOREVER_REL, peer, - size, &send_core_data_raw, info); - + queue_add (info, + GNUNET_MESSAGE_TYPE_MESH_UNICAST, + size, + neighbor, + t); } @@ -1459,6 +1365,7 @@ send_create_path (struct MeshPeerInfo *peer, struct MeshPeerPath *p, struct GNUNET_PeerIdentity id; struct MeshPathInfo *path_info; struct MeshPeerInfo *neighbor; + unsigned int i; if (NULL == p) @@ -1488,19 +1395,12 @@ send_create_path (struct MeshPeerInfo *peer, struct MeshPeerPath *p, path_info->t = t; neighbor = peer_info_get (&id); path_info->peer = neighbor; - path_info->pos = peer_info_transmit_slot (neighbor); - neighbor->types[path_info->pos] = GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE; - neighbor->infos[path_info->pos] = path_info; - neighbor->core_transmit[path_info->pos] = - GNUNET_CORE_notify_transmit_ready (core_handle, /* handle */ - 0, /* cork */ - 0, /* priority */ - GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ - &id, /* target */ - sizeof (struct GNUNET_MESH_ManipulatePath) + - (p->length * sizeof (struct GNUNET_PeerIdentity)), /*size */ - &send_core_create_path, /* callback */ - path_info); /* cls */ + queue_add (path_info, + GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE, + sizeof (struct GNUNET_MESH_ManipulatePath) + + (p->length * sizeof (struct GNUNET_PeerIdentity)), + neighbor, + t); } @@ -1539,7 +1439,7 @@ send_destroy_path (struct MeshTunnel *t, GNUNET_PEER_Id destination) { GNUNET_PEER_resolve (p->peers[i], &pi[i]); } - send_message (&msg->header, tree_get_first_hop (t->tree, destination)); + send_message (&msg->header, tree_get_first_hop (t->tree, destination), t); } path_destroy (p); } @@ -1651,7 +1551,6 @@ peer_info_destroy (struct MeshPeerInfo *pi) struct GNUNET_PeerIdentity id; struct MeshPeerPath *p; struct MeshPeerPath *nextp; - unsigned int i; GNUNET_PEER_resolve (pi->id, &id); GNUNET_PEER_change_rc (pi->id, -1); @@ -1668,10 +1567,6 @@ peer_info_destroy (struct MeshPeerInfo *pi) GNUNET_DHT_get_stop (pi->dhtget); GNUNET_free (pi->dhtgetcls); } - for (i = 0; i < CORE_QUEUE_SIZE; i++) - { - peer_info_cancel_transmission (pi, i); - } p = pi->path_head; while (NULL != p) { @@ -2304,7 +2199,7 @@ tunnel_notify_connection_broken (struct MeshTunnel *t, GNUNET_PEER_Id p1, msg.peer1 = my_full_id; GNUNET_PEER_resolve (pid, &msg.peer2); GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &neighbor); - send_message (&msg.header, &neighbor); + send_message (&msg.header, &neighbor, t); } } return pid; @@ -2323,7 +2218,6 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) struct MeshData *mdata = cls; struct MeshTransmissionDescriptor *info; struct GNUNET_PeerIdentity neighbor; - unsigned int i; info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); @@ -2335,15 +2229,11 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) GNUNET_i2s (&neighbor)); info->peer = peer_info_get (&neighbor); GNUNET_assert (NULL != info->peer); - i = peer_info_transmit_slot (info->peer); - info->handler_n = i; - info->peer->infos[i] = info; - info->peer->types[i] = GNUNET_MESSAGE_TYPE_MESH_MULTICAST; - info->peer->core_transmit[i] = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &neighbor, info->mesh_data->data_len, - &send_core_data_multicast, info); + queue_add(info, + GNUNET_MESSAGE_TYPE_MESH_MULTICAST, + info->mesh_data->data_len, + info->peer, + mdata->t); } /** @@ -2444,8 +2334,6 @@ static int tunnel_destroy (struct MeshTunnel *t) { struct MeshClient *c; - struct MeshQueue *q; - struct MeshQueue *qn; GNUNET_HashCode hash; unsigned int i; int r; @@ -2514,16 +2402,7 @@ tunnel_destroy (struct MeshTunnel *t) t); GNUNET_CONTAINER_multihashmap_destroy (t->peers); } - q = t->queue_head; - while (NULL != q) - { - if (NULL != q->data) - GNUNET_free (q->data); - qn = q->next; - GNUNET_free (q); - q = qn; - /* TODO cancel core transmit ready in case it was active */ - } + tree_destroy (t->tree); if (NULL != t->dht_get_type) GNUNET_DHT_get_stop (t->dht_get_type); @@ -2536,6 +2415,60 @@ tunnel_destroy (struct MeshTunnel *t) } +/** + * Create a new tunnel + * + * @param owner Who is the owner of the tunnel (short ID). + * @param id Tunnel Number of the tunnel. + * + */ +static struct MeshTunnel * +tunnel_new (GNUNET_PEER_Id owner, + MESH_TunnelNumber tid, + struct MeshClient *client, + MESH_TunnelNumber local) +{ + struct MeshTunnel *t; + struct GNUNET_HashCode hash; + + t = GNUNET_malloc (sizeof (struct MeshTunnel)); + t->id.oid = owner; + t->id.tid = tid; + t->queue_max = 1000; // FIXME API parameter + t->tree = tree_new (owner); + t->owner = client; + t->local_tid = local; + + GNUNET_CRYPTO_hash (&t->id, sizeof (struct MESH_TunnelID), &hash); + if (GNUNET_OK != + GNUNET_CONTAINER_multihashmap_put (tunnels, &hash, t, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) + { + GNUNET_break (0); + tunnel_destroy (t); + if (NULL != client) + GNUNET_SERVER_receive_done (client->handle, GNUNET_SYSERR); + return NULL; + } + + if (NULL != client) + { + GNUNET_CRYPTO_hash (&t->local_tid, sizeof (MESH_TunnelNumber), &hash); + if (GNUNET_OK != + GNUNET_CONTAINER_multihashmap_put (client->own_tunnels, &hash, t, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) + { + GNUNET_break (0); + tunnel_destroy (t); + GNUNET_SERVER_receive_done (client->handle, GNUNET_SYSERR); + return NULL; + } + } + + return t; +} + + /** * Removes an explicit path from a tunnel, freeing all intermediate nodes * that are no longer needed, as well as nodes of no longer reachable peers. @@ -2624,10 +2557,7 @@ tunnel_reset_timeout (struct MeshTunnel *t) /******************************************************************************/ /** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Function to send a create path packet to a peer. * * @param cls closure * @param size number of bytes available in buf @@ -2635,12 +2565,11 @@ tunnel_reset_timeout (struct MeshTunnel *t) * @return number of bytes written to buf */ static size_t -send_core_create_path (void *cls, size_t size, void *buf) +send_core_path_create (void *cls, size_t size, void *buf) { struct MeshPathInfo *info = cls; struct GNUNET_MESH_ManipulatePath *msg; struct GNUNET_PeerIdentity *peer_ptr; - struct MeshPeerInfo *peer = info->peer; struct MeshTunnel *t = info->t; struct MeshPeerPath *p = info->path; size_t size_needed; @@ -2653,30 +2582,9 @@ send_core_create_path (void *cls, size_t size, void *buf) if (size < size_needed || NULL == buf) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "create path retransmit!\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " buf: %p\n", buf); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " size: (%u/%u)\n", size, - size_needed); - info->peer->core_transmit[info->pos] = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 0, - GNUNET_TIME_UNIT_FOREVER_REL, - tree_get_first_hop (t->tree, - peer->id), - size_needed, &send_core_create_path, - info); + GNUNET_break (0); return 0; } - info->peer->core_transmit[info->pos] = NULL; -#if MESH_DEBUG - { - struct GNUNET_PeerIdentity id; - - GNUNET_PEER_resolve (peer->id, &id); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " setting core_transmit %s [%u] to NULL\n", - GNUNET_i2s (&id), info->pos); - } -#endif msg = (struct GNUNET_MESH_ManipulatePath *) buf; msg->header.size = htons (size_needed); msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE); @@ -2698,10 +2606,7 @@ send_core_create_path (void *cls, size_t size, void *buf) /** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Fill the core buffer * * @param cls closure (data itself) * @param size number of bytes available in buf @@ -2723,22 +2628,9 @@ send_core_data_multicast (void *cls, size_t size, void *buf) if (total_size > size) { - /* Retry */ - struct GNUNET_PeerIdentity id; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Multicast: retransmitting... (%u/%u)\n", size, - total_size); - GNUNET_PEER_resolve (info->peer->id, &id); - info->peer->core_transmit[info->handler_n] = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 0, - GNUNET_TIME_UNIT_FOREVER_REL, &id, - total_size, - &send_core_data_multicast, info); + GNUNET_break (0); return 0; } - info->peer->core_transmit[info->handler_n] = NULL; - info->peer->infos[info->handler_n] = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " copying data...\n"); memcpy (buf, info->mesh_data->data, total_size); #if MESH_DEBUG @@ -2772,10 +2664,7 @@ send_core_data_multicast (void *cls, size_t size, void *buf) /** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Creates a path ack message in buf and frees all unused resources. * * @param cls closure (MeshTransmissionDescriptor) * @param size number of bytes available in buf @@ -2789,10 +2678,6 @@ send_core_path_ack (void *cls, size_t size, void *buf) struct GNUNET_MESH_PathACK *msg = buf; GNUNET_assert (NULL != info); - if (info->peer) - { - info->peer->core_transmit[info->handler_n] = NULL; - } if (sizeof (struct GNUNET_MESH_PathACK) > size) { GNUNET_break (0); @@ -2803,6 +2688,7 @@ send_core_path_ack (void *cls, size_t size, void *buf) GNUNET_PEER_resolve (info->origin->oid, &msg->oid); msg->tid = htonl (info->origin->tid); msg->peer_id = my_full_id; + GNUNET_free (info); /* TODO add signature */ @@ -2811,6 +2697,196 @@ send_core_path_ack (void *cls, size_t size, void *buf) } +/** + * Free a transmission that was already queued with all resources + * associated to the request. + * + * @param queue Queue handler to cancel. + * @param clear_cls Is it necessary to free associated cls? + */ +static void +queue_destroy (struct MeshPeerQueue *queue, int clear_cls) +{ + struct MeshTransmissionDescriptor *dd; + struct MeshPathInfo *path_info; + + if (GNUNET_YES == clear_cls) + { + switch (queue->type) + { + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type payload\n"); + dd = queue->cls; + data_descriptor_decrement_multicast (dd->mesh_data); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type create path\n"); + path_info = queue->cls; + path_destroy (path_info->path); + break; + default: + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type unknown!\n"); + } + GNUNET_free_non_null (queue->cls); + } + GNUNET_CONTAINER_DLL_remove (queue->peer->queue_head, + queue->peer->queue_tail, + queue); + GNUNET_free (queue); +} + + +/** + * Core callback to write a queued packet to core buffer + * + * @param cls Closure (peer info). + * @param size Number of bytes available in buf. + * @param buf Where the to write the message. + * + * @return number of bytes written to buf + */ +static size_t +queue_send (void *cls, size_t size, void *buf) +{ + struct MeshPeerInfo *peer = cls; + struct MeshPeerQueue *queue; + size_t data_size; + + peer->core_transmit = NULL; + queue = peer->queue_head; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* Queue send\n"); + + + /* If queue is empty, send should have been cancelled */ + if (NULL == queue) + { + GNUNET_break(0); + return 0; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* not empty\n"); + + /* Check if buffer size is enough for the message */ + if (queue->size > size) + { + struct GNUNET_PeerIdentity id; + + GNUNET_PEER_resolve (peer->id, &id); + peer->core_transmit = + GNUNET_CORE_notify_transmit_ready(core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + queue->size, + &queue_send, + peer); + return 0; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* size ok\n"); + + /* Fill buf */ + switch (queue->type) + { + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* unicast\n"); + data_size = send_core_data_raw (queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* multicast\n"); + data_size = send_core_data_multicast(queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path create\n"); + data_size = send_core_path_create(queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* path ack\n"); + data_size = send_core_path_ack(queue->cls, size, buf); + break; + default: + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* type unknown\n"); + data_size = 0; + } + queue->tunnel->queue_n--; + + /* Free queue, but cls was freed by send_core_* */ + queue_destroy(queue, GNUNET_NO); + + /* If more data in queue, send next */ + if (NULL != peer->queue_head) + { + struct GNUNET_PeerIdentity id; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* more data!\n"); + GNUNET_PEER_resolve (peer->id, &id); + peer->core_transmit = + GNUNET_CORE_notify_transmit_ready(core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + peer->queue_head->size, + &queue_send, + peer); + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "********* return %d\n", data_size); + return data_size; +} + + +/** + * Queue and pass message to core when possible. + * + * @param cls Closure (type dependant). + * @param type Type of the message. + * @param size Size of the message. + * @param dst Neighbor to send message to. + * @param t Tunnel this message belongs to. + */ +static void +queue_add (void *cls, uint16_t type, size_t size, + struct MeshPeerInfo *dst, struct MeshTunnel *t) +{ + struct MeshPeerQueue *queue; + + if (t->queue_n >= t->queue_max) + { + if (NULL == t->owner) + GNUNET_break_op(0); // TODO: kill connection? + else + GNUNET_break(0); + return; // Drop message + } + t->queue_n++; + queue = GNUNET_malloc (sizeof (struct MeshPeerQueue)); + queue->cls = cls; + queue->type = type; + queue->size = size; + queue->peer = dst; + queue->tunnel = t; + GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue); + if (NULL == dst->core_transmit) + { + struct GNUNET_PeerIdentity id; + + GNUNET_PEER_resolve (dst->id, &id); + dst->core_transmit = + GNUNET_CORE_notify_transmit_ready(core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + size, + &queue_send, + dst); + } +} + + /******************************************************************************/ /******************** MESH NETWORK HANDLERS **************************/ /******************************************************************************/ @@ -2876,27 +2952,16 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " path is for tunnel %s [%X].\n", GNUNET_i2s (pi), tid); t = tunnel_get (pi, tid); - if (NULL == t) + if (NULL == t) // FIXME only for INCOMING tunnels? { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Creating tunnel\n"); - t = GNUNET_malloc (sizeof (struct MeshTunnel)); - t->id.oid = GNUNET_PEER_intern (pi); - t->id.tid = tid; + t = tunnel_new (GNUNET_PEER_intern (pi), tid, NULL, 0); + while (NULL != tunnel_get_incoming (next_local_tid)) next_local_tid = (next_local_tid + 1) | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV; t->local_tid_dest = next_local_tid++; next_local_tid = next_local_tid | GNUNET_MESH_LOCAL_TUNNEL_ID_SERV; - t->tree = tree_new (t->id.oid); - GNUNET_CRYPTO_hash (&t->id, sizeof (struct MESH_TunnelID), &hash); - if (GNUNET_OK != - GNUNET_CONTAINER_multihashmap_put (tunnels, &hash, t, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)) - { - tunnel_destroy (t); - GNUNET_break (0); - return GNUNET_OK; - } tunnel_reset_timeout (t); GNUNET_CRYPTO_hash (&t->local_tid_dest, sizeof (MESH_TunnelNumber), &hash); if (GNUNET_OK != @@ -2957,7 +3022,6 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, { /* It is for us! Send ack. */ struct MeshTransmissionDescriptor *info; - unsigned int j; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " It's for us!\n"); peer_info_add_path_to_origin (orig_peer_info, path, GNUNET_NO); @@ -2972,20 +3036,15 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, peer_info_get (&my_full_id), GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE)); - /* FIXME use send_message */ info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); info->origin = &t->id; info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); GNUNET_assert (NULL != info->peer); - j = peer_info_transmit_slot (info->peer); - info->handler_n = j; - info->peer->types[j] = GNUNET_MESSAGE_TYPE_MESH_PATH_ACK; - info->peer->infos[j] = info; - info->peer->core_transmit[j] = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 10, - GNUNET_TIME_UNIT_FOREVER_REL, peer, - sizeof (struct GNUNET_MESH_PathACK), - &send_core_path_ack, info); + queue_add(info, + GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, + sizeof (struct GNUNET_MESH_PathACK), + info->peer, + t); } else { @@ -3077,7 +3136,7 @@ handle_mesh_path_destroy (void *cls, const struct GNUNET_PeerIdentity *peer, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " Own position: %u\n", own_pos); if (own_pos < path->length - 1) - send_message (message, &pi[own_pos + 1]); + send_message (message, &pi[own_pos + 1], t); else send_client_tunnel_disconnect(t, NULL); @@ -3234,7 +3293,7 @@ handle_mesh_data_unicast (void *cls, const struct GNUNET_PeerIdentity *peer, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n"); - send_message (message, tree_get_first_hop (t->tree, pid)); + send_message (message, tree_get_first_hop (t->tree, pid), t); return GNUNET_OK; } @@ -3391,7 +3450,7 @@ handle_mesh_data_to_orig (void *cls, const struct GNUNET_PeerIdentity *peer, return GNUNET_OK; } GNUNET_PEER_resolve (tree_get_predecessor (t->tree), &id); - send_message (message, &id); + send_message (message, &id, t); return GNUNET_OK; } @@ -3477,7 +3536,7 @@ handle_mesh_path_ack (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break (0); return GNUNET_OK; } - send_message (message, &id); + send_message (message, &id, t); return GNUNET_OK; } @@ -3898,7 +3957,6 @@ handle_local_tunnel_create (void *cls, struct GNUNET_SERVER_Client *client, struct GNUNET_MESH_TunnelMessage *t_msg; struct MeshTunnel *t; struct MeshClient *c; - GNUNET_HashCode hash; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new tunnel requested\n"); @@ -3935,39 +3993,14 @@ handle_local_tunnel_create (void *cls, struct GNUNET_SERVER_Client *client, return; } - t = GNUNET_malloc (sizeof (struct MeshTunnel)); while (NULL != tunnel_get_by_pi (myid, next_tid)) next_tid = (next_tid + 1) & ~GNUNET_MESH_LOCAL_TUNNEL_ID_CLI; - t->id.tid = next_tid++; + t = tunnel_new (myid, next_tid++, c, ntohl (t_msg->tunnel_id)); next_tid = next_tid & ~GNUNET_MESH_LOCAL_TUNNEL_ID_CLI; - t->id.oid = myid; - t->local_tid = ntohl (t_msg->tunnel_id); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "CREATED TUNNEL %s [%x] (%x)\n", GNUNET_i2s (&my_full_id), t->id.tid, t->local_tid); - t->owner = c; t->peers = GNUNET_CONTAINER_multihashmap_create (32); - GNUNET_CRYPTO_hash (&t->local_tid, sizeof (MESH_TunnelNumber), &hash); - if (GNUNET_OK != - GNUNET_CONTAINER_multihashmap_put (c->own_tunnels, &hash, t, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - - GNUNET_CRYPTO_hash (&t->id, sizeof (struct MESH_TunnelID), &hash); - if (GNUNET_OK != - GNUNET_CONTAINER_multihashmap_put (tunnels, &hash, t, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - t->tree = tree_new (myid); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new tunnel created\n"); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; @@ -4640,7 +4673,8 @@ static void core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) { struct MeshPeerInfo *pi; - unsigned int i; + struct MeshPeerQueue *q; + struct MeshPeerQueue *n; DEBUG_CONN ("Peer disconnected\n"); pi = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); @@ -4649,10 +4683,16 @@ core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) GNUNET_break (0); return; } - for (i = 0; i < CORE_QUEUE_SIZE; i++) + q = pi->queue_head; + while (NULL != q) { - /* TODO: notify that the transmission failed */ - peer_info_cancel_transmission (pi, i); + n = q->next; + if (q->peer == pi) + { + /* try to reroute this traffic instead */ + queue_destroy(q, GNUNET_YES); + } + q = n; } peer_info_remove_path (pi, pi->id, myid); if (myid == pi->id) @@ -4698,7 +4738,19 @@ static int shutdown_peer (void *cls, const GNUNET_HashCode * key, void *value) { struct MeshPeerInfo *p = value; + struct MeshPeerQueue *q; + struct MeshPeerQueue *n; + q = p->queue_head; + while (NULL != q) + { + n = q->next; + if (q->peer == p) + { + queue_destroy(q, GNUNET_YES); + } + q = n; + } peer_info_destroy (p); return GNUNET_YES; } @@ -4798,12 +4850,12 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, &my_full_id.hashPubKey); myid = GNUNET_PEER_intern (&my_full_id); -// // transport_handle = GNUNET_TRANSPORT_connect(c, -// // &my_full_id, -// // NULL, -// // NULL, -// // NULL, -// // NULL); +// transport_handle = GNUNET_TRANSPORT_connect(c, +// &my_full_id, +// NULL, +// NULL, +// NULL, +// NULL); dht_handle = GNUNET_DHT_connect (c, 64); if (dht_handle == NULL) -- 2.25.1