From 6a665489add525880f1f609271dace270c868786 Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Tue, 17 Jan 2012 17:29:00 +0000 Subject: [PATCH] Fixed #2070 and simplified data transmission unicast/multicast handling --- src/mesh/gnunet-service-mesh.c | 171 ++++++++++++++++----------------- 1 file changed, 85 insertions(+), 86 deletions(-) diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index fb6d09e5a..56ae515f9 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -82,39 +82,50 @@ /** FWD declaration */ struct MeshPeerInfo; + +/** + * Struct representing a piece of data being sent to other peers + */ +struct MeshData +{ + /** Tunnel it belongs to. */ + struct MeshTunnel *t; + + /** In case of a multicast, task to allow a client to send more data if + * some neighbor is too slow. */ + GNUNET_SCHEDULER_TaskIdentifier *task; + + /** How many remaining neighbors we need to send this to. */ + unsigned int *reference_counter; + + /** Size of the data. */ + size_t data_len; + + /** Data itself */ + void *data; +}; + + /** * Struct containing all info possibly needed to build a package when called * back by core. */ -struct MeshDataDescriptor +struct MeshTransmissionDescriptor { /** ID of the tunnel this packet travels in */ struct MESH_TunnelID *origin; - /** Data itself */ - void *data; - - /** Client that asked for the transmission, if any */ - struct GNUNET_SERVER_Client *client; - /** Who was this message being sent to */ struct MeshPeerInfo *peer; /** Ultimate destination of the packet */ GNUNET_PEER_Id destination; - /** Number of identical messages sent to different hops (multicast) */ - unsigned int *copies; - /** Which handler was used to request the transmission */ unsigned int handler_n; - /** Size of the data */ - size_t size; - - /** Used to allow a client send more traffic to the service after a - * previous packet was tried to be sent to a neighbor and couldn't */ - GNUNET_SCHEDULER_TaskIdentifier *timeout_task; + /** Data descriptor */ + struct MeshData* mesh_data; }; @@ -723,17 +734,18 @@ client_is_subscribed (uint16_t message_type, struct MeshClient *c) static void client_allow_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct MeshClient *c = cls; + struct MeshData *mdata = cls; if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) return; -#if 0 +#if MESH_DEBUG + GNUNET_assert (NULL != mdata->reference_counter); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: CLIENT ALLOW SEND DESPITE %u COPIES PENDING\n", - (info->copies != NULL) ? *(info->copies) : 0); + mdata->reference_counter); #endif -// *(info->timeout_task) = GNUNET_SCHEDULER_NO_TASK; - GNUNET_SERVER_receive_done (c->handle, GNUNET_OK); + *(mdata->task) = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SERVER_receive_done (mdata->t->client->handle, GNUNET_OK); } @@ -912,29 +924,30 @@ send_core_data_multicast (void *cls, size_t size, void *buf); /** * Decrements the reference counter and frees all resources if needed * - * @param dd Data Descriptor used in a multicast message + * @param dd Data Descriptor used in a multicast message. Freed if needed. */ static void -data_descriptor_decrement_multicast (struct MeshDataDescriptor *dd) +data_descriptor_decrement_multicast (struct MeshData *mesh_data) { - if (0 == --(*(dd->copies))) + /* Make sure it's a multicast packet */ + GNUNET_assert (NULL != mesh_data->reference_counter); + + if (0 == --(*(mesh_data->reference_counter))) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: Last copy!\n"); - if (NULL != dd->client) + if (NULL != mesh_data->task) { - if (GNUNET_SCHEDULER_NO_TASK != *(dd->timeout_task)) + if (GNUNET_SCHEDULER_NO_TASK != *(mesh_data->task)) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "MESH: cancelling client timeout (%u)...\n", - *(dd->timeout_task)); - GNUNET_SCHEDULER_cancel (*(dd->timeout_task)); + GNUNET_SCHEDULER_cancel (*(mesh_data->task)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: notifying client...\n"); - GNUNET_SERVER_receive_done (dd->client, GNUNET_OK); + GNUNET_SERVER_receive_done (mesh_data->t->client->handle, GNUNET_OK); } - GNUNET_free (dd->timeout_task); + GNUNET_free (mesh_data->task); } - GNUNET_free (dd->copies); - GNUNET_free (dd->data); + GNUNET_free (mesh_data->reference_counter); + GNUNET_free (mesh_data->data); + GNUNET_free (mesh_data); } } @@ -951,7 +964,7 @@ peer_info_cancel_transmission (struct MeshPeerInfo *peer, unsigned int i) { if (NULL != peer->core_transmit[i]) { - struct MeshDataDescriptor *dd; + struct MeshTransmissionDescriptor *dd; struct MeshPathInfo *path_info; #if MESH_DEBUG @@ -974,7 +987,7 @@ peer_info_cancel_transmission (struct MeshPeerInfo *peer, unsigned int i) case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: type payload\n"); dd = peer->infos[i]; - data_descriptor_decrement_multicast (dd); + data_descriptor_decrement_multicast (dd->mesh_data); break; case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: type create path\n"); @@ -1099,7 +1112,7 @@ peer_info_delete_tunnel (void *cls, const GNUNET_HashCode * key, void *value) /** * Core callback to write a * - * @param cls Closure (MeshDataDescriptor with data in "data" member). + * @param cls Closure (MeshTransmissionDescriptor with data in "data" member). * @param size Number of bytes available in buf. * @param buf Where the to write the message. * @@ -1108,13 +1121,13 @@ peer_info_delete_tunnel (void *cls, const GNUNET_HashCode * key, void *value) static size_t send_core_data_raw (void *cls, size_t size, void *buf) { - struct MeshDataDescriptor *info = cls; + struct MeshTransmissionDescriptor *info = cls; struct GNUNET_MessageHeader *msg; size_t total_size; GNUNET_assert (NULL != info); - GNUNET_assert (NULL != info->data); - msg = (struct GNUNET_MessageHeader *) info->data; + GNUNET_assert (NULL != info->mesh_data); + msg = (struct GNUNET_MessageHeader *) info->mesh_data->data; total_size = ntohs (msg->size); if (total_size > size) @@ -1130,7 +1143,7 @@ send_core_data_raw (void *cls, size_t size, void *buf) } info->peer->core_transmit[info->handler_n] = NULL; memcpy (buf, msg, total_size); - GNUNET_free (info->data); + GNUNET_free (info->mesh_data); GNUNET_free (info); return total_size; } @@ -1142,12 +1155,14 @@ send_core_data_raw (void *cls, size_t size, void *buf) * * @param message Message to send. Fucntion makes a copy of it. * @param peer Short ID of the neighbor whom to send the message. + * + * FIXME tunnel? */ static void send_message (const struct GNUNET_MessageHeader *message, const struct GNUNET_PeerIdentity *peer) { - struct MeshDataDescriptor *info; + struct MeshTransmissionDescriptor *info; struct MeshPeerInfo *neighbor; struct MeshPeerPath *p; unsigned int i; @@ -1156,9 +1171,11 @@ send_message (const struct GNUNET_MessageHeader *message, // GNUNET_TRANSPORT_try_connect(); size = ntohs (message->size); - info = GNUNET_malloc (sizeof (struct MeshDataDescriptor)); - info->data = GNUNET_malloc (size); - memcpy (info->data, message, size); + info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); + info->mesh_data = GNUNET_malloc (sizeof (struct MeshData)); + info->mesh_data->data = GNUNET_malloc (size); + memcpy (info->mesh_data->data, message, size); + info->mesh_data->data_len = size; neighbor = peer_info_get (peer); for (p = neighbor->path_head; NULL != p; p = p->next) { @@ -1170,6 +1187,8 @@ send_message (const struct GNUNET_MessageHeader *message, if (NULL == p) { GNUNET_break (0); + GNUNET_free (info->mesh_data->data); + GNUNET_free (info->mesh_data); GNUNET_free (info); return; } @@ -1978,20 +1997,6 @@ tunnel_notify_connection_broken (struct MeshTunnel *t, GNUNET_PEER_Id p1, } -struct MeshMulticastData -{ - struct MeshTunnel *t; - - GNUNET_SCHEDULER_TaskIdentifier *task; - - unsigned int *reference_counter; - - size_t data_len; - - void *data; -}; - - /** * Send a multicast packet to a neighbor. * @@ -2001,23 +2006,15 @@ struct MeshMulticastData static void tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) { - struct MeshMulticastData *mdata = cls; - struct MeshDataDescriptor *info; + struct MeshData *mdata = cls; + struct MeshTransmissionDescriptor *info; struct GNUNET_PeerIdentity neighbor; unsigned int i; - info = GNUNET_malloc (sizeof (struct MeshDataDescriptor)); - - info->data = mdata->data; - info->size = mdata->data_len; - info->copies = mdata->reference_counter; - (*(mdata->reference_counter))++; + info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); - if (NULL != mdata->t->client) - { - info->client = mdata->t->client->handle; - info->timeout_task = mdata->task; - } + info->mesh_data = mdata; + (*(mdata->reference_counter)) ++; info->destination = neighbor_id; GNUNET_PEER_resolve (neighbor_id, &neighbor); #if MESH_DEBUG @@ -2033,7 +2030,7 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) info->peer->core_transmit[i] = GNUNET_CORE_notify_transmit_ready (core_handle, 0, 0, GNUNET_TIME_UNIT_FOREVER_REL, - &neighbor, info->size, + &neighbor, info->mesh_data->data_len, &send_core_data_multicast, info); } @@ -2048,13 +2045,13 @@ static void tunnel_send_multicast (struct MeshTunnel *t, const struct GNUNET_MessageHeader *msg) { - struct MeshMulticastData *mdata; + struct MeshData *mdata; #if MESH_DEBUG GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: sending a multicast packet...\n"); #endif - mdata = GNUNET_malloc (sizeof (struct MeshMulticastData)); + mdata = GNUNET_malloc (sizeof (struct MeshData)); mdata->data_len = ntohs (msg->size); mdata->reference_counter = GNUNET_malloc (sizeof (unsigned int)); mdata->t = t; @@ -2078,9 +2075,11 @@ tunnel_send_multicast (struct MeshTunnel *t, if (NULL != t->client) { mdata->task = GNUNET_malloc (sizeof (GNUNET_SCHEDULER_TaskIdentifier)); - *(mdata->task) = + (*(mdata->task)) = GNUNET_SCHEDULER_add_delayed (UNACKNOWLEDGED_WAIT, &client_allow_send, - t->client->handle); + mdata); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: timeout task %u\n", + *(mdata->task)); } tree_iterate_children (t->tree, &tunnel_send_multicast_iterator, mdata); @@ -2089,8 +2088,8 @@ tunnel_send_multicast (struct MeshTunnel *t, GNUNET_free (mdata->data); GNUNET_free (mdata->reference_counter); GNUNET_free_non_null (mdata->task); + GNUNET_free (mdata); } - GNUNET_free (mdata); #if MESH_DEBUG GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: sending a multicast packet done\n"); @@ -2361,13 +2360,13 @@ send_core_create_path (void *cls, size_t size, void *buf) static size_t send_core_data_multicast (void *cls, size_t size, void *buf) { - struct MeshDataDescriptor *info = cls; + struct MeshTransmissionDescriptor *info = cls; size_t total_size; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: Multicast callback.\n"); GNUNET_assert (NULL != info); GNUNET_assert (NULL != info->peer); - total_size = info->size; + total_size = info->mesh_data->data_len; GNUNET_assert (total_size < GNUNET_SERVER_MAX_MESSAGE_SIZE); if (total_size > size) @@ -2389,7 +2388,7 @@ send_core_data_multicast (void *cls, size_t size, void *buf) info->peer->core_transmit[info->handler_n] = NULL; info->peer->infos[info->handler_n] = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: copying data...\n"); - memcpy (buf, info->data, total_size); + memcpy (buf, info->mesh_data->data, total_size); #if MESH_DEBUG { struct GNUNET_MESH_Multicast *mc; @@ -2412,7 +2411,7 @@ send_core_data_multicast (void *cls, size_t size, void *buf) } } #endif - data_descriptor_decrement_multicast (info); + data_descriptor_decrement_multicast (info->mesh_data); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: freeing info...\n"); GNUNET_free (info); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: return %u\n", total_size); @@ -2426,7 +2425,7 @@ send_core_data_multicast (void *cls, size_t size, void *buf) * NULL and "size" zero if the socket was closed for * writing in the meantime. * - * @param cls closure (MeshDataDescriptor) + * @param cls closure (MeshTransmissionDescriptor) * @param size number of bytes available in buf * @param buf where the callee should write the message * @return number of bytes written to buf @@ -2434,7 +2433,7 @@ send_core_data_multicast (void *cls, size_t size, void *buf) static size_t send_core_path_ack (void *cls, size_t size, void *buf) { - struct MeshDataDescriptor *info = cls; + struct MeshTransmissionDescriptor *info = cls; struct GNUNET_MESH_PathACK *msg = buf; GNUNET_assert (NULL != info); @@ -2605,7 +2604,7 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, if (own_pos == size - 1) { /* It is for us! Send ack. */ - struct MeshDataDescriptor *info; + struct MeshTransmissionDescriptor *info; unsigned int j; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MESH: It's for us!\n"); @@ -2631,7 +2630,7 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, (&my_full_id), GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE)); /* FIXME use send_message */ - info = GNUNET_malloc (sizeof (struct MeshDataDescriptor)); + info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); info->origin = &t->id; info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); GNUNET_assert (NULL != info->peer); -- 2.25.1