From 54ce17ec50827d3a905a3cb182ccd3c18bbd3668 Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Fri, 12 Mar 2010 17:30:49 +0000 Subject: [PATCH] wont work because someone made my random heap function unpossible. committing for posterity mostly --- src/dv/dv.h | 8 +- src/dv/dv_api.c | 19 ++- src/dv/gnunet-service-dv.c | 262 ++++++++++++++++++++++++++++++++++--- 3 files changed, 263 insertions(+), 26 deletions(-) diff --git a/src/dv/dv.h b/src/dv/dv.h index 10120a4b0..d43befd70 100644 --- a/src/dv/dv.h +++ b/src/dv/dv.h @@ -33,7 +33,8 @@ typedef void (*GNUNET_DV_MessageReceivedHandler) (void *cls, struct GNUNET_PeerIdentity *sender, - struct GNUNET_MessageHeader *msg, + char *msg, + size_t msg_len, unsigned int distance, char *sender_address, size_t sender_address_len); @@ -42,7 +43,8 @@ typedef void (*GNUNET_DV_MessageReceivedHandler) (void *cls, * DV Message, contains a message that was received * via DV for this peer! * - * Sender address is copied to the end of this struct. + * Sender address is copied to the end of this struct, + * followed by the actual message received. */ struct GNUNET_DV_MessageReceived { @@ -59,7 +61,7 @@ struct GNUNET_DV_MessageReceived /** * The message that was sent */ - struct GNUNET_MessageHeader *msg; + size_t msg_len; /** * The distance to the peer that we received the message from diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c index 15ef822d6..9e99790fb 100644 --- a/src/dv/dv_api.c +++ b/src/dv/dv_api.c @@ -250,7 +250,10 @@ void handle_message_receipt (void *cls, { struct GNUNET_DV_Handle *handle = cls; struct GNUNET_DV_MessageReceived *received_msg; + size_t packed_msg_len; + size_t sender_address_len; char *sender_address; + char *packed_msg; GNUNET_assert(ntohs(msg->type) == GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE); @@ -258,17 +261,22 @@ void handle_message_receipt (void *cls, return; received_msg = (struct GNUNET_DV_MessageReceived *)msg; - GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + ntohs(received_msg->msg->size) + ntohs(received_msg->sender_address_len))); + packed_msg_len = ntohs(received_msg->msg_len); + sender_address_len = ntohs(received_msg->sender_address_len); + GNUNET_assert(ntohs(msg->size) == (sizeof(struct GNUNET_DV_MessageReceived) + packed_msg_len + sender_address_len)); - sender_address = GNUNET_malloc(ntohs(received_msg->sender_address_len)); - sender_address = memcpy(sender_address, &received_msg[1], ntohs(received_msg->sender_address_len)); + sender_address = GNUNET_malloc(sender_address_len); + memcpy(sender_address, &received_msg[1], sender_address_len); + packed_msg = GNUNET_malloc(packed_msg_len); + memcpy(packed_msg, &received_msg[1 + sender_address_len], packed_msg_len); handle->receive_handler(handle->receive_cls, &received_msg->sender, - received_msg->msg, + packed_msg, + packed_msg_len, ntohl(received_msg->distance), sender_address, - ntohs(received_msg->sender_address_len)); + sender_address_len); GNUNET_free(sender_address); @@ -281,6 +289,7 @@ void handle_message_receipt (void *cls, * Send a message from the plugin to the DV service indicating that * a message should be sent via DV to some peer. * + * @param dv_handle the handle to the DV api * @param target the final target of the message * @param msgbuf the msg(s) to send * @param msgbuf_size the size of msgbuf diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index bb5e03193..0e182fca8 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -67,18 +67,18 @@ static struct GNUNET_SCHEDULER_Handle *sched; * How often do we check about sending out more peer information (if * we are connected to no peers previously). */ -#define GNUNET_DV_DEFAULT_SEND_INTERVAL (500 * GNUNET_CRON_MILLISECONDS) +#define GNUNET_DV_DEFAULT_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 500)) /** * How long do we wait at most between sending out information? */ -#define GNUNET_DV_MAX_SEND_INTERVAL (5000 * GNUNET_CRON_MILLISECONDS) +#define GNUNET_DV_MAX_SEND_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5)) /** * How long can we have not heard from a peer and * still have it in our tables? */ -#define GNUNET_DV_PEER_EXPIRATION_TIME (3000 * GNUNET_CRON_SECONDS) +#define GNUNET_DV_PEER_EXPIRATION_TIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1000)) /** * Priority for gossip. @@ -89,12 +89,12 @@ static struct GNUNET_SCHEDULER_Handle *sched; * How often should we check if expiration time has elapsed for * some peer? */ -#define GNUNET_DV_MAINTAIN_FREQUENCY (5 * GNUNET_CRON_SECONDS) +#define GNUNET_DV_MAINTAIN_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5)) /** * How long to allow a message to be delayed? */ -#define DV_DELAY (5000 * GNUNET_CRON_MILLISECONDS) +#define DV_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5)) /** * Priority to use for DV data messages. @@ -108,13 +108,52 @@ static struct GNUNET_SCHEDULER_Handle *sched; */ static struct GNUNET_SERVER_Client * client_handle; +/** + * Task to run when we shut down, cleaning up all our trash + */ GNUNET_SCHEDULER_TaskIdentifier cleanup_task; +/** + * Task to run to gossip about peers. Will reschedule itself forever until shutdown! + */ +GNUNET_SCHEDULER_TaskIdentifier gossip_task; + /** * Struct where neighbor information is stored. */ struct DistantNeighbor *referees; +static struct GNUNET_TIME_Relative client_transmit_timeout; + +static struct GNUNET_TIME_Relative default_dv_delay; + +static size_t default_dv_priority = 0; + + +/** + * Pending message struct, also a test to see if these can + * safely ONLY be freed by callback. + */ +struct PendingMessage +{ + /** + * Copy of message to be sent + */ + struct GNUNET_MessageHeader *msg; + + /** + * Size of message to be sent + */ + size_t msg_size; + + /** + * Transmit handle, for cancellation if necessary. + */ + struct GNUNET_CORE_TransmitHandle *transmit_handle; + +}; + + /** * Struct to hold information for updating existing neighbors */ @@ -273,14 +312,13 @@ struct GNUNET_DV_Context unsigned long long max_table_size; - unsigned int send_interval; + struct GNUNET_TIME_Relative send_interval; unsigned int neighbor_id_loc; int closing; -}; -static char shortID[5]; +}; static struct GNUNET_DV_Context ctx; @@ -342,27 +380,136 @@ void send_to_plugin(const struct GNUNET_PeerIdentity * sender, const struct GNUN struct GNUNET_DV_MessageReceived *received_msg; int size; - if (ntohs(msg->size) < sizeof(struct GNUNET_DV_MessageReceived)) - return; - - size = sizeof(struct GNUNET_DV_MessageReceived) + message_size + sizeof(struct GNUNET_PeerIdentity); + size = sizeof(struct GNUNET_DV_MessageReceived) + sizeof(struct GNUNET_PeerIdentity) + message_size; received_msg = GNUNET_malloc(size); received_msg->header.size = htons(size); received_msg->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_DV_RECEIVE); received_msg->sender_address_len = sizeof(struct GNUNET_PeerIdentity); received_msg->distance = htonl(distant_neighbor->cost); + received_msg->msg_len = htons(message_size); /* Set the sender in this message to be the original sender! */ memcpy(&received_msg->sender, &distant_neighbor->identity, sizeof(struct GNUNET_PeerIdentity)); /* Copy the intermediate sender to the end of the message, this is how the transport identifies this peer */ memcpy(&received_msg[1], sender, sizeof(struct GNUNET_PeerIdentity)); + /* Copy the actual message after the sender */ + memcpy(&received_msg[1 + sizeof(struct GNUNET_PeerIdentity)], message, message_size); /* FIXME: Send to the client please */ GNUNET_SERVER_notify_transmit_ready (client_handle, - size, CLIENT_TRANSMIT_TIMEOUT, + size, client_transmit_timeout, &transmit_to_plugin, &received_msg); } + +/** + * Function called to notify a client about the socket + * begin ready to queue more data. "buf" will be + * NULL and "size" zero if the socket was closed for + * writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +size_t core_transmit_notify (void *cls, + size_t size, void *buf) +{ + struct PendingMessage *pending_message = cls; + char *send_buf = buf; + if ((buf == NULL) || (size < pending_message->msg_size)) + { + return 0; + } + + memcpy(send_buf, pending_message->msg, pending_message->msg_size); + + GNUNET_free(pending_message->msg); + GNUNET_free(pending_message); + + return size; +} + +/** + * Send a DV data message via DV. + * + * @param recipient the ultimate recipient of this message + * @param the original sender of the message + * @param message the packed message + * @param importance what priority to send this message with + * @param timeout how long to possibly delay sending this message + */ +static int +send_message (const struct GNUNET_PeerIdentity * recipient, + const struct GNUNET_PeerIdentity * sender, + const struct GNUNET_MessageHeader * message, + unsigned int importance, struct GNUNET_TIME_Relative timeout) +{ + p2p_dv_MESSAGE_Data *toSend; + unsigned int msg_size; + unsigned int cost; + unsigned int recipient_id; + unsigned int sender_id; + struct DistantNeighbor *target; + struct DistantNeighbor *source; + struct PendingMessage *pending_message; + + msg_size = ntohs (message->size) + sizeof (p2p_dv_MESSAGE_Data); + + target = GNUNET_CONTAINER_multihashmap_get (ctx.extended_neighbors, + &recipient->hashPubKey); + if (target == NULL) + { + /* target unknown to us, drop! */ + return GNUNET_SYSERR; + } + recipient_id = target->referrer_id; + + source = GNUNET_CONTAINER_multihashmap_get (ctx.extended_neighbors, + &sender->hashPubKey); + if (source == NULL) + { + if (0 != (memcmp (my_identity, + sender, sizeof (struct GNUNET_PeerIdentity)))) + { + /* sender unknown to us, drop! */ + return GNUNET_SYSERR; + } + sender_id = 0; /* 0 == us */ + } + else + { + /* find out the number that we use when we gossip about + the sender */ + sender_id = source->our_id; + } + + cost = target->cost; + pending_message = GNUNET_malloc(sizeof(struct PendingMessage)); + pending_message->msg = GNUNET_malloc (msg_size); + toSend = (p2p_dv_MESSAGE_Data *)pending_message->msg; + toSend->header.size = htons (msg_size); + toSend->header.type = htons (GNUNET_MESSAGE_TYPE_DV_DATA); + toSend->sender = htonl (sender_id); + toSend->recipient = htonl (recipient_id); + memcpy (&toSend[1], message, ntohs (message->size)); + pending_message->msg_size = msg_size; + + pending_message->transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, importance, timeout, &target->referrer->identity, msg_size, &core_transmit_notify, pending_message); + if (NULL == pending_message->transmit_handle) + { + GNUNET_free (pending_message->msg); + GNUNET_free (pending_message); + return GNUNET_SYSERR; + } + + /*coreAPI->ciphertext_send (&target->referrer->identity, + &toSend->header, importance, maxdelay);*/ + return (int) cost; +} + + /** * Core handler for dv data messages. Whatever this message * contains all we really have to do is rip it out of its @@ -434,9 +581,6 @@ static int handle_dv_data_message (void *cls, if ( (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) && (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) ) { - /* FIXME: send the message, wrap it up and return it to the DV plugin */ - /*coreAPI->loopback_send (&original_sender, (const char *) packed_message, - ntohs (packed_message->size), GNUNET_YES, NULL);*/ send_to_plugin(peer, packed_message, ntohs(packed_message->size), pos); } @@ -463,15 +607,93 @@ static int handle_dv_data_message (void *cls, return GNUNET_OK; } + /* At this point we have a message, and we need to forward it on to the + * next DV hop. + */ /* FIXME: Can't send message on, we have to behave. * We have to tell core we have a message for the next peer, and let * transport do transport selection on how to get this message to 'em */ /*ret = send_message (&destination, &original_sender, packed_message, DV_PRIORITY, DV_DELAY);*/ - send_to_core(&destination, &original_sender, packed_message, DV_PRIORITY, DV_DELAY); + ret = send_message(&destination, &original_sender, packed_message, default_dv_priority, default_dv_delay); - return GNUNET_OK; + if (ret != GNUNET_SYSERR) + return GNUNET_OK; + else + return GNUNET_SYSERR; +} + + +/** + * Thread which chooses a peer to gossip about and a peer to gossip + * to, then constructs the message and sends it out. Will run until + * done_module_dv is called. + */ +static void +neighbor_send_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ +#if DEBUG_DV + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Entering neighbor_send_thread...\n", + GNUNET_i2s(my_identity)); + char * encPeerAbout; + char * encPeerTo; +#endif + struct DistantNeighbor *about; + struct DirectNeighbor *to; + + p2p_dv_MESSAGE_NeighborInfo *message; + struct PendingMessage *pending_message; + + if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) + return; + + about = GNUNET_CONTAINER_heap_walk_get_next (ctx.neighbor_min_heap); + to = GNUNET_CONTAINER_heap_get_random (ctx.neighbor_min_heap, GNUNET_CONTAINER_multihashmap_size(ctx.direct_neighbors)); + + if ((about != NULL) && (to != about->referrer /* split horizon */ ) && +#if SUPPORT_HIDING + (about->hidden == GNUNET_NO) && +#endif + (to != NULL) && + (0 != memcmp (&about->identity, + &to->identity, sizeof (struct GNUNET_PeerIdentity)))) + { +#if DEBUG_DV + encPeerAbout = GNUNET_strdup(GNUNET_i2s(&about->identity)); + encPeerTo = GNUNET_strdup(GNUNET_i2s(&to->identity)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Sending info about peer %s to directly connected peer %s\n", + GNUNET_i2s(my_identity), + encPeerAbout, encPeerTo); +#endif + pending_message = GNUNET_malloc(sizeof(struct PendingMessage)); + pending_message->msg = GNUNET_malloc(sizeof(p2p_dv_MESSAGE_NeighborInfo)); + message = (p2p_dv_MESSAGE_NeighborInfo *)pending_message->msg; + message->header.size = htons (sizeof (p2p_dv_MESSAGE_NeighborInfo)); + message->header.type = htons (GNUNET_MESSAGE_TYPE_DV_GOSSIP); + message->cost = htonl (about->cost); + message->neighbor_id = htonl (about->our_id); + memcpy (&message->neighbor, + &about->identity, sizeof (struct GNUNET_PeerIdentity)); + + pending_message->transmit_handle = GNUNET_CORE_notify_transmit_ready(coreAPI, default_dv_priority, default_dv_delay, &to->identity, sizeof(p2p_dv_MESSAGE_NeighborInfo), &core_transmit_notify, pending_message); + + if (NULL == pending_message->transmit_handle) + { + GNUNET_free (pending_message->msg); + GNUNET_free (pending_message); + return; + } + /*coreAPI->ciphertext_send (&to->identity, &message.header, + GNUNET_DV_DHT_GOSSIP_PRIORITY, + ctx.send_interval);*/ + } + + gossip_task = GNUNET_SCHEDULER_add_delayed(sched, ctx.send_interval, &neighbor_send_task, NULL); + return; } /** @@ -840,6 +1062,9 @@ run (void *cls, timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5); sched = scheduler; cfg = c; + + client_transmit_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5); + default_dv_delay = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5); GNUNET_SERVER_add_handlers (server, plugin_handlers); coreAPI = GNUNET_CORE_connect (sched, @@ -862,6 +1087,7 @@ run (void *cls, /* Scheduled the task to clean up when shutdown is called */ + gossip_task = GNUNET_SCHEDULER_add_delayed(sched, ctx.send_interval, &neighbor_send_task, NULL); cleanup_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, -- 2.25.1