From 5133e679d1c77f276ea7a23f2c054f61fa61ac08 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 7 Aug 2011 06:04:52 +0000 Subject: [PATCH] towards neighbour management --- .../gnunet-service-transport_neighbours.c | 539 ++++++++++++++++++ 1 file changed, 539 insertions(+) diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c index f961ea4f2..a3fdab9b7 100644 --- a/src/transport/gnunet-service-transport_neighbours.c +++ b/src/transport/gnunet-service-transport_neighbours.c @@ -26,6 +26,14 @@ #include "platform.h" #include "gnunet-service-transport_neighbours.h" #include "gnunet-service-transport.h" +#include "gnunet_constants.h" + + +/** + * Size of the neighbour hash map. + */ +#define NEIGHBOUR_TABLE_SIZE 256 + // TODO: // - have a way to access the currently 'connected' session @@ -34,6 +42,498 @@ // (for CostReport/TrafficReport callbacks) +struct NeighbourMapEntry; + +/** + * For each neighbour we keep a list of messages + * that we still want to transmit to the neighbour. + */ +struct MessageQueue +{ + + /** + * This is a doubly linked list. + */ + struct MessageQueue *next; + + /** + * This is a doubly linked list. + */ + struct MessageQueue *prev; + + /** + * The message(s) we want to transmit, GNUNET_MessageHeader(s) + * stuck together in memory. Allocated at the end of this struct. + */ + const char *message_buf; + + /** + * Size of the message buf + */ + size_t message_buf_size; + + /** + * Client responsible for queueing the message; used to check that a + * client has no two messages pending for the same target and to + * notify the client of a successful transmission; NULL if this is + * an internal message. + */ + struct TransportClient *client; + + /** + * At what time should we fail? + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Internal message of the transport system that should not be + * included in the usual SEND-SEND_OK transmission confirmation + * traffic management scheme. Typically, "internal_msg" will + * be set whenever "client" is NULL (but it is not strictly + * required). + */ + int internal_msg; + + /** + * How important is the message? + */ + unsigned int priority; + +}; + + + +/** + * Entry in neighbours. + */ +struct NeighbourMapEntry +{ + + /** + * Head of list of messages we would like to send to this peer; + * must contain at most one message per client. + */ + struct MessageQueue *messages_head; + + /** + * Tail of list of messages we would like to send to this peer; must + * contain at most one message per client. + */ + struct MessageQueue *messages_tail; + + /** + * Context for peerinfo iteration. + * NULL after we are done processing peerinfo's information. + */ + struct GNUNET_PEERINFO_IteratorContext *piter; + + /** + * Performance data for the peer. + */ + struct GNUNET_TRANSPORT_ATS_Information *ats; + + /** + * Public key for this peer. Valid only if the respective flag is set below. + */ + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; + + /** + * Identity of this neighbour. + */ + struct GNUNET_PeerIdentity id; + + /** + * ID of task scheduled to run when this peer is about to + * time out (will free resources associated with the peer). + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * ID of task scheduled to run when we should retry transmitting + * the head of the message queue. Actually triggered when the + * transmission is timing out (we trigger instantly when we have + * a chance of success). + */ + GNUNET_SCHEDULER_TaskIdentifier retry_task; + + /** + * How long until we should consider this peer dead (if we don't + * receive another message in the meantime)? + */ + struct GNUNET_TIME_Absolute peer_timeout; + + /** + * Tracker for inbound bandwidth. + */ + struct GNUNET_BANDWIDTH_Tracker in_tracker; + + /** + * The latency we have seen for this particular address for + * this particular peer. This latency may have been calculated + * over multiple transports. This value reflects how long it took + * us to receive a response when SENDING via this particular + * transport/neighbour/address combination! + * + * FIXME: we need to periodically send PINGs to update this + * latency (at least more often than the current "huge" (11h?) + * update interval). + */ + struct GNUNET_TIME_Relative latency; + + /** + * How often has the other peer (recently) violated the inbound + * traffic limit? Incremented by 10 per violation, decremented by 1 + * per non-violation (for each time interval). + */ + unsigned int quota_violation_count; + + /** + * DV distance to this peer (1 if no DV is used). + */ + uint32_t distance; + + /** + * Have we seen an PONG from this neighbour in the past (and + * not had a disconnect since)? + */ + int received_pong; + + /** + * Do we have a valid public key for this neighbour? + */ + int public_key_valid; + + /** + * Are we already in the process of disconnecting this neighbour? + */ + int in_disconnect; + +}; + + +/** + * All known neighbours and their HELLOs. + */ +static struct GNUNET_CONTAINER_MultiHashMap *neighbours; + +/** + * Closure for connect_notify_cb and disconnect_notify_cb + */ +static void *callback_cls; + +/** + * Function to call when we connected to a neighbour. + */ +static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb; + +/** + * Function to call when we disconnected from a neighbour. + */ +static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb; + + +#if 0 +/** + * Check the ready list for the given neighbour and if a plugin is + * ready for transmission (and if we have a message), do so! + * + * @param neighbour target peer for which to transmit + */ +static void +try_transmission_to_peer (struct NeighbourMapEntry *n) +{ + struct ReadyList *rl; + struct MessageQueue *mq; + struct GNUNET_TIME_Relative timeout; + ssize_t ret; + int force_address; + + if (n->messages_head == NULL) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission queue for `%4s' is empty\n", + GNUNET_i2s (&n->id)); +#endif + return; /* nothing to do */ + } + rl = NULL; + mq = n->messages_head; + force_address = GNUNET_YES; + if (mq->specific_address == NULL) + { + /* TODO: ADD ATS */ + mq->specific_address = get_preferred_ats_address(n); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# transport selected peer address freely"), + 1, + GNUNET_NO); + force_address = GNUNET_NO; + } + if (mq->specific_address == NULL) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# transport failed to selected peer address"), + 1, + GNUNET_NO); + timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); + if (timeout.rel_value == 0) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No destination address available to transmit message of size %u to peer `%4s'\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes in message queue for other peers"), + - (int64_t) mq->message_buf_size, + GNUNET_NO); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes discarded (no destination address available)"), + mq->message_buf_size, + GNUNET_NO); + if (mq->client != NULL) + transmit_send_ok (mq->client, n, &n->id, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (n->messages_head, + n->messages_tail, + mq); + GNUNET_free (mq); + return; /* nobody ready */ + } + GNUNET_STATISTICS_update (stats, + gettext_noop ("# message delivery deferred (no address)"), + 1, + GNUNET_NO); + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (n->retry_task); + n->retry_task = GNUNET_SCHEDULER_add_delayed (timeout, + &retry_transmission_task, + n); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id), + timeout.rel_value); +#endif + /* FIXME: might want to trigger peerinfo lookup here + (unless that's already pending...) */ + return; + } + GNUNET_CONTAINER_DLL_remove (n->messages_head, + n->messages_tail, + mq); + if (mq->specific_address->connected == GNUNET_NO) + mq->specific_address->connect_attempts++; + rl = mq->specific_address->ready_list; + mq->plugin = rl->plugin; + if (!mq->internal_msg) + mq->specific_address->in_transmit = GNUNET_YES; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n", + mq->message_buf_size, + GNUNET_i2s (&n->id), + (mq->specific_address->addr != NULL) + ? a2s (mq->plugin->short_name, + mq->specific_address->addr, + mq->specific_address->addrlen) + : "", + rl->plugin->short_name); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes in message queue for other peers"), + - (int64_t) mq->message_buf_size, + GNUNET_NO); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes pending with plugins"), + mq->message_buf_size, + GNUNET_NO); + + GNUNET_CONTAINER_DLL_insert (n->cont_head, + n->cont_tail, + mq); + + ret = rl->plugin->api->send (rl->plugin->api->cls, + &mq->neighbour_id, + mq->message_buf, + mq->message_buf_size, + mq->priority, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + mq->specific_address->session, + mq->specific_address->addr, + mq->specific_address->addrlen, + force_address, + &transmit_send_continuation, mq); + if (ret == -1) + { + /* failure, but 'send' would not call continuation in this case, + so we need to do it here! */ + transmit_send_continuation (mq, + &mq->neighbour_id, + GNUNET_SYSERR); + } +} + + +/** + * Send the specified message to the specified peer. + * + * @param client source of the transmission request (can be NULL) + * @param peer_address ForeignAddressList where we should send this message + * @param priority how important is the message + * @param timeout how long do we have to transmit? + * @param message_buf message(s) to send GNUNET_MessageHeader(s) + * @param message_buf_size total size of all messages in message_buf + * @param is_internal is this an internal message; these are pre-pended and + * also do not count for plugins being "ready" to transmit + * @param neighbour handle to the neighbour for transmission + */ +static void +transmit_to_peer (struct TransportClient *client, + struct ForeignAddressList *peer_address, + unsigned int priority, + struct GNUNET_TIME_Relative timeout, + const char *message_buf, + size_t message_buf_size, + int is_internal, struct NeighbourMapEntry *neighbour) +{ + struct MessageQueue *mq; + +#if EXTRA_CHECKS + if (client != NULL) + { + /* check for duplicate submission */ + mq = neighbour->messages_head; + while (NULL != mq) + { + if (mq->client == client) + { + /* client transmitted to same peer twice + before getting SEND_OK! */ + GNUNET_break (0); + return; + } + mq = mq->next; + } + } +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes in message queue for other peers"), + message_buf_size, + GNUNET_NO); + mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size); + mq->specific_address = peer_address; + mq->client = client; + /* FIXME: this memcpy can be up to 7% of our total runtime! */ + memcpy (&mq[1], message_buf, message_buf_size); + mq->message_buf = (const char*) &mq[1]; + mq->message_buf_size = message_buf_size; + memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity)); + mq->internal_msg = is_internal; + mq->priority = priority; + mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); + if (is_internal) + GNUNET_CONTAINER_DLL_insert (neighbour->messages_head, + neighbour->messages_tail, + mq); + else + GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head, + neighbour->messages_tail, + neighbour->messages_tail, + mq); + try_transmission_to_peer (neighbour); +} + + +/** + * Create a fresh entry in our neighbour list for the given peer. + * Will try to transmit our current HELLO to the new neighbour. + * Do not call this function directly, use 'setup_peer_check_blacklist. + * + * @param peer the peer for which we create the entry + * @param do_hello should we schedule transmitting a HELLO + * @return the new neighbour list entry + */ +static struct NeighbourMapEntry * +setup_new_neighbour (const struct GNUNET_PeerIdentity *peer, + int do_hello) +{ + struct NeighbourMapEntry *n; + struct TransportPlugin *tp; + struct ReadyList *rl; + + GNUNET_assert (0 != memcmp (peer, + &my_identity, + sizeof (struct GNUNET_PeerIdentity))); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Setting up state for neighbour `%4s'\n", + GNUNET_i2s (peer)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# active neighbours"), + 1, + GNUNET_NO); + n = GNUNET_malloc (sizeof (struct NeighbourMapEntry)); + n->id = *peer; + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, + MAX_BANDWIDTH_CARRY_S); + tp = plugins; + while (tp != NULL) + { + if ((tp->api->send != NULL) && (!is_blacklisted(peer, tp))) + { + rl = GNUNET_malloc (sizeof (struct ReadyList)); + rl->neighbour = n; + rl->next = n->plugins; + n->plugins = rl; + rl->plugin = tp; + rl->addresses = NULL; + } + tp = tp->next; + } + n->latency = GNUNET_TIME_UNIT_FOREVER_REL; + n->distance = -1; + n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + GNUNET_CONTAINER_multihashmap_put (neighbours, + &n->id.hashPubKey, + n, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + if (do_hello) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# peerinfo new neighbor iterate requests"), + 1, + GNUNET_NO); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# outstanding peerinfo iterate requests"), + 1, + GNUNET_NO); + n->piter = GNUNET_PEERINFO_iterate (peerinfo, peer, + GNUNET_TIME_UNIT_FOREVER_REL, + &add_hello_for_peer, n); + + GNUNET_STATISTICS_update (stats, + gettext_noop ("# HELLO's sent to new neighbors"), + 1, + GNUNET_NO); + if (NULL != our_hello) + transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, + (const char *) our_hello, GNUNET_HELLO_size(our_hello), + GNUNET_NO, n); + } + return n; +} +#endif + /** * Initialize the neighbours subsystem. @@ -47,6 +547,37 @@ GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb, GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb) { + callback_cls = cls; + connect_notify_cb = connect_cb; + disconnect_notify_cb = disconnect_cb; + neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); +} + + +/** + * Disconnect from the given neighbour. + * + * @param cls unused + * @param key hash of neighbour's public key (not used) + * @param value the 'struct NeighbourMapEntry' of the neighbour + */ +static int +disconnect_all_neighbours (void *cls, + const GNUNET_HashCode *key, + void *value) +{ + struct NeighbourMapEntry *n = value; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting peer `%4s', %s\n", + GNUNET_i2s(&n->id), + "SHUTDOWN_TASK"); +#endif + // FIXME: + // disconnect_neighbour (n); + n++; + return GNUNET_OK; } @@ -56,6 +587,14 @@ GST_neighbours_start (void *cls, void GST_neighbours_stop () { + GNUNET_CONTAINER_multihashmap_iterate (neighbours, + &disconnect_all_neighbours, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (neighbours); + neighbours = NULL; + callback_cls = NULL; + connect_notify_cb = NULL; + disconnect_notify_cb = NULL; } -- 2.25.1