From: Bart Polot Date: Tue, 5 Jun 2012 14:07:12 +0000 (+0000) Subject: - WiP new mesh service X-Git-Tag: initial-import-from-subversion-38251~13253 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=bf0e2eaecab85fb3c73b192a5027f3f3c26d1dfd;p=oweals%2Fgnunet.git - WiP new mesh service --- diff --git a/src/mesh/gnunet-service-mesh_new.c b/src/mesh/gnunet-service-mesh_new.c index 34a48dabe..b60544087 100644 --- a/src/mesh/gnunet-service-mesh_new.c +++ b/src/mesh/gnunet-service-mesh_new.c @@ -136,11 +136,6 @@ struct MeshPeerQueue */ struct MeshPeerInfo *peer; - /** - * Handle to stop queued transmission - */ - struct GNUNET_CORE_TransmitHandle *core_transmit; - /** * Pointer to info stucture used as cls. */ @@ -150,6 +145,11 @@ struct MeshPeerQueue * Type of message */ uint16_t type; + + /** + * Size of the message + */ + size_t size; }; @@ -168,9 +168,6 @@ struct MeshTransmissionDescriptor /** Ultimate destination of the packet */ GNUNET_PEER_Id destination; - /** Which handler was used to request the transmission */ - struct MeshPeerQueue *queue; - /** Data descriptor */ struct MeshData* mesh_data; }; @@ -238,6 +235,11 @@ struct MeshPeerInfo * Number of tunnels this peers participates in */ unsigned int ntunnels; + + /** + * Handle to stop queued transmission + */ + struct GNUNET_CORE_TransmitHandle *core_transmit; }; @@ -431,11 +433,6 @@ struct MeshPathInfo * Path itself */ struct MeshPeerPath *path; - - /** - * Position in peer's transmit queue - */ - struct MeshPeerQueue *queue; }; @@ -1160,37 +1157,6 @@ send_client_tunnel_disconnect (struct MeshTunnel *t, struct MeshClient *c) } -/** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -send_core_create_path (void *cls, size_t size, void *buf); - - -/** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. - * - * @param cls closure (data itself) - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * - * @return number of bytes written to buf - */ -static size_t -send_core_data_multicast (void *cls, size_t size, void *buf); - - /** * Decrements the reference counter and frees all resources if needed * @@ -1223,61 +1189,6 @@ data_descriptor_decrement_multicast (struct MeshData *mesh_data) } -/** - * Cancel a core transmission that was already queued, free all resources - * associated to the request and cancel all external requests (core, ...). - * - * @param queue Queue handler to cancel. - */ -static void -peer_info_cancel_transmission (struct MeshPeerQueue *queue) -{ - struct MeshTransmissionDescriptor *dd; - struct MeshPathInfo *path_info; - struct MeshPeerInfo *peer; - - peer = queue->peer; - if (NULL != queue->core_transmit) - { -#if MESH_DEBUG - { - struct GNUNET_PeerIdentity id; - - GNUNET_PEER_resolve (peer->id, &id); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " Cancelling data transmission at %s\n", - GNUNET_i2s (&id)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " message type %u\n", - queue->type); - } -#endif - GNUNET_CORE_notify_transmit_ready_cancel (queue->core_transmit); - } - /* TODO: notify that transmission has failed */ - switch (queue->type) - { - case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type payload\n"); - dd = queue->cls; - data_descriptor_decrement_multicast (dd->mesh_data); - break; - case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type create path\n"); - path_info = queue->cls; - path_destroy (path_info->path); - break; - default: - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type unknown!\n"); - } - GNUNET_free_non_null (queue->cls); - GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); - GNUNET_free(queue); -} - - /** * Retrieve the MeshPeerInfo stucture associated with the peer, create one * and insert it in the appropiate structures if the peer is not known yet. @@ -1355,6 +1266,17 @@ peer_info_delete_tunnel (void *cls, const GNUNET_HashCode * key, void *value) } +/** + * Queue and pass message to core when possible. + * + * @param cls Closure (type dependant). + * @param type Type of the message. + * @param size Size of the message. + * @param dst Neighbor to send message to. + */ +static void +queue_add (void *cls, uint16_t type, size_t size, struct MeshPeerInfo *dst); + /** * Core callback to write a pre-constructed data packet to core buffer * @@ -1369,33 +1291,21 @@ send_core_data_raw (void *cls, size_t size, void *buf) { struct MeshTransmissionDescriptor *info = cls; struct GNUNET_MessageHeader *msg; - struct MeshPeerQueue *queue; size_t total_size; GNUNET_assert (NULL != info); GNUNET_assert (NULL != info->mesh_data); msg = (struct GNUNET_MessageHeader *) info->mesh_data->data; total_size = ntohs (msg->size); - queue = info->queue; if (total_size > size) { - struct GNUNET_PeerIdentity id; - - GNUNET_PEER_resolve (info->peer->id, &id); - queue->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 100, - GNUNET_TIME_UNIT_FOREVER_REL, &id, - size, &send_core_data_raw, info); + GNUNET_break (0); return 0; } memcpy (buf, msg, total_size); GNUNET_free (info->mesh_data); GNUNET_free (info); - GNUNET_CONTAINER_DLL_remove (queue->peer->queue_head, - queue->peer->queue_tail, - queue); - GNUNET_free (queue); return total_size; } @@ -1414,7 +1324,6 @@ send_message (const struct GNUNET_MessageHeader *message, const struct GNUNET_PeerIdentity *peer) { struct MeshTransmissionDescriptor *info; - struct MeshPeerQueue *queue; struct MeshPeerInfo *neighbor; struct MeshPeerPath *p; size_t size; @@ -1443,19 +1352,11 @@ send_message (const struct GNUNET_MessageHeader *message, GNUNET_free (info); return; } - queue = GNUNET_malloc (sizeof(struct MeshPeerQueue)); - queue->peer = neighbor; - info->queue = queue; info->peer = neighbor; - queue->type = GNUNET_MESSAGE_TYPE_MESH_UNICAST; - queue->cls = info; - queue->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 100, - GNUNET_TIME_UNIT_FOREVER_REL, peer, - size, &send_core_data_raw, info); - GNUNET_CONTAINER_DLL_insert (neighbor->queue_head, - neighbor->queue_tail, - queue); + queue_add (info, + GNUNET_MESSAGE_TYPE_MESH_UNICAST, + size, + neighbor); } @@ -1474,7 +1375,6 @@ send_create_path (struct MeshPeerInfo *peer, struct MeshPeerPath *p, struct GNUNET_PeerIdentity id; struct MeshPathInfo *path_info; struct MeshPeerInfo *neighbor; - struct MeshPeerQueue *queue; unsigned int i; @@ -1500,28 +1400,16 @@ send_create_path (struct MeshPeerInfo *peer, struct MeshPeerPath *p, } GNUNET_PEER_resolve (p->peers[i + 1], &id); - queue = GNUNET_malloc (sizeof(struct MeshPeerQueue)); path_info = GNUNET_malloc (sizeof (struct MeshPathInfo)); path_info->path = p; path_info->t = t; neighbor = peer_info_get (&id); path_info->peer = neighbor; - path_info->queue = queue; - queue->type = GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE; - queue->cls = path_info; - queue->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, /* handle */ - 0, /* cork */ - 0, /* priority */ - GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ - &id, /* target */ - sizeof (struct GNUNET_MESH_ManipulatePath) + - (p->length * sizeof (struct GNUNET_PeerIdentity)), /*size */ - &send_core_create_path, /* callback */ - path_info); /* cls */ - GNUNET_CONTAINER_DLL_insert (neighbor->queue_head, - neighbor->queue_tail, - queue); + queue_add (path_info, + GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE, + sizeof (struct GNUNET_MESH_ManipulatePath) + + (p->length * sizeof (struct GNUNET_PeerIdentity)), + neighbor); } @@ -2344,7 +2232,6 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) struct MeshData *mdata = cls; struct MeshTransmissionDescriptor *info; struct GNUNET_PeerIdentity neighbor; - struct MeshPeerQueue *queue; info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); @@ -2356,19 +2243,11 @@ tunnel_send_multicast_iterator (void *cls, GNUNET_PEER_Id neighbor_id) GNUNET_i2s (&neighbor)); info->peer = peer_info_get (&neighbor); GNUNET_assert (NULL != info->peer); - queue = GNUNET_malloc (sizeof(struct MeshPeerQueue)); - info->queue = queue; - queue->cls = info; - queue->type = GNUNET_MESSAGE_TYPE_MESH_MULTICAST; - queue->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &neighbor, info->mesh_data->data_len, - &send_core_data_multicast, info); - queue->peer = info->peer; - GNUNET_CONTAINER_DLL_insert (queue->peer->queue_head, - queue->peer->queue_tail, - queue); + queue_add(info, + GNUNET_MESSAGE_TYPE_MESH_MULTICAST, + info->mesh_data->data_len, + info->peer + ); } /** @@ -2649,10 +2528,7 @@ tunnel_reset_timeout (struct MeshTunnel *t) /******************************************************************************/ /** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Function to send a create path packet to a peer. * * @param cls closure * @param size number of bytes available in buf @@ -2660,12 +2536,11 @@ tunnel_reset_timeout (struct MeshTunnel *t) * @return number of bytes written to buf */ static size_t -send_core_create_path (void *cls, size_t size, void *buf) +send_core_path_create (void *cls, size_t size, void *buf) { struct MeshPathInfo *info = cls; struct GNUNET_MESH_ManipulatePath *msg; struct GNUNET_PeerIdentity *peer_ptr; - struct MeshPeerInfo *peer = info->peer; struct MeshTunnel *t = info->t; struct MeshPeerPath *p = info->path; size_t size_needed; @@ -2678,17 +2553,7 @@ send_core_create_path (void *cls, size_t size, void *buf) if (size < size_needed || NULL == buf) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "create path retransmit!\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " buf: %p\n", buf); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " size: (%u/%u)\n", size, - size_needed); - info->queue->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 0, - GNUNET_TIME_UNIT_FOREVER_REL, - tree_get_first_hop (t->tree, - peer->id), - size_needed, &send_core_create_path, - info); + GNUNET_break (0); return 0; } msg = (struct GNUNET_MESH_ManipulatePath *) buf; @@ -2703,10 +2568,6 @@ send_core_create_path (void *cls, size_t size, void *buf) } path_destroy (p); - GNUNET_CONTAINER_DLL_remove(info->queue->peer->queue_head, - info->queue->peer->queue_tail, - info->queue); - GNUNET_free (info->queue); GNUNET_free (info); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2716,10 +2577,7 @@ send_core_create_path (void *cls, size_t size, void *buf) /** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Fill the core buffer * * @param cls closure (data itself) * @param size number of bytes available in buf @@ -2741,18 +2599,7 @@ send_core_data_multicast (void *cls, size_t size, void *buf) if (total_size > size) { - /* Retry */ - struct GNUNET_PeerIdentity id; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Multicast: retransmitting... (%u/%u)\n", size, - total_size); - GNUNET_PEER_resolve (info->peer->id, &id); - info->queue->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 0, - GNUNET_TIME_UNIT_FOREVER_REL, &id, - total_size, - &send_core_data_multicast, info); + GNUNET_break (0); return 0; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " copying data...\n"); @@ -2781,10 +2628,6 @@ send_core_data_multicast (void *cls, size_t size, void *buf) #endif data_descriptor_decrement_multicast (info->mesh_data); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "freeing info...\n"); - GNUNET_CONTAINER_DLL_remove(info->queue->peer->queue_head, - info->queue->peer->queue_tail, - info->queue); - GNUNET_free (info->queue); GNUNET_free (info); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "return %u\n", total_size); return total_size; @@ -2792,10 +2635,7 @@ send_core_data_multicast (void *cls, size_t size, void *buf) /** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Creates a path ack message in buf and frees all unused resources. * * @param cls closure (MeshTransmissionDescriptor) * @param size number of bytes available in buf @@ -2820,13 +2660,6 @@ send_core_path_ack (void *cls, size_t size, void *buf) msg->tid = htonl (info->origin->tid); msg->peer_id = my_full_id; - if (info->queue) - { - GNUNET_CONTAINER_DLL_remove(info->queue->peer->queue_head, - info->queue->peer->queue_tail, - info->queue); - GNUNET_free (info->queue); - } GNUNET_free (info); /* TODO add signature */ @@ -2835,6 +2668,170 @@ send_core_path_ack (void *cls, size_t size, void *buf) } +/** + * Free a transmission that was already queued with all resources + * associated to the request. + * + * @param queue Queue handler to cancel. + */ +static void +queue_destroy (struct MeshPeerQueue *queue) +{ + struct MeshTransmissionDescriptor *dd; + struct MeshPathInfo *path_info; + struct MeshPeerInfo *peer; + + peer = queue->peer; + switch (queue->type) + { + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type payload\n"); + dd = queue->cls; + data_descriptor_decrement_multicast (dd->mesh_data); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type create path\n"); + path_info = queue->cls; + path_destroy (path_info->path); + break; + default: + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type unknown!\n"); + } + GNUNET_free_non_null (queue->cls); + GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue); + GNUNET_free(queue); +} + + +/** + * Core callback to write a queued packet to core buffer + * + * @param cls Closure (peer info). + * @param size Number of bytes available in buf. + * @param buf Where the to write the message. + * + * @return number of bytes written to buf + */ +static size_t +queue_send (void *cls, size_t size, void *buf) +{ + struct MeshPeerInfo *peer = cls; + struct MeshPeerQueue *queue; + size_t data_size; + + peer->core_transmit = NULL; + queue = peer->queue_head; + + /* If queue is empty, send should have been cancelled */ + if (NULL == queue) + { + GNUNET_break(0); + return 0; + } + + /* Check if buffer size is enough for the message */ + if (queue->size < size) + { + struct GNUNET_PeerIdentity id; + + GNUNET_PEER_resolve (peer->id, &id); + peer->core_transmit = + GNUNET_CORE_notify_transmit_ready(core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + queue->size, + &queue_send, + peer); + return 0; + } + + /* Fill buf */ + switch (queue->type) + { + case GNUNET_MESSAGE_TYPE_MESH_UNICAST: + data_size = send_core_data_raw (queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_MULTICAST: + data_size = send_core_data_multicast(queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + data_size = 0; + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_CREATE: + data_size = send_core_path_create(queue->cls, size, buf); + break; + case GNUNET_MESSAGE_TYPE_MESH_PATH_ACK: + data_size = send_core_path_ack(queue->cls, size, buf); + break; + default: + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " type unknown!\n"); + data_size = 0; + } + + /* Free resources */ + queue_destroy(queue); + + /* If more data in queue, send next */ + if (NULL != peer->queue_head) + { + struct GNUNET_PeerIdentity id; + + GNUNET_PEER_resolve (peer->id, &id); + peer->core_transmit = + GNUNET_CORE_notify_transmit_ready(core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + peer->queue_head->size, + &queue_send, + peer); + } + return data_size; +} + + +/** + * Queue and pass message to core when possible. + * + * @param cls Closure (type dependant). + * @param type Type of the message. + * @param size Size of the message. + * @param dst Neighbor to send message to. + */ +static void +queue_add (void *cls, uint16_t type, size_t size, struct MeshPeerInfo *dst) +{ + struct MeshPeerQueue *queue; + + queue = GNUNET_malloc (sizeof (struct MeshPeerQueue)); + queue->cls = cls; + queue->type = type; + queue->size = size; + queue->peer = dst; + GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue); + if (NULL == dst->core_transmit) + { + struct GNUNET_PeerIdentity id; + + GNUNET_PEER_resolve (dst->id, &id); + dst->core_transmit = + GNUNET_CORE_notify_transmit_ready(core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + size, + &queue_send, + dst); + } +} + + /******************************************************************************/ /******************** MESH NETWORK HANDLERS **************************/ /******************************************************************************/ @@ -2981,7 +2978,6 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, { /* It is for us! Send ack. */ struct MeshTransmissionDescriptor *info; - struct MeshPeerQueue *queue; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " It's for us!\n"); peer_info_add_path_to_origin (orig_peer_info, path, GNUNET_NO); @@ -2996,24 +2992,14 @@ handle_mesh_path_create (void *cls, const struct GNUNET_PeerIdentity *peer, peer_info_get (&my_full_id), GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE)); - /* FIXME use send_message */ info = GNUNET_malloc (sizeof (struct MeshTransmissionDescriptor)); info->origin = &t->id; info->peer = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey); GNUNET_assert (NULL != info->peer); - queue = GNUNET_malloc (sizeof (struct MeshPeerQueue)); - info->queue = queue; - queue->peer = info->peer; - queue->type = GNUNET_MESSAGE_TYPE_MESH_PATH_ACK; - queue->cls = info; - queue->core_transmit = - GNUNET_CORE_notify_transmit_ready (core_handle, 0, 10, - GNUNET_TIME_UNIT_FOREVER_REL, peer, - sizeof (struct GNUNET_MESH_PathACK), - &send_core_path_ack, info); - GNUNET_CONTAINER_DLL_insert (queue->peer->queue_head, - queue->peer->queue_tail, - queue); + queue_add(info, + GNUNET_MESSAGE_TYPE_MESH_PATH_ACK, + sizeof (struct GNUNET_MESH_PathACK), + info->peer); } else {