From 25d2cf2ca1882482d535c8ad0d879b9846e12ea2 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 12 Aug 2011 15:49:57 +0000 Subject: [PATCH] finishing neighbours --- .../gnunet-service-transport_neighbours.c | 228 +++++++++++++----- src/transport/plugin_transport_tcp.c | 6 +- 2 files changed, 175 insertions(+), 59 deletions(-) diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c index f30878966..f262c2b87 100644 --- a/src/transport/gnunet-service-transport_neighbours.c +++ b/src/transport/gnunet-service-transport_neighbours.c @@ -26,6 +26,7 @@ #include "platform.h" #include "gnunet_ats_service.h" #include "gnunet-service-transport_neighbours.h" +#include "gnunet-service-transport_plugins.h" #include "gnunet-service-transport_validation.h" #include "gnunet-service-transport.h" #include "gnunet_peerinfo_service.h" @@ -45,15 +46,12 @@ #define QUOTA_VIOLATION_DROP_THRESHOLD 10 -// TODO: -// - have a way to access the currently 'connected' session -// (for sending and to notice disconnect of it!) -// - have a way to access/update bandwidth/quota information per peer -// (for CostReport/TrafficReport callbacks) - - +/** + * Entry in neighbours. + */ struct NeighbourMapEntry; + /** * For each neighbour we keep a list of messages * that we still want to transmit to the neighbour. @@ -71,6 +69,12 @@ struct MessageQueue */ struct MessageQueue *prev; + /** + * Once this message is actively being transmitted, which + * neighbour is it associated with? + */ + struct NeighbourMapEntry *n; + /** * Function to call once we're done. */ @@ -129,6 +133,11 @@ struct NeighbourMapEntry */ struct GNUNET_TRANSPORT_ATS_Information *ats; + /** + * Are we currently trying to send a message? If so, which one? + */ + struct MessageQueue *is_active; + /** * Active session for communicating with the peer. */ @@ -161,18 +170,10 @@ struct NeighbourMapEntry GNUNET_SCHEDULER_TaskIdentifier timeout_task; /** - * ID of task scheduled to run when we should retry transmitting - * the head of the message queue. Actually triggered when the - * transmission is timing out (we trigger instantly when we have - * a chance of success). - */ - GNUNET_SCHEDULER_TaskIdentifier retry_task; - - /** - * How long until we should consider this peer dead (if we don't - * receive another message in the meantime)? + * ID of task scheduled to run when we should try transmitting + * the head of the message queue. */ - struct GNUNET_TIME_Absolute peer_timeout; + GNUNET_SCHEDULER_TaskIdentifier transmission_task; /** * Tracker for inbound bandwidth. @@ -191,16 +192,10 @@ struct NeighbourMapEntry */ unsigned int ats_count; - /** - * Have we seen an PONG from this neighbour in the past (and - * not had a disconnect since)? - */ - // int received_pong; - /** * Are we already in the process of disconnecting this neighbour? */ - // int in_disconnect; + int in_disconnect; /** * Do we currently consider this neighbour connected? (as far as @@ -246,7 +241,49 @@ lookup_neighbour (const struct GNUNET_PeerIdentity *pid) } -#if 0 +/** + * Task invoked to start a transmission to another peer. + * + * @param cls the 'struct NeighbourMapEntry' + * @param tc scheduler context + */ +static void +transmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * We're done with our transmission attempt, continue processing. + * + * @param cls the 'struct MessageQueue' of the message + * @param receiver intended receiver + * @param success whether it worked or not + */ +static void +transmit_send_continuation (void *cls, + const struct GNUNET_PeerIdentity *receiver, + int success) +{ + struct MessageQueue *mq; + struct NeighbourMapEntry *n; + + mq = cls; + n = mq->n; + if (NULL != n) + { + GNUNET_assert (n->is_active == mq); + n->is_active = NULL; + GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); + n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, + n); + } + if (NULL != mq->cont) + mq->cont (mq->cont_cls, + success); + GNUNET_free (mq); +} + + /** * Check the ready list for the given neighbour and if a plugin is * ready for transmission (and if we have a message), do so! @@ -259,41 +296,73 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) struct MessageQueue *mq; struct GNUNET_TIME_Relative timeout; ssize_t ret; + struct GNUNET_TRANSPORT_PluginFunctions *papi; - if (n->messages_head == NULL) + if (n->is_active != NULL) + return; /* transmission already pending */ + if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK) + return; /* currently waiting for bandwidth */ + mq = n->messages_head; + while (NULL != (mq = n->messages_head)) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission queue for `%4s' is empty\n", - GNUNET_i2s (&n->id)); -#endif - return; /* nothing to do */ + timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); + if (timeout.rel_value > 0) + break; + transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */ + } + if (NULL == mq) + return; /* no more messages */ + + papi = GST_plugins_find (n->plugin_name); + if (papi == NULL) + { + GNUNET_break (0); + return; } - mq = n->messages_head; GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); + n->is_active = mq; + mq->n = n; ret = papi->send (papi->cls, - &n->pid, + &n->id, mq->message_buf, mq->message_buf_size, - mq->priority, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + 0 /* priority -- remove from plugin API? */, + timeout, n->session, n->addr, n->addrlen, - GNUNET_YES /*?*/, + GNUNET_YES, &transmit_send_continuation, mq); if (ret == -1) { /* failure, but 'send' would not call continuation in this case, so we need to do it here! */ transmit_send_continuation (mq, - &mq->neighbour_id, + &n->id, GNUNET_SYSERR); + n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, + n); } } -#endif + + +/** + * Task invoked to start a transmission to another peer. + * + * @param cls the 'struct NeighbourMapEntry' + * @param tc scheduler context + */ +static void +transmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourMapEntry *n = cls; + + n->transmission_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); +} /** @@ -325,22 +394,41 @@ disconnect_neighbour (struct NeighbourMapEntry *n) { struct MessageQueue *mq; - if (n->is_connected) + if (GNUNET_YES == n->in_disconnect) + return; + n->in_disconnect = GNUNET_YES; + while (NULL != (mq = n->messages_head)) { + GNUNET_CONTAINER_DLL_remove (n->messages_head, + n->messages_tail, + mq); + mq->cont (mq->cont_cls, GNUNET_SYSERR); + GNUNET_free (mq); + } + if (NULL != n->is_active) + { + n->is_active->n = NULL; + n->is_active = NULL; + } + if (GNUNET_YES == n->is_connected) + { + n->is_connected = GNUNET_NO; disconnect_notify_cb (callback_cls, &n->id); - n->is_connected = GNUNET_NO; } GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (neighbours, &n->id.hashPubKey, n)); - while (NULL != (mq = n->messages_head)) + if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task) { - GNUNET_CONTAINER_DLL_remove (n->messages_head, - n->messages_tail, - mq); - GNUNET_free (mq); + GNUNET_SCHEDULER_cancel (n->timeout_task); + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task) + { + GNUNET_SCHEDULER_cancel (n->timeout_task); + n->transmission_task = GNUNET_SCHEDULER_NO_TASK; } if (NULL != n->asc) { @@ -446,6 +534,7 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, uint32_t ats_count) { struct NeighbourMapEntry *n; + struct GNUNET_MessageHeader connect_msg; n = lookup_neighbour (peer); if (NULL == n) @@ -466,6 +555,17 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); GNUNET_free_non_null (n->plugin_name); n->plugin_name = GNUNET_strdup (plugin_name); + GNUNET_SCHEDULER_cancel (n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); + connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); + GST_neighbours_send (peer, + &connect_msg, + sizeof (connect_msg), + GNUNET_TIME_UNIT_FOREVER_REL, + NULL, NULL); } @@ -564,7 +664,7 @@ GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) n = lookup_neighbour (target); if ( (NULL == n) || - (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) ) + (n->is_connected == GNUNET_YES) ) return GNUNET_NO; /* not connected */ return GNUNET_YES; } @@ -593,7 +693,7 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, n = lookup_neighbour (target); if ( (n == NULL) || - (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) ) + (GNUNET_YES != n->is_connected) ) { GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# SET QUOTA messages ignored (no such peer)"), @@ -620,7 +720,10 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq); - // try_transmission_to_peer (n); + if ( (GNUNET_SCHEDULER_NO_TASK == n->transmission_task) && + (NULL == n->is_active) ) + n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, + n); } @@ -667,9 +770,6 @@ GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity *sender n->quota_violation_count--; } } - n->peer_timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); GNUNET_SCHEDULER_cancel (n->timeout_task); n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, @@ -773,8 +873,8 @@ neighbours_iterate (void *cls, struct IteratorContext *ic = cls; struct NeighbourMapEntry *n = value; - if (GNUNET_TIME_absolute_get_remaining (n->peer_timeout).rel_value == 0) - return GNUNET_OK; /* not connected */ + if (GNUNET_YES != n->is_connected) + return GNUNET_OK; GNUNET_assert (n->ats_count > 0); ic->cb (ic->cb_cls, &n->id, @@ -813,9 +913,25 @@ void GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) { struct NeighbourMapEntry *n; + struct GNUNET_TRANSPORT_PluginFunctions *papi; + struct GNUNET_MessageHeader disconnect_msg; n = lookup_neighbour (target); - /* FIXME: send disconnect message to target... */ + disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); + disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); + papi = GST_plugins_find (n->plugin_name); + if (papi != NULL) + papi->send (papi->cls, + target, + (const void*) &disconnect_msg, + sizeof (struct GNUNET_MessageHeader), + UINT32_MAX /* priority */, + GNUNET_TIME_UNIT_FOREVER_REL, + n->session, + n->addr, + n->addrlen, + GNUNET_YES, + NULL, NULL); disconnect_neighbour (n); } diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index e796dacf4..2c20ba35e 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -834,6 +834,9 @@ disconnect_session (struct Session *session) (session->transmit_handle); session->transmit_handle = NULL; } + session->plugin->env->session_end (session->plugin->env->cls, + &session->target, + session); while (NULL != (pm = session->pending_messages_head)) { #if DEBUG_TCP @@ -878,9 +881,6 @@ disconnect_session (struct Session *session) -1, GNUNET_NO); GNUNET_free_non_null (session->connect_addr); - session->plugin->env->session_end (session->plugin->env->cls, - &session->target, - session); GNUNET_assert (NULL == session->transmit_handle); GNUNET_free (session); } -- 2.25.1