From cc9d16c9a38f3516fcfc3ddf62c4b9775ad0b220 Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Fri, 25 Jun 2010 09:49:00 +0000 Subject: [PATCH] trying message queues on receiver side --- src/dv/gnunet-service-dv.c | 125 +++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 46 deletions(-) diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index 00de78aae..fdff7e666 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -242,6 +242,38 @@ struct NeighborUpdateInfo }; +/** + * Struct to store a single message received with + * an unknown sender. + */ +struct UnknownSenderMessage +{ + /** + * Message sender (immediate) + */ + struct GNUNET_PeerIdentity sender; + + /** + * The actual message received + */ + struct GNUNET_MessageHeader *message; + + /** + * Latency of connection + */ + struct GNUNET_TIME_Relative latency; + + /** + * Distance to destination + */ + uint32_t distance; + + /** + * Unknown sender id + */ + uint32_t sender_id; +}; + /** * Struct where actual neighbor information is stored, * referenced by min_heap and max_heap. Freeing dealt @@ -279,6 +311,13 @@ struct DirectNeighbor * from DV? */ int hidden; + + /** + * Save a single message from a direct neighbor from a peer + * we don't know on the chance that it will be gossiped about + * and we can deliver the message. + */ + struct UnknownSenderMessage pending_message; }; @@ -1226,6 +1265,7 @@ void tokenized_message_handler (void *cls, } } +#if DELAY_FORWARDS struct DelayedMessageContext { struct GNUNET_PeerIdentity dest; @@ -1249,10 +1289,12 @@ void send_message_delayed (void *cls, default_dv_priority, msg_ctx->uid, GNUNET_TIME_relative_get_forever()); + GNUNET_free(msg_ctx->message); + GNUNET_free(msg_ctx); } - GNUNET_free(msg_ctx->message); - GNUNET_free(msg_ctx); } +#endif + /** * Core handler for dv data messages. Whatever this message @@ -1267,10 +1309,10 @@ void send_message_delayed (void *cls, * @param distance the distance to the immediate peer */ static int handle_dv_data_message (void *cls, - const struct GNUNET_PeerIdentity * peer, - const struct GNUNET_MessageHeader * message, - struct GNUNET_TIME_Relative latency, - uint32_t distance) + const struct GNUNET_PeerIdentity * peer, + const struct GNUNET_MessageHeader * message, + struct GNUNET_TIME_Relative latency, + uint32_t distance) { const p2p_dv_MESSAGE_Data *incoming = (const p2p_dv_MESSAGE_Data *) message; const struct GNUNET_MessageHeader *packed_message; @@ -1282,7 +1324,9 @@ static int handle_dv_data_message (void *cls, struct GNUNET_PeerIdentity *destination; struct FindDestinationContext fdc; struct TokenizedMessageContext tkm_ctx; +#if DELAY_FORWARDS struct DelayedMessageContext *delayed_context; +#endif #if USE_PEER_ID struct CheckPeerContext checkPeerCtx; #endif @@ -1313,15 +1357,10 @@ static int handle_dv_data_message (void *cls, } dn = GNUNET_CONTAINER_multihashmap_get (direct_neighbors, - &peer->hashPubKey); + &peer->hashPubKey); if (dn == NULL) - { -#if DEBUG_DV - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: dn NULL!\n", "dv"); -#endif - return GNUNET_OK; - } + return GNUNET_OK; + sid = ntohl (incoming->sender); #if USE_PEER_ID if (sid != 0) @@ -1344,11 +1383,10 @@ static int handle_dv_data_message (void *cls, if (pos == NULL) { - direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity)); #if DEBUG_DV_MESSAGES + direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: unknown sender (%u), Message uid %llu from %s!\n", GNUNET_i2s(&my_identity), ntohl(incoming->sender), ntohl(incoming->uid), direct_id); -#endif GNUNET_free(direct_id); pos = dn->referee_head; while ((NULL != pos) && (pos->referrer_id != sid)) @@ -1358,6 +1396,18 @@ static int handle_dv_data_message (void *cls, GNUNET_free(sender_id); pos = pos->next; } +#endif + if (dn->pending_message.sender_id != 0) + { + GNUNET_free(dn->pending_message.message); + } + + dn->pending_message.message = GNUNET_malloc(ntohs (message->size)); + memcpy(dn->pending_message.message, message, ntohs(message->size)); + dn->pending_message.distance = distance; + dn->pending_message.latency = latency; + memcpy(&dn->pending_message.sender, peer, sizeof(struct GNUNET_PeerIdentity)); + dn->pending_message.sender_id = sid; #if DEBUG_MESSAGE_DROP direct_id = GNUNET_strdup(GNUNET_i2s(&dn->identity)); @@ -1388,27 +1438,6 @@ static int handle_dv_data_message (void *cls, GNUNET_break_op(0); GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s: %s Received corrupt data, discarding!", my_short_id, "DV SERVICE"); } -#if NO_MST - offset = 0; - while(offset < packed_message_size) - { - packed_message = (struct GNUNET_MessageHeader *)&cbuf[offset]; - - GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP); - GNUNET_break_op (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA); - if ( (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_GOSSIP) && - (ntohs (packed_message->type) != GNUNET_MESSAGE_TYPE_DV_DATA) ) - { -#if DEBUG_DV_MESSAGES - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Receives %s message(s) for me, uid %u, total size %d cost %u from %s!\n", my_short_id, "DV DATA", ntohl(incoming->uid), ntohs(packed_message->size), pos->cost, GNUNET_i2s(&pos->identity)); -#endif - GNUNET_assert(memcmp(peer, &pos->identity, sizeof(struct GNUNET_PeerIdentity)) != 0); - send_to_plugin(peer, packed_message, ntohs(packed_message->size), &pos->identity, pos->cost); - } - offset += ntohs(packed_message->size); - } -#endif return GNUNET_OK; } else @@ -1436,7 +1465,7 @@ static int handle_dv_data_message (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Receives %s message uid %u for someone we don't know (id %u)!\n", my_short_id, "DV DATA", ntohl(incoming->uid), tid); #endif - return GNUNET_OK; + return GNUNET_OK; } destination = &fdc.dest->identity; @@ -1458,18 +1487,12 @@ static int handle_dv_data_message (void *cls, /* At this point we have a message, and we need to forward it on to the * next DV hop. */ - /* FIXME: Can't send message on, we have to behave. - * We have to tell core we have a message for the next peer, and let - * transport do transport selection on how to get this message to 'em */ - /*ret = send_message (&destination, - &original_sender, - packed_message, DV_PRIORITY, DV_DELAY);*/ - #if DEBUG_DV_MESSAGES GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: FORWARD %s message for %s, uid %u, size %d type %d, cost %u!\n", my_short_id, "DV DATA", GNUNET_i2s(destination), ntohl(incoming->uid), ntohs(packed_message->size), ntohs(packed_message->type), pos->cost); #endif +#if DELAY_FORWARDS if (GNUNET_TIME_absolute_get_duration(pos->last_gossip).value < GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 2).value) { delayed_context = GNUNET_malloc(sizeof(struct DelayedMessageContext)); @@ -1480,10 +1503,10 @@ static int handle_dv_data_message (void *cls, delayed_context->message_size = packed_message_size; delayed_context->uid = ntohl(incoming->uid); GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 2500), &send_message_delayed, delayed_context); - //GNUNET_SCHEDULER_add_now(sched, &send_message_delayed, delayed_context); return GNUNET_OK; } else +#endif { ret = send_message(destination, original_sender, @@ -2425,6 +2448,16 @@ addUpdateNeighbor (const struct GNUNET_PeerIdentity * peer, struct GNUNET_CRYPTO neighbor, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + if ((referrer_peer_id != 0) && (referrer->pending_message.sender_id == referrer_peer_id)) /* We have a queued message from just learned about peer! */ + { +#if DEBUG_DV_MESSAGES + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s: learned about peer %llu from which we have a previous unknown message, processing!\n", my_short_id, referrer_peer_id); +#endif + handle_dv_data_message(NULL, &referrer->pending_message.sender, referrer->pending_message.message, referrer->pending_message.latency, referrer->pending_message.distance); + GNUNET_free(referrer->pending_message.message); + referrer->pending_message.sender_id = 0; + } + if (cost != DIRECT_NEIGHBOR_COST) { /* Added neighbor, now send HELLO to transport */ -- 2.25.1