From aa4361ff05e8cecfa11ca0d1fdc94be0503db07b Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Wed, 3 Aug 2011 23:31:50 +0000 Subject: [PATCH] Added real ACK for path acknowledgement --- src/include/gnunet_protocols.h | 4 + src/mesh/gnunet-service-mesh.c | 199 +++++++++++++++++++++++++-------- src/mesh/mesh_protocol.h | 25 +++++ 3 files changed, 182 insertions(+), 46 deletions(-) diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 36cfe0535..d31b67c95 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -825,6 +825,10 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_DATA_MESSAGE_TO_ORIGIN 262 +/** + * Send origin an ACK that the path is complete + */ +#define GNUNET_MESSAGE_TYPE_PATH_ACK 263 /** * We need flow control diff --git a/src/mesh/gnunet-service-mesh.c b/src/mesh/gnunet-service-mesh.c index e1ee58ed9..1d2994585 100644 --- a/src/mesh/gnunet-service-mesh.c +++ b/src/mesh/gnunet-service-mesh.c @@ -245,7 +245,7 @@ struct MeshTunnel struct GNUNET_TIME_Absolute timestamp; /** - * Peers in the tunnelindexed by PeerIdentity (MeshPeerInfo) + * Peers in the tunnel, indexed by PeerIdentity -> (MeshPeerInfo) */ struct GNUNET_CONTAINER_MultiHashMap* peers; @@ -338,6 +338,11 @@ static struct GNUNET_CORE_Handle *core_handle; */ static struct GNUNET_DHT_Handle *dht_handle; +/** + * Handle to server + */ +static struct GNUNET_SERVER_Handle *server_handle; + /** * Local peer own ID (memory efficient handle) */ @@ -735,28 +740,29 @@ struct MeshDataDescriptor { /** ID of the tunnel this packet travels in */ struct MESH_TunnelID *origin; - + /** Ultimate destination of the packet */ GNUNET_PEER_Id destination; - + /** Number of identical messages sent to different hops (multicast) */ unsigned int copies; - + /** Size of the data */ size_t size; - + /** Client that asked for the transmission, if any */ struct GNUNET_SERVER_Client *client; - + /** Who was this message directed to */ struct MeshPeerInfo *peer; - + /** Which handler was used to request the transmission */ unsigned int handler_n; - + /* Data at the end */ }; +#if LATER /** * Function called to notify a client about the socket * being ready to queue more data. "buf" will be @@ -797,7 +803,7 @@ send_core_data_to_origin (void *cls, size_t size, void *buf) GNUNET_free(info); return total_size; } - +#endif /** * Function called to notify a client about the socket @@ -863,7 +869,10 @@ send_core_data_multicast (void *cls, size_t size, void *buf) GNUNET_assert(NULL != info); total_size = info->size + sizeof(struct GNUNET_MESH_DataMessageMulticast); GNUNET_assert(total_size < GNUNET_SERVER_MAX_MESSAGE_SIZE); - + + if (info->peer) { + info->peer->core_transmit[info->handler_n] = NULL; + } if (total_size > size) { GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "not enough buffer to send data futher\n"); @@ -884,6 +893,39 @@ send_core_data_multicast (void *cls, size_t size, void *buf) } +/** + * Function called to notify a client about the socket + * being ready to queue more data. "buf" will be + * NULL and "size" zero if the socket was closed for + * writing in the meantime. + * + * @param cls closure (MeshDataDescriptor) + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +send_core_path_ack (void *cls, size_t size, void *buf) { + struct MeshDataDescriptor *info = cls; + struct GNUNET_MESH_PathACK *msg = buf; + + GNUNET_assert(NULL != info); + if (info->peer) { + info->peer->core_transmit[info->handler_n] = NULL; + } + if (sizeof(struct GNUNET_MESH_PathACK) > size) { + GNUNET_break(0); + return 0; + } + msg->header.size = htons(sizeof(struct GNUNET_MESH_PathACK)); + msg->header.type = htons(GNUNET_MESSAGE_TYPE_PATH_ACK); + GNUNET_PEER_resolve(info->origin->oid, &msg->oid); + msg->tid = htonl(info->origin->tid); + + return sizeof(struct GNUNET_MESH_PathACK); +} + + /** * Function called to notify a client about the socket * being ready to queue more data. "buf" will be @@ -969,35 +1011,34 @@ send_client_raw (void *cls, size_t size, void *buf) /** - * Iterator over hash map peer entries to resend a data packet to all peers - * down the tunnel. + * Iterator over hash map peer entries collect all neighbors who to resend the + * data to. * - * @param cls closure (original message) + * @param cls closure (**GNUNET_PEER_Id to store hops to send packet) * @param key current key code (peer id hash) * @param value value in the hash map (peer_info) * @return GNUNET_YES if we should continue to iterate, GNUNET_NO if not. */ -static int iterate_resend_multicast (void *cls, - const GNUNET_HashCode * key, - void *value) +static int iterate_collect_neighbors (void *cls, + const GNUNET_HashCode * key, + void *value) { - struct GNUNET_MESH_DataMessageMulticast *msg = cls; - struct GNUNET_PeerIdentity id; struct MeshPeerInfo *peer_info = value; + GNUNET_PEER_Id **neighbors = cls; + GNUNET_PEER_Id id; + unsigned int i; if (peer_info->id == myid) { -// TODO retransmit to interested clients return GNUNET_YES; } - GNUNET_PEER_resolve(get_first_hop(peer_info->path), &id); - GNUNET_CORE_notify_transmit_ready(core_handle, - 0, - 0, - GNUNET_TIME_UNIT_FOREVER_REL, - &id, - ntohs(msg->header.size), - &send_core_data_raw, - msg); + id = get_first_hop(peer_info->path); + for (i = 0; *neighbors[i] != 0; i++) { + if (*neighbors[i] == id) return GNUNET_YES; + } + *neighbors = GNUNET_realloc(*neighbors, (i + 2) * sizeof(GNUNET_PEER_Id)); + *neighbors[i] = id; + *neighbors[i + 1] = 0; + return GNUNET_YES; } @@ -1125,21 +1166,31 @@ handle_mesh_path_create (void *cls, return 0; } if (own_pos == size - 1) { /* it is for us! */ -// struct MeshDataDescriptor *info; - - /* FIXME: implement real dedicated ACK */ -// add_path_to_origin(orig_peer_info, path); /* inverts path! */ -// GNUNET_PEER_resolve(get_first_hop(path), &id); /* path is inverted :) */ -// info = GNUNET_malloc(sizeof(struct MeshDataDescriptor)); -// info->origin = &t->id; -// GNUNET_CORE_notify_transmit_ready(core_handle, -// 0, -// 0, -// GNUNET_TIME_UNIT_FOREVER_REL, -// &id, -// sizeof(struct GNUNET_MessageHeader), -// &send_core_data_to_origin, -// info); + struct MeshDataDescriptor *info; + unsigned int j; + + add_path_to_origin(orig_peer_info, path); /* inverts path! */ + GNUNET_PEER_resolve(get_first_hop(path), &id); /* path is inverted :) */ + info = GNUNET_malloc(sizeof(struct MeshDataDescriptor)); + info->origin = &t->id; + info->peer = GNUNET_CONTAINER_multihashmap_get(peers, &id.hashPubKey); + GNUNET_assert(info->peer); + for (j = 0; info->peer->core_transmit[j]; j++) { + if (j == 9) { + GNUNET_break(0); + return GNUNET_OK; + } + } + info->handler_n = j; + info->peer->core_transmit[j] = GNUNET_CORE_notify_transmit_ready( + core_handle, + 0, + 100, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + sizeof(struct GNUNET_MessageHeader), + &send_core_path_ack, + info); } else { add_path_to_peer(dest_peer_info, path); GNUNET_PEER_resolve(get_first_hop(path), &id); @@ -1248,8 +1299,17 @@ handle_mesh_data_multicast (void *cls, *atsi) { struct GNUNET_MESH_DataMessageMulticast *msg; + struct GNUNET_PeerIdentity id; struct MeshTunnel *t; + struct MeshClient *c; + struct MeshDataDescriptor *dd; + struct GNUNET_SERVER_NotificationContext *nc; + GNUNET_PEER_Id *neighbors; size_t size; + uint16_t type; + uint16_t i; + uint16_t j; + size = ntohs(message->size); if (size < sizeof(struct GNUNET_MESH_DataMessageMulticast)) { @@ -1263,10 +1323,56 @@ handle_mesh_data_multicast (void *cls, return GNUNET_OK; } - GNUNET_CONTAINER_multihashmap_iterate(t->peers, - &iterate_resend_multicast, - msg); + /* Transmit to locally interested clients */ + GNUNET_PEER_resolve(myid, &id); + if (GNUNET_CONTAINER_multihashmap_contains(t->peers, &id.hashPubKey)) { + type = ntohs(msg[1].header.type); + nc = GNUNET_SERVER_notification_context_create(server_handle, 10U); + for (c = clients; c != NULL; c = c->next) { + for (i = 0; i < c->type_counter; i++) { + if (c->types[i] == type) { + GNUNET_SERVER_notification_context_add(nc, c->handle); + } + } + } + } + /* Retransmit to other peers */ + neighbors = GNUNET_malloc(sizeof(GNUNET_PEER_Id)); + neighbors[0] = 0; + GNUNET_CONTAINER_multihashmap_iterate(t->peers, + &iterate_collect_neighbors, + &neighbors); + if (!neighbors[0]) { + return GNUNET_OK; + } + size -= sizeof(struct GNUNET_MESH_DataMessageMulticast); + dd = GNUNET_malloc(sizeof(struct MeshDataDescriptor) + size); + dd->origin = &t->id; + dd->copies = 0; + for (i = 0; 0 != neighbors[i]; i++) { + GNUNET_PEER_resolve(neighbors[i], &id); + dd->copies++; + dd->destination = neighbors[i]; + dd->peer = GNUNET_CONTAINER_multihashmap_get(peers, &id.hashPubKey); + GNUNET_assert(dd->peer); + for (j = 0; dd->peer->core_transmit[j]; j++) { + if (j == 9) { + GNUNET_break(0); + return GNUNET_OK; + } + } + dd->handler_n = j; + dd->peer->core_transmit[j] = GNUNET_CORE_notify_transmit_ready( + core_handle, + 0, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + &id, + ntohs(msg->header.size), + &send_core_data_multicast, + dd); + } return GNUNET_OK; } @@ -2237,6 +2343,7 @@ run (void *cls, "starting to run\n"); GNUNET_SERVER_add_handlers (server, plugin_handlers); GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); + server_handle = server; core_handle = GNUNET_CORE_connect (c, /* Main configuration */ CORE_QUEUE_SIZE, /* queue size */ NULL, /* Closure passed to MESH functions */ diff --git a/src/mesh/mesh_protocol.h b/src/mesh/mesh_protocol.h index c18d56400..8868d0c66 100644 --- a/src/mesh/mesh_protocol.h +++ b/src/mesh/mesh_protocol.h @@ -151,6 +151,31 @@ struct GNUNET_MESH_DataMessageToOrigin */ }; + +/** + * Message for ack'ing a path + */ +struct GNUNET_MESH_PathACK +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PATH_ACK + */ + struct GNUNET_MessageHeader header; + + /** + * TID of the tunnel + */ + uint32_t tid GNUNET_PACKED; + + /** + * OID of the tunnel + */ + struct GNUNET_PeerIdentity oid; + + /* TODO: signature */ +}; + + /** * Message for mesh flow control */ -- 2.25.1