From 1af740f9589aee283ef9473e8528b5a9ce76e60d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 14 Apr 2010 19:08:44 +0000 Subject: [PATCH] work in progress --- src/transport/gnunet-service-transport.c | 144 +++++++++++++++-------- 1 file changed, 94 insertions(+), 50 deletions(-) diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index 267f3c4e1..2ac33c3e1 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -24,7 +24,6 @@ * @author Christian Grothoff * * TODO: - * - Need to defer forwarding messages until after CONNECT message * - CHECK that 'address' being NULL in 'struct ForeignAddressList' is * tolerated in the code everywhere (could not happen before) * @@ -452,6 +451,14 @@ struct NeighbourList */ struct MessageQueue *messages_tail; + /** + * Buffer for at most one payload message used when we receive + * payload data before our PING-PONG has succeeded. We then + * store such messages in this intermediary buffer until the + * connection is fully up. + */ + struct GNUNET_MessageHeader *pre_connect_message_buffer; + /** * Context for peerinfo iteration. * NULL after we are done processing peerinfo's information. @@ -2474,6 +2481,82 @@ schedule_next_ping (struct ForeignAddressList *fal) } + + +/** + * Function that will be called if we receive some payload + * from another peer. + * + * @param message the payload + * @param n peer who claimed to be the sender + */ +static void +handle_payload_message (const struct GNUNET_MessageHeader *message, + struct NeighbourList *n) +{ + struct InboundMessage *im; + struct TransportClient *cpos; + uint16_t msize; + + msize = ntohs (message->size); + if (n->received_pong == GNUNET_NO) + { + GNUNET_free_non_null (n->pre_connect_message_buffer); + n->pre_connect_message_buffer = GNUNET_malloc (msize); + memcpy (n->pre_connect_message_buffer, message, msize); + return; + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s', sending to all clients.\n", + ntohs (message->type), + GNUNET_i2s (&n->id)); +#endif + if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, + (ssize_t) msize)) + { + n->quota_violation_count++; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bandwidth quota (%u b/s) violation detected (total of %u).\n", + n->in_tracker.available_bytes_per_s__, + n->quota_violation_count); +#endif + /* Discount 32k per violation */ + GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, + - 32 * 1024); + } + else + { + if (n->quota_violation_count > 0) + { + /* try to add 32k back */ + GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, + 32 * 1024); + n->quota_violation_count--; + } + } + GNUNET_STATISTICS_update (stats, + gettext_noop ("# payload received from other peers"), + msize, + GNUNET_NO); + /* transmit message to all clients */ + im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); + im->header.size = htons (sizeof (struct InboundMessage) + msize); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->latency = GNUNET_TIME_relative_hton (n->latency); + im->peer = n->id; + memcpy (&im[1], message, msize); + cpos = clients; + while (cpos != NULL) + { + transmit_to_client (cpos, &im->header, GNUNET_YES); + cpos = cpos->next; + } + GNUNET_free (im); +} + + /** * Iterator over hash map entries. Checks if the given validation * entry is for the same challenge as what is given in the PONG. @@ -2497,6 +2580,7 @@ check_pending_validation (void *cls, struct GNUNET_PeerIdentity target; struct NeighbourList *n; struct ForeignAddressList *fal; + struct GNUNET_MessageHeader *prem; if (ve->challenge != challenge) return GNUNET_YES; @@ -2565,6 +2649,12 @@ check_pending_validation (void *cls, { n->received_pong = GNUNET_YES; notify_clients_connect (&target, n->latency, n->distance); + if (NULL != (prem = n->pre_connect_message_buffer)) + { + n->pre_connect_message_buffer = NULL; + handle_payload_message (prem, n); + GNUNET_free (prem); + } } if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) { @@ -3106,6 +3196,7 @@ disconnect_neighbour (struct NeighbourList *n, int check) gettext_noop ("# active neighbours"), -1, GNUNET_NO); + GNUNET_free_non_null (n->pre_connect_message_buffer); GNUNET_free (n); } @@ -3257,8 +3348,6 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, { struct TransportPlugin *plugin = cls; struct ReadyList *service_context; - struct TransportClient *cpos; - struct InboundMessage *im; struct ForeignAddressList *peer_address; uint16_t msize; struct NeighbourList *n; @@ -3339,53 +3428,8 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, handle_pong (plugin, message, peer, sender_address, sender_address_len); break; default: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s', sending to all clients.\n", - ntohs (message->type), GNUNET_i2s (peer)); -#endif - if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, - (ssize_t) msize)) - { - n->quota_violation_count++; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Bandwidth quota (%u b/s) violation detected (total of %u).\n", - n->in_tracker.available_bytes_per_s__, - n->quota_violation_count); -#endif - /* Discount 32k per violation */ - GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, - - 32 * 1024); - } - else - { - if (n->quota_violation_count > 0) - { - /* try to add 32k back */ - GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, - 32 * 1024); - n->quota_violation_count--; - } - } - GNUNET_STATISTICS_update (stats, - gettext_noop ("# payload received from other peers"), - msize, - GNUNET_NO); - /* transmit message to all clients */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); - im->header.size = htons (sizeof (struct InboundMessage) + msize); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (n->latency); - im->peer = *peer; - memcpy (&im[1], message, msize); - cpos = clients; - while (cpos != NULL) - { - transmit_to_client (cpos, &im->header, GNUNET_YES); - cpos = cpos->next; - } - GNUNET_free (im); + handle_payload_message (message, n); + break; } } ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0); -- 2.25.1