X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fgnunet-service-transport.c;h=6899ef5fbc00aff0579ecbfe9b9e7dcd0d702af0;hb=e854e44db02c14e84e5d03a65177138d2c206596;hp=5c6c37378d3676136b3643c5fc600b3b7f578cbd;hpb=0cc9e15f6613fa10058b429de0350e5632d38695;p=oweals%2Fgnunet.git diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index 5c6c37378..6899ef5fb 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -23,11 +23,15 @@ * @brief low-level P2P messaging * @author Christian Grothoff * - * TODO: - * - remove AddressValidations, incorporate them into the PeerAddressLists + * NOTE: + * - This code uses 'GNUNET_a2s' for debug printing in many places, + * which is technically wrong since it assumes we have IP+Port + * (v4/v6) addresses. Once we add transports like http or smtp + * this will have to be changed! */ #include "platform.h" #include "gnunet_client_lib.h" +#include "gnunet_container_lib.h" #include "gnunet_constants.h" #include "gnunet_getopt_lib.h" #include "gnunet_hello_lib.h" @@ -40,6 +44,12 @@ #include "plugin_transport.h" #include "transport.h" +/** + * Should we do some additional checks (to validate behavior + * of clients)? + */ +#define EXTRA_CHECKS GNUNET_YES + /** * How many messages can we have pending for a given client process * before we start to drop incoming messages? We typically should @@ -58,11 +68,18 @@ */ #define MAX_CONNECT_RETRY 3 +/** + * Limit on the number of ready-to-run tasks when validating + * HELLOs. If more tasks are ready to run, we will drop + * HELLOs instead of validating them. + */ +#define MAX_HELLO_LOAD 4 + /** * How often must a peer violate bandwidth quotas before we start * to simply drop its messages? */ -#define QUOTA_VIOLATION_DROP_THRESHOLD 100 +#define QUOTA_VIOLATION_DROP_THRESHOLD 10 /** * How long until a HELLO verification attempt should time out? @@ -79,7 +96,15 @@ */ #define TRANSPORT_DEFAULT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15) -#define TRANSPORT_DEFAULT_PRIORITY 4 /* Tired of remembering arbitrary priority names */ +/** + * How often will we re-validate for latency information + */ +#define TRANSPORT_DEFAULT_REVALIDATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60) + +/** + * Priority to use for PONG messages. + */ +#define TRANSPORT_PONG_PRIORITY 4 /** * How often do we re-add (cheaper) plugins to our list of plugins @@ -88,44 +113,36 @@ #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15) /** - * After how long do we expire an address in a HELLO - * that we just validated? This value is also used - * for our own addresses when we create a HELLO. + * After how long do we expire an address in a HELLO that we just + * validated? This value is also used for our own addresses when we + * create a HELLO. */ #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12) +/** + * How long before an existing address expires should we again try to + * validate it? Must be (significantly) smaller than + * HELLO_ADDRESS_EXPIRATION. + */ +#define HELLO_REVALIDATION_START_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 1) + + /** * List of addresses of other peers */ -struct PeerAddressList +struct ForeignAddressList { /** * This is a linked list. */ - struct PeerAddressList *next; - - /* - * Pointer to the validation associated with this - * address. May be NULL if already validated! - */ - struct ValidationAddress *validation; - - /** - * Which of our transport plugins does this entry - * belong to? - */ - struct TransportPlugin *plugin; + struct ForeignAddressList *next; /** - * Neighbor this entry belongs to. - */ - struct NeighborList *neighbor; - - /* - * Ready list (transport) that this peer belongs to + * Which ready list does this entry belong to. */ struct ReadyList *ready_list; + /** * How long until we auto-expire this address (unless it is * re-confirmed by the transport)? @@ -133,22 +150,20 @@ struct PeerAddressList struct GNUNET_TIME_Absolute expires; /** - * Length of addr. + * Task used to re-validate addresses, updates latencies and + * verifies liveness. */ - size_t addrlen; + GNUNET_SCHEDULER_TaskIdentifier revalidate_task; /** - * The address + * Length of addr. */ - char *addr; + size_t addrlen; /** - * Is this plugin ready to transmit to the specific target? - * GNUNET_NO if not. Initially, all plugins are marked ready. If a - * transmission is in progress, "transmit_ready" is set to - * GNUNET_NO. + * The address. */ - int transmit_ready; + const void *addr; /** * What was the last latency observed for this plugin @@ -166,43 +181,58 @@ struct PeerAddressList struct GNUNET_TIME_Absolute timeout; /** - * Is this plugin currently connected? The first time - * we transmit or send data to a peer via a particular - * plugin, we set this to GNUNET_YES. If we later get - * an error (disconnect notification or transmission - * failure), we set it back to GNUNET_NO. Each time the - * value is set to GNUNET_YES, we increment the - * "connect_attempts" counter. If that one reaches a - * particular threshold, we consider the plugin to not - * be working properly at this time for the given peer - * and remove it from the eligible list. + * Are we currently connected via this address? The first time we + * successfully transmit or receive data to a peer via a particular + * address, we set this to GNUNET_YES. If we later get an error + * (disconnect notification, transmission failure, timeout), we set + * it back to GNUNET_NO. */ int connected; /** - * How often have we tried to connect using this plugin? + * Is this plugin currently busy transmitting to the specific target? + * GNUNET_NO if not (initial, default state is GNUNET_NO). Internal + * messages do not count as 'in transmit'. + */ + int in_transmit; + + /** + * Has this address been validated yet? + */ + int validated; + + /** + * How often have we tried to connect using this plugin? Used to + * discriminate against addresses that do not work well. + * FIXME: not yet used, but should be! */ unsigned int connect_attempts; + /** + * DV distance to this peer (1 if no DV is used). + * FIXME: need to set this from transport plugins! + */ + uint32_t distance; + }; /** - * Entry in linked list of network addresses. + * Entry in linked list of network addresses for ourselves. */ -struct AddressList +struct OwnAddressList { /** * This is a linked list. */ - struct AddressList *next; + struct OwnAddressList *next; /** * The address, actually a pointer to the end * of this struct. Do not free! */ - void *addr; - + const void *addr; + /** * How long until we auto-expire this address (unless it is * re-confirmed by the transport)? @@ -247,7 +277,7 @@ struct TransportPlugin /** * List of our known addresses for this transport. */ - struct AddressList *addresses; + struct OwnAddressList *addresses; /** * Environment this transport service is using @@ -260,7 +290,6 @@ struct TransportPlugin */ GNUNET_SCHEDULER_TaskIdentifier address_update_task; - /** * Set to GNUNET_YES if we need to scrap the existing * list of "addresses" and start fresh when we receive @@ -269,29 +298,35 @@ struct TransportPlugin * to the list and wait for the commit call. */ int rebuild; + }; -struct NeighborList; +struct NeighbourList; /** - * For each neighbor we keep a list of messages - * that we still want to transmit to the neighbor. + * For each neighbour we keep a list of messages + * that we still want to transmit to the neighbour. */ struct MessageQueue { /** - * This is a linked list. + * This is a doubly linked list. */ struct MessageQueue *next; + /** + * This is a doubly linked list. + */ + struct MessageQueue *prev; + /** * The message(s) we want to transmit, GNUNET_MessageHeader(s) - * stuck together in memory. + * stuck together in memory. Allocated at the end of this struct. */ - char *message_buf; + const char *message_buf; - /* + /** * Size of the message buf */ size_t message_buf_size; @@ -304,14 +339,14 @@ struct MessageQueue struct TransportClient *client; /** - * Neighbor this entry belongs to. + * Using which specific address should we send this message? */ - /*struct NeighborList *neighbor;*/ + struct ForeignAddressList *specific_address; /** - * Peer ID of the Neighbor this entry belongs to. + * Peer ID of the Neighbour this entry belongs to. */ - struct GNUNET_PeerIdentity *neighbor_id; + struct GNUNET_PeerIdentity neighbour_id; /** * Plugin that we used for the transmission. @@ -319,6 +354,11 @@ struct MessageQueue */ struct TransportPlugin *plugin; + /** + * At what time should we fail? + */ + struct GNUNET_TIME_Absolute timeout; + /** * Internal message of the transport system that should not be * included in the usual SEND-SEND_OK transmission confirmation @@ -333,16 +373,11 @@ struct MessageQueue */ unsigned int priority; - /* - * Using which specific address should we send this message? - */ - struct PeerAddressList *specific_peer; - }; /** - * For a given Neighbor, which plugins are available + * For a given Neighbour, which plugins are available * to talk to this peer and what are their costs? */ struct ReadyList @@ -359,41 +394,24 @@ struct ReadyList struct TransportPlugin *plugin; /** - * Neighbor this entry belongs to. - */ - struct NeighborList *neighbor; - - /* * Transport addresses, latency, and readiness for * this particular plugin. */ - struct PeerAddressList *addresses; + struct ForeignAddressList *addresses; - /** - * Is this plugin ready to transmit to the specific target? - * GNUNET_NO if not. Initially, all plugins are marked ready. If a - * transmission is in progress, "transmit_ready" is set to - * GNUNET_NO. - */ - int plugin_transmit_ready; - - /* - * Are any of our PeerAddressList addresses still connected? - */ - int connected; /* FIXME: dynamically check PeerAddressList addresses when asked to! */ }; /** - * Entry in linked list of all of our current neighbors. + * Entry in linked list of all of our current neighbours. */ -struct NeighborList +struct NeighbourList { /** * This is a linked list. */ - struct NeighborList *next; + struct NeighbourList *next; /** * Which of our transports is connected to this peer @@ -402,13 +420,19 @@ struct NeighborList struct ReadyList *plugins; /** - * List of messages we would like to send to this peer; + * Head of list of messages we would like to send to this peer; * must contain at most one message per client. */ - struct MessageQueue *messages; + struct MessageQueue *messages_head; /** - * Identity of this neighbor. + * Tail of list of messages we would like to send to this peer; must + * contain at most one message per client. + */ + struct MessageQueue *messages_tail; + + /** + * Identity of this neighbour. */ struct GNUNET_PeerIdentity id; @@ -418,6 +442,12 @@ struct NeighborList */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** + * ID of task scheduled to run when we should retry transmitting + * the head of the message queue. + */ + GNUNET_SCHEDULER_TaskIdentifier retry_task; + /** * How long until we should consider this peer dead * (if we don't receive another message in the @@ -426,26 +456,22 @@ struct NeighborList struct GNUNET_TIME_Absolute peer_timeout; /** - * At what time did we reset last_received last? - */ - struct GNUNET_TIME_Absolute last_quota_update; - - /** - * At what time should we try to again add plugins to - * our ready list? - */ - struct GNUNET_TIME_Absolute retry_plugins_time; - - /** - * How many bytes have we received since the "last_quota_update" - * timestamp? + * Tracker for inbound bandwidth. */ - uint64_t last_received; + struct GNUNET_BANDWIDTH_Tracker in_tracker; /** - * Global quota for inbound traffic for the neighbor in bytes/ms. + * The latency we have seen for this particular address for + * this particular peer. This latency may have been calculated + * over multiple transports. This value reflects how long it took + * us to receive a response when SENDING via this particular + * transport/neighbour/address combination! + * + * FIXME: we need to periodically send PINGs to update this + * latency (at least more often than the current "huge" (11h?) + * update interval). */ - uint32_t quota_in; + struct GNUNET_TIME_Relative latency; /** * How often has the other peer (recently) violated the @@ -456,27 +482,21 @@ struct NeighborList unsigned int quota_violation_count; /** - * Have we seen an ACK from this neighbor in the past? - * (used to make up a fake ACK for clients connecting after - * the neighbor connected to us). + * DV distance to this peer (1 if no DV is used). */ - int received_pong; + uint32_t distance; - /* The latency we have seen for this particular address for - * this particular peer. This latency may have been calculated - * over multiple transports. This value reflects how long it took - * us to receive a response when SENDING via this particular - * transport/neighbor/address combination! + /** + * Have we seen an PONG from this neighbour in the past (and + * not had a disconnect since)? */ - struct GNUNET_TIME_RelativeNBO latency; + int received_pong; }; /** * Message used to ask a peer to validate receipt (to check an address - * from a HELLO). Followed by the address used. Note that the - * recipients response does not affirm that he has this address, - * only that he got the challenge message. + * from a HELLO). */ struct TransportPingMessage { @@ -544,24 +564,29 @@ struct TransportPongMessage */ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded signer; - /* + /** * Size of address appended to this message */ size_t addrlen; }; + /** - * Linked list of messages to be transmitted to - * the client. Each entry is followed by the - * actual message. + * Linked list of messages to be transmitted to the client. Each + * entry is followed by the actual message. */ struct ClientMessageQueueEntry { /** - * This is a linked list. + * This is a doubly-linked list. */ struct ClientMessageQueueEntry *next; + + /** + * This is a doubly-linked list. + */ + struct ClientMessageQueueEntry *prev; }; @@ -593,6 +618,11 @@ struct TransportClient */ struct ClientMessageQueueEntry *message_queue_tail; + /** + * Current transmit request handle. + */ + struct GNUNET_CONNECTION_TransmitHandle *th; + /** * Is a call to "transmit_send_continuation" pending? If so, we * must not free this struct (even if the corresponding client @@ -610,20 +640,16 @@ struct TransportClient /** - * For each HELLO, we may have to validate multiple addresses; - * each address gets its own request entry. + * Entry in map of all HELLOs awaiting validation. */ -struct ValidationAddress +struct ValidationEntry { - /** - * This is a linked list. - */ - struct ValidationAddress *next; - /* - * What peer_address does this validation belong to? + /** + * The address, actually a pointer to the end + * of this struct. Do not free! */ - struct PeerAddressList *peer_address; + const void *addr; /** * Name of the transport. @@ -631,101 +657,104 @@ struct ValidationAddress char *transport_name; /** - * When should this validated address expire? + * The public key of the peer. */ - struct GNUNET_TIME_Absolute expiration; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; + + /** + * ID of task that will clean up this entry if we don't succeed + * with the validation first. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; - /* + /** * At what time did we send this validation? */ struct GNUNET_TIME_Absolute send_time; /** - * Challenge number we used. + * Length of addr. */ - uint32_t challenge; + size_t addrlen; /** - * Set to GNUNET_YES if the challenge was met, - * GNUNET_SYSERR if we know it failed, GNUNET_NO - * if we are waiting on a response. + * Challenge number we used. */ - int ok; + uint32_t challenge; + }; /** - * Entry in linked list of all HELLOs awaiting validation. + * Context of currently active requests to peerinfo + * for validation of HELLOs. */ -struct ValidationList +struct CheckHelloValidatedContext { /** - * This is a linked list. + * This is a doubly-linked list. */ - struct ValidationList *next; + struct CheckHelloValidatedContext *next; /** - * Linked list with one entry per address from the HELLO - * that needs to be validated. + * This is a doubly-linked list. */ - struct ValidationAddress *addresses; + struct CheckHelloValidatedContext *prev; /** - * The public key of the peer. + * Hello that we are validating. */ - struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; + const struct GNUNET_HELLO_Message *hello; /** - * When does this record time-out? (assuming the - * challenge goes unanswered) + * Context for peerinfo iteration. + * NULL after we are done processing peerinfo's information. + */ + struct GNUNET_PEERINFO_IteratorContext *piter; + + /** + * Was a HELLO known for this peer to peerinfo? */ - struct GNUNET_TIME_Absolute timeout; + int hello_known; }; - -struct CheckHelloValidatedContext +/** + * Struct for keeping information about addresses to validate + * so that we can re-use for sending around ping's and receiving + * pongs periodically to keep connections alive and also better + * estimate latency of connections. + * + */ +struct PeriodicValidationContext { - /** - * Plugin for which we are validating. - */ - struct TransportPlugin *plugin; /** - * Hello that we are validating. + * The address we are keeping alive */ - struct GNUNET_HELLO_Message *hello; + struct ForeignAddressList *foreign_address; /** - * Validation list being built. + * The name of the transport */ - struct ValidationList *e; + char *transport; /** - * Context for peerinfo iteration. - * NULL after we are done processing peerinfo's information. + * Public Key of the peer to re-validate */ - struct GNUNET_PEERINFO_IteratorContext *piter; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; }; - - -/** - * HELLOs awaiting validation. - */ -static struct ValidationList *pending_validations; - /** * Our HELLO message. */ static struct GNUNET_HELLO_Message *our_hello; /** - * "version" of "our_hello". Used to see if a given - * neighbor has already been sent the latest version - * of our HELLO message. + * "version" of "our_hello". Used to see if a given neighbour has + * already been sent the latest version of our HELLO message. */ static unsigned int our_hello_version; @@ -770,58 +799,79 @@ static struct TransportPlugin *plugins; static struct GNUNET_SERVER_Handle *server; /** - * All known neighbors and their HELLOs. + * All known neighbours and their HELLOs. */ -static struct NeighborList *neighbors; +static struct NeighbourList *neighbours; /** - * Number of neighbors we'd like to have. + * Number of neighbours we'd like to have. */ static uint32_t max_connect_per_transport; /** - * The peer specified by the given neighbor has timed-out or a plugin + * Head of linked list. + */ +static struct CheckHelloValidatedContext *chvc_head; + +/** + * Tail of linked list. + */ +static struct CheckHelloValidatedContext *chvc_tail; + +/** + * Map of PeerIdentities to 'struct ValidationEntry*'s (addresses + * of the given peer that we are currently validating). + */ +static struct GNUNET_CONTAINER_MultiHashMap *validation_map; + +/** + * Handle for reporting statistics. + */ +static struct GNUNET_STATISTICS_Handle *stats; + + +/** + * The peer specified by the given neighbour has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins * still up), or trigger a full disconnect and clean up. This * function updates our state and do the necessary notifications. - * Also notifies our clients that the neighbor is now officially + * Also notifies our clients that the neighbour is now officially * gone. * - * @param n the neighbor list entry for the peer + * @param n the neighbour list entry for the peer * @param check should we just check if all plugins * disconnected or must we ask all plugins to * disconnect? */ -static void disconnect_neighbor (struct NeighborList *n, int check); - +static void disconnect_neighbour (struct NeighbourList *n, int check); /** - * Check the ready list for the given neighbor and - * if a plugin is ready for transmission (and if we - * have a message), do so! + * Check the ready list for the given neighbour and if a plugin is + * ready for transmission (and if we have a message), do so! * - * @param neighbor target peer for which to check the plugins + * @param neighbour target peer for which to transmit */ -static ssize_t try_transmission_to_peer (struct NeighborList *neighbor); +static void try_transmission_to_peer (struct NeighbourList *neighbour); /** - * Find an entry in the neighbor list for a particular peer. + * Find an entry in the neighbour list for a particular peer. * if sender_address is not specified (NULL) then return the * first matching entry. If sender_address is specified, then * make sure that the address and address_len also matches. - * + * + * FIXME: This description does not fit the function. + * * @return NULL if not found. */ -static struct NeighborList * -find_neighbor (const struct GNUNET_PeerIdentity *key) +static struct NeighbourList * +find_neighbour (const struct GNUNET_PeerIdentity *key) { - struct NeighborList *head = neighbors; + struct NeighbourList *head = neighbours; while ((head != NULL) && (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity)))) head = head->next; - return head; } @@ -842,53 +892,9 @@ find_transport (const char *short_name) /** - * Update the quota values for the given neighbor now. - */ -static void -update_quota (struct NeighborList *n) -{ - struct GNUNET_TIME_Relative delta; - uint64_t allowed; - uint64_t remaining; - - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (delta.value < MIN_QUOTA_REFRESH_TIME) - return; /* not enough time passed for doing quota update */ - allowed = delta.value * n->quota_in; - - if (n->last_received < allowed) - { - remaining = allowed - n->last_received; - if (n->quota_in > 0) - remaining /= n->quota_in; - else - remaining = 0; - if (remaining > MAX_BANDWIDTH_CARRY) - remaining = MAX_BANDWIDTH_CARRY; - n->last_received = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->last_quota_update.value -= remaining; - if (n->quota_violation_count > 0) - n->quota_violation_count--; - } - else - { - n->last_received -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); - if (n->last_received > allowed) - { - /* more than twice the allowed rate! */ - n->quota_violation_count += 10; - } - } -} - - -/** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Function called to notify a client about the socket being ready to + * queue more data. "buf" will be NULL and "size" zero if the socket + * was closed for writing in the meantime. * * @param cls closure * @param size number of bytes available in buf @@ -903,9 +909,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) uint16_t msize; size_t tsize; const struct GNUNET_MessageHeader *msg; - struct GNUNET_CONNECTION_TransmitHandle *th; char *cbuf; + client->th = NULL; if (buf == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -913,10 +919,15 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) /* fatal error with client, free message queue! */ while (NULL != (q = client->message_queue_head)) { - client->message_queue_head = q->next; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes discarded (could not transmit to client)"), + ntohs (((const struct GNUNET_MessageHeader*)&q[1])->size), + GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); GNUNET_free (q); } - client->message_queue_tail = NULL; client->message_count = 0; return 0; } @@ -933,9 +944,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) "Transmitting message of type %u to client.\n", ntohs (msg->type)); #endif - client->message_queue_head = q->next; - if (q->next == NULL) - client->message_queue_tail = NULL; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); memcpy (&cbuf[tsize], msg, msize); tsize += msize; GNUNET_free (q); @@ -944,12 +955,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) if (NULL != q) { GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } return tsize; } @@ -971,7 +982,6 @@ transmit_to_client (struct TransportClient *client, { struct ClientMessageQueueEntry *q; uint16_t msize; - struct GNUNET_CONNECTION_TransmitHandle *th; if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop)) { @@ -982,61 +992,48 @@ transmit_to_client (struct TransportClient *client, /* TODO: call to statistics... */ return; } - client->message_count++; msize = ntohs (msg->size); GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize); memcpy (&q[1], msg, msize); - /* append to message queue */ - if (client->message_queue_tail == NULL) - { - client->message_queue_tail = q; - } - else - { - client->message_queue_tail->next = q; - client->message_queue_tail = q; - } - if (client->message_queue_head == NULL) + GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head, + client->message_queue_tail, + client->message_queue_tail, + q); + client->message_count++; + if (client->th == NULL) { - client->message_queue_head = q; - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } } /** - * Find alternative plugins for communication. + * Transmit a 'SEND_OK' notification to the given client for the + * given neighbour. * - * @param neighbor for which neighbor should we try to find - * more plugins? + * @param client who to notify + * @param n neighbour to notify about + * @param result status code for the transmission request */ static void -try_alternative_plugins (struct NeighborList *neighbor) +transmit_send_ok (struct TransportClient *client, + struct NeighbourList *n, + int result) { - struct ReadyList *rl; - - if ((neighbor->plugins != NULL) && - (neighbor->retry_plugins_time.value > - GNUNET_TIME_absolute_get ().value)) - return; /* don't try right now */ - neighbor->retry_plugins_time - = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY); + struct SendOkMessage send_ok_msg; - rl = neighbor->plugins; -#if WTF /* FIXME: What is this supposed to do? */ - while (rl != NULL) - { - if (rl->connect_attempts > 0) - rl->connect_attempts--; /* amnesty */ - rl = rl->next; - } -#endif + send_ok_msg.header.size = htons (sizeof (send_ok_msg)); + send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + send_ok_msg.success = htonl (result); + send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency); + send_ok_msg.peer = n->id; + transmit_to_client (client, &send_ok_msg.header, GNUNET_NO); } @@ -1060,88 +1057,77 @@ transmit_send_continuation (void *cls, int result) { struct MessageQueue *mq = cls; - /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */ - struct SendOkMessage send_ok_msg; - struct NeighborList *n; - - GNUNET_assert (mq != NULL); - n = find_neighbor(mq->neighbor_id); - if (n == NULL) /* Neighbor must have been removed asynchronously! */ - return; - - /* Otherwise, let's make sure we've got the right peer */ - GNUNET_assert (0 == - memcmp (&n->id, target, - sizeof (struct GNUNET_PeerIdentity))); + struct NeighbourList *n; if (result == GNUNET_OK) { - mq->specific_peer->timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes successfully transmitted by plugins"), + mq->message_buf_size, + GNUNET_NO); } else { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission to peer `%s' failed, marking connection as down.\n", - GNUNET_i2s (target)); - mq->specific_peer->connected = GNUNET_NO; - } - if (!mq->internal_msg) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Setting transmit_ready on transport!\n"); -#endif - mq->specific_peer->transmit_ready = GNUNET_YES; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes with transmission failure by plugins"), + mq->message_buf_size, + GNUNET_NO); + } + n = find_neighbour(&mq->neighbour_id); + GNUNET_assert (n != NULL); + if (mq->specific_address != NULL) + { + if (result == GNUNET_OK) + { + mq->specific_address->timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + mq->specific_address->connected = GNUNET_YES; + } + else + { + mq->specific_address->connected = GNUNET_NO; + } + if (! mq->internal_msg) + mq->specific_address->in_transmit = GNUNET_NO; } - if (mq->client != NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Notifying client %p about transmission to peer `%4s'.\n", - mq->client, GNUNET_i2s (target)); - send_ok_msg.header.size = htons (sizeof (send_ok_msg)); - send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - send_ok_msg.success = htonl (result); - send_ok_msg.peer = n->id; - transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO); - } - GNUNET_free (mq->message_buf); - GNUNET_free (mq->neighbor_id); + transmit_send_ok (mq->client, n, result); GNUNET_free (mq); - /* one plugin just became ready again, try transmitting - another message (if available) */ - if (result == GNUNET_OK) - try_transmission_to_peer (n); - else - disconnect_neighbor (n, GNUNET_YES); + try_transmission_to_peer (n); } - - -struct PeerAddressList * -find_ready_address(struct NeighborList *neighbor) +/** + * Find an address in any of the available transports for + * the given neighbour that would be good for message + * transmission. This is essentially the transport selection + * routine. + * + * @param neighbour for whom to select an address + * @return selected address, NULL if we have none + */ +struct ForeignAddressList * +find_ready_address(struct NeighbourList *neighbour) { - struct ReadyList *head = neighbor->plugins; - struct PeerAddressList *addresses; + struct ReadyList *head = neighbour->plugins; + struct ForeignAddressList *addresses; struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); - struct PeerAddressList *best_address; + struct ForeignAddressList *best_address; best_address = NULL; while (head != NULL) { addresses = head->addresses; - while (addresses != NULL) { - if ((addresses->timeout.value < now.value) && (addresses->connected == GNUNET_YES)) + if ( (addresses->timeout.value < now.value) && + (addresses->connected == GNUNET_YES) ) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Marking long-time inactive connection to `%4s' as down.\n", - GNUNET_i2s (&addresses->ready_list->neighbor->id)); + GNUNET_i2s (&neighbour->id)); #endif addresses->connected = GNUNET_NO; } @@ -1151,17 +1137,15 @@ find_ready_address(struct NeighborList *neighbor) addresses = head->addresses; while (addresses != NULL) { - if ((addresses->connected == GNUNET_YES) && - (addresses->transmit_ready == GNUNET_YES) && - ((best_address == NULL) || (addresses->latency.value < best_address->latency.value))) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found address with latency %llu (previous best was %llu), setting as best found yet!\n", - addresses->latency.value, best_address == NULL ? -1LL : best_address->latency.value); -#endif - best_address = addresses; - } + if ( ( (best_address == NULL) || + (addresses->connected == GNUNET_YES) || + (best_address->connected == GNUNET_NO) ) && + (addresses->in_transmit == GNUNET_NO) && + ( (best_address == NULL) || + (addresses->latency.value < best_address->latency.value)) ) + best_address = addresses; + /* FIXME: also give lower-latency addresses that are not + connected a chance some times... */ addresses = addresses->next; } head = head->next; @@ -1170,7 +1154,7 @@ find_ready_address(struct NeighborList *neighbor) if (best_address != NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Best address found has latency of %llu!\n", + "Best address found has latency of %llu ms.\n", best_address->latency.value); } #endif @@ -1178,62 +1162,123 @@ find_ready_address(struct NeighborList *neighbor) } + +/** + * We should re-try transmitting to the given peer, + * hopefully we've learned something in the meantime. + */ +static void +retry_transmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourList *n = cls; + + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); +} + + /** - * Check the ready list for the given neighbor and - * if a plugin is ready for transmission (and if we - * have a message), do so! + * Check the ready list for the given neighbour and if a plugin is + * ready for transmission (and if we have a message), do so! + * + * @param neighbour target peer for which to transmit */ -static ssize_t -try_transmission_to_peer (struct NeighborList *neighbor) +static void +try_transmission_to_peer (struct NeighbourList *neighbour) { - struct GNUNET_TIME_Relative min_latency; struct ReadyList *rl; struct MessageQueue *mq; - struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Relative timeout; - if (neighbor->messages == NULL) - return 0; /* nothing to do */ - try_alternative_plugins (neighbor); - min_latency = GNUNET_TIME_UNIT_FOREVER_REL; + if (neighbour->messages_head == NULL) + return; /* nothing to do */ rl = NULL; - mq = neighbor->messages; - now = GNUNET_TIME_absolute_get (); - - if (mq->specific_peer == NULL) - mq->specific_peer = find_ready_address(neighbor); /* Find first available (or best!) address to transmit to */ - - if (mq->specific_peer == NULL) - { + mq = neighbour->messages_head; + /* FIXME: support bi-directional use of TCP */ + if (mq->specific_address == NULL) + mq->specific_address = find_ready_address(neighbour); + if (mq->specific_address == NULL) + { + timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); + if (timeout.value == 0) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No destination address available to transmit message of size %u to peer `%4s'\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes in message queue for other peers"), + -mq->message_buf_size, + GNUNET_NO); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes discarded (no destination address available)"), + mq->message_buf_size, + GNUNET_NO); + if (mq->client != NULL) + transmit_send_ok (mq->client, neighbour, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, + neighbour->messages_tail, + mq); + GNUNET_free (mq); + return; /* nobody ready */ + } + if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, + neighbour->retry_task); + neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched, + timeout, + &retry_transmission_task, + neighbour); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No plugin ready to transmit message\n"); + "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id), + timeout.value); #endif - return 0; /* nobody ready */ - } - - rl = mq->specific_peer->ready_list; - neighbor->messages = mq->next; + /* FIXME: might want to trigger peerinfo lookup here + (unless that's already pending...) */ + return; + } + GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, + neighbour->messages_tail, + mq); + if (mq->specific_address->connected == GNUNET_NO) + mq->specific_address->connect_attempts++; + rl = mq->specific_address->ready_list; mq->plugin = rl->plugin; if (!mq->internal_msg) - mq->specific_peer->transmit_ready = GNUNET_NO; + mq->specific_address->in_transmit = GNUNET_YES; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Giving message of size `%u' for `%4s' to plugin `%s'\n", + "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n", mq->message_buf_size, - GNUNET_i2s (&neighbor->id), rl->plugin->short_name); + GNUNET_i2s (&neighbour->id), + GNUNET_a2s (mq->specific_address->addr, + mq->specific_address->addrlen), + rl->plugin->short_name); #endif - - return rl->plugin->api->send (rl->plugin->api->cls, - mq->neighbor_id, - mq->message_buf, - mq->message_buf_size, - mq->priority, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - mq->specific_peer->addr, - mq->specific_peer->addrlen, - GNUNET_YES, - &transmit_send_continuation, mq); - + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes in message queue for other peers"), + -mq->message_buf_size, + GNUNET_NO); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes transmitted to other peers"), + mq->message_buf_size, + GNUNET_NO); + rl->plugin->api->send (rl->plugin->api->cls, + &mq->neighbour_id, + mq->message_buf, + mq->message_buf_size, + mq->priority, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + mq->specific_address->addr, + mq->specific_address->addrlen, + GNUNET_YES /* FIXME: sometimes, we want to be more tolerant here! */, + &transmit_send_continuation, mq); } @@ -1241,70 +1286,68 @@ try_transmission_to_peer (struct NeighborList *neighbor) * Send the specified message to the specified peer. * * @param client source of the transmission request (can be NULL) - * @param peer_address PeerAddressList where we should send this message + * @param peer_address ForeignAddressList where we should send this message * @param priority how important is the message + * @param timeout how long do we have to transmit? * @param message_buf message(s) to send GNUNET_MessageHeader(s) * @param message_buf_size total size of all messages in message_buf - * @param is_internal is this an internal message - * @param neighbor handle to the neighbor for transmission + * @param is_internal is this an internal message; these are pre-pended and + * also do not count for plugins being "ready" to transmit + * @param neighbour handle to the neighbour for transmission */ -static ssize_t +static void transmit_to_peer (struct TransportClient *client, - struct PeerAddressList *peer_address, + struct ForeignAddressList *peer_address, unsigned int priority, + struct GNUNET_TIME_Relative timeout, const char *message_buf, size_t message_buf_size, - int is_internal, struct NeighborList *neighbor) + int is_internal, struct NeighbourList *neighbour) { struct MessageQueue *mq; - struct MessageQueue *mqe; - char *m; +#if EXTRA_CHECKS if (client != NULL) { /* check for duplicate submission */ - mq = neighbor->messages; + mq = neighbour->messages_head; while (NULL != mq) { if (mq->client == client) { /* client transmitted to same peer twice - before getting SendOk! */ + before getting SEND_OK! */ GNUNET_break (0); - return 0; + return; } mq = mq->next; } } - mq = GNUNET_malloc (sizeof (struct MessageQueue)); - mq->specific_peer = peer_address; +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes in message queue for other peers"), + message_buf_size, + GNUNET_NO); + mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size); + mq->specific_address = peer_address; mq->client = client; - m = GNUNET_malloc (message_buf_size); - memcpy (m, message_buf, message_buf_size); - mq->message_buf = m; + memcpy (&mq[1], message_buf, message_buf_size); + mq->message_buf = (const char*) &mq[1]; mq->message_buf_size = message_buf_size; - mq->neighbor_id = GNUNET_malloc(sizeof (struct GNUNET_PeerIdentity)); - - memcpy(mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity)); + memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity)); mq->internal_msg = is_internal; mq->priority = priority; - - /* find tail */ - mqe = neighbor->messages; - if (mqe != NULL) - while (mqe->next != NULL) - mqe = mqe->next; - if (mqe == NULL) - { - /* new head */ - neighbor->messages = mq; - } + mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); + if (is_internal) + GNUNET_CONTAINER_DLL_insert (neighbour->messages_head, + neighbour->messages_tail, + mq); else - { - /* append */ - mqe->next = mq; - } - return try_transmission_to_peer (neighbor); + GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head, + neighbour->messages_tail, + neighbour->messages_tail, + mq); + try_transmission_to_peer (neighbour); } @@ -1314,7 +1357,7 @@ transmit_to_peer (struct TransportClient *client, struct GeneratorContext { struct TransportPlugin *plug_pos; - struct AddressList *addr_pos; + struct OwnAddressList *addr_pos; struct GNUNET_TIME_Absolute expiration; }; @@ -1356,7 +1399,7 @@ refresh_hello () { struct GNUNET_HELLO_Message *hello; struct TransportClient *cpos; - struct NeighborList *npos; + struct NeighbourList *npos; struct GeneratorContext gc; gc.plug_pos = plugins; @@ -1367,6 +1410,10 @@ refresh_hello () GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Refreshed my `%s', new size is %d\n", "HELLO", GNUNET_HELLO_size(hello)); #endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# refreshed my HELLO"), + 1, + GNUNET_NO); cpos = clients; while (cpos != NULL) { @@ -1380,17 +1427,23 @@ refresh_hello () our_hello = hello; our_hello_version++; GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello); - npos = neighbors; + npos = neighbours; while (npos != NULL) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Transmitting updated `%s' to neighbor `%4s'\n", + "Transmitting updated `%s' to neighbour `%4s'\n", "HELLO", GNUNET_i2s (&npos->id)); -#endif // FIXME: just testing - //transmit_to_peer (NULL, NULL, 0, - // (const char *) our_hello, GNUNET_HELLO_size(our_hello), - // GNUNET_YES, npos); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# transmitted my HELLO to other peers"), + 1, + GNUNET_NO); + transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, + (const char *) our_hello, + GNUNET_HELLO_size(our_hello), + GNUNET_NO, npos); npos = npos->next; } } @@ -1422,9 +1475,9 @@ update_addresses (struct TransportPlugin *plugin, int fresh) struct GNUNET_TIME_Relative min_remaining; struct GNUNET_TIME_Relative remaining; struct GNUNET_TIME_Absolute now; - struct AddressList *pos; - struct AddressList *prev; - struct AddressList *next; + struct OwnAddressList *pos; + struct OwnAddressList *prev; + struct OwnAddressList *next; int expired; if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK) @@ -1503,7 +1556,7 @@ plugin_env_notify_address (void *cls, struct GNUNET_TIME_Relative expires) { struct TransportPlugin *p = cls; - struct AddressList *al; + struct OwnAddressList *al; struct GNUNET_TIME_Absolute abex; abex = GNUNET_TIME_relative_to_absolute (expires); @@ -1521,7 +1574,7 @@ plugin_env_notify_address (void *cls, al = al->next; } - al = GNUNET_malloc (sizeof (struct AddressList) + addrlen); + al = GNUNET_malloc (sizeof (struct OwnAddressList) + addrlen); al->addr = &al[1]; al->next = p->addresses; p->addresses = al; @@ -1537,14 +1590,24 @@ plugin_env_notify_address (void *cls, */ static void notify_clients_connect (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_TIME_Relative latency) + struct GNUNET_TIME_Relative latency, + uint32_t distance) { struct ConnectInfoMessage cim; struct TransportClient *cpos; - cim.header.size = htons (sizeof (struct ConnectInfoMessage)); - cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Notifying clients about connection from `%s'\n", + GNUNET_i2s (peer)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# peers connected"), + 1, + GNUNET_NO); + cim.header.size = htons (sizeof (struct ConnectInfoMessage)); + cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); + cim.distance = htonl (distance); cim.latency = GNUNET_TIME_relative_hton (latency); memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity)); cpos = clients; @@ -1565,6 +1628,15 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) struct DisconnectInfoMessage dim; struct TransportClient *cpos; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Notifying clients about lost connection to `%s'\n", + GNUNET_i2s (peer)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# peers connected"), + -1, + GNUNET_NO); dim.header.size = htons (sizeof (struct DisconnectInfoMessage)); dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); dim.reserved = htonl (0); @@ -1579,109 +1651,251 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) /** - * Copy any validated addresses to buf. + * Find a ForeignAddressList entry for the given neighbour + * that matches the given address and transport. + * + * @param neighbour which peer we care about + * @param tname name of the transport plugin + * @param addr binary address + * @param addrlen length of addr + * @return NULL if no such entry exists + */ +static struct ForeignAddressList * +find_peer_address(struct NeighbourList *neighbour, + const char *tname, + const char *addr, + size_t addrlen) +{ + struct ReadyList *head; + struct ForeignAddressList *address_head; + + head = neighbour->plugins; + while (head != NULL) + { + if (0 == strcmp (tname, head->plugin->short_name)) + break; + head = head->next; + } + if (head == NULL) + return NULL; + + address_head = head->addresses; + while ( (address_head != NULL) && + ( (address_head->addrlen != addrlen) || + (memcmp(address_head->addr, addr, addrlen) != 0) ) ) + address_head = address_head->next; + return address_head; +} + + +/** + * Get the peer address struct for the given neighbour and + * address. If it doesn't yet exist, create it. + * + * @param neighbour which peer we care about + * @param tname name of the transport plugin + * @param addr binary address + * @param addrlen length of addr + * @return NULL if we do not have a transport plugin for 'tname' + */ +static struct ForeignAddressList * +add_peer_address(struct NeighbourList *neighbour, + const char *tname, + const char *addr, + size_t addrlen) +{ + struct ReadyList *head; + struct ForeignAddressList *ret; + + ret = find_peer_address (neighbour, tname, addr, addrlen); + if (ret != NULL) + return ret; + head = neighbour->plugins; + while (head != NULL) + { + if (0 == strcmp (tname, head->plugin->short_name)) + break; + head = head->next; + } + if (head == NULL) + return NULL; + ret = GNUNET_malloc(sizeof(struct ForeignAddressList) + addrlen); + ret->addr = (const char*) &ret[1]; + memcpy (&ret[1], addr, addrlen); + ret->addrlen = addrlen; + ret->expires = GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + ret->latency = GNUNET_TIME_relative_get_forever(); + ret->distance = -1; + ret->timeout = GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + ret->ready_list = head; + ret->next = head->addresses; + head->addresses = ret; + return ret; +} + + +/** + * Closure for 'add_validated_address'. + */ +struct AddValidatedAddressContext +{ + /** + * Entry that has been validated. + */ + const struct ValidationEntry *ve; + + /** + * Flag set after we have added the address so + * that we terminate the iteration next time. + */ + int done; +}; + + +/** + * Callback function used to fill a buffer of max bytes with a list of + * addresses in the format used by HELLOs. Should use + * "GNUNET_HELLO_add_address" as a helper function. * - * @return 0 once all addresses have been - * returned + * @param cls the 'struct AddValidatedAddressContext' with the validated address + * @param max maximum number of bytes that can be written to buf + * @param buf where to write the address information + * @return number of bytes written, 0 to signal the + * end of the iteration. */ static size_t -list_validated_addresses (void *cls, size_t max, void *buf) +add_validated_address (void *cls, + size_t max, void *buf) { - struct ValidationAddress **va = cls; - size_t ret; + struct AddValidatedAddressContext *avac = cls; + const struct ValidationEntry *ve = avac->ve; - while ((NULL != *va) && ((*va)->ok != GNUNET_YES)) - *va = (*va)->next; - if (NULL == *va) + if (GNUNET_YES == avac->done) return 0; - ret = GNUNET_HELLO_add_address ((*va)->transport_name, - (*va)->expiration, - (*va)->peer_address->addr, (*va)->peer_address->addrlen, buf, max); - *va = (*va)->next; - return ret; + avac->done = GNUNET_YES; + return GNUNET_HELLO_add_address (ve->transport_name, + GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION), + ve->addr, + ve->addrlen, + buf, + max); } +static void send_periodic_ping(void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); + + /** - * HELLO validation cleanup task. + * Iterator over hash map entries. Checks if the given validation + * entry is for the same challenge as what is given in the PONG. + * + * @param cls the 'struct TransportPongMessage*' + * @param key peer identity + * @param value value in the hash map ('struct ValidationEntry') + * @return GNUNET_YES if we should continue to + * iterate (mismatch), GNUNET_NO if not (entry matched) */ -static void -cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +static int +check_pending_validation (void *cls, + const GNUNET_HashCode * key, + void *value) { - struct ValidationAddress *va; - struct ValidationList *pos; - struct ValidationList *prev; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Absolute first; + const struct TransportPongMessage *pong = cls; + struct ValidationEntry *ve = value; + struct AddValidatedAddressContext avac; + unsigned int challenge = ntohl(pong->challenge); struct GNUNET_HELLO_Message *hello; - struct GNUNET_PeerIdentity pid; - struct NeighborList *n; + struct GNUNET_PeerIdentity target; + struct NeighbourList *n; + struct ForeignAddressList *fal; + struct PeriodicValidationContext *periodic_validation_context; + + if (ve->challenge != challenge) + return GNUNET_YES; - now = GNUNET_TIME_absolute_get (); - prev = NULL; - pos = pending_validations; - while (pos != NULL) - { - if (pos->timeout.value < now.value) - { - if (prev == NULL) - pending_validations = pos->next; - else - prev->next = pos->next; - va = pos->addresses; - hello = GNUNET_HELLO_create (&pos->publicKey, - &list_validated_addresses, &va); - GNUNET_CRYPTO_hash (&pos->publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &pid.hashPubKey); #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n", - "HELLO", GNUNET_i2s (&pid)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Confirmed validity of address, peer `%4s' has address `%s' (%s).\n", + GNUNET_h2s (key), + GNUNET_a2s ((const struct sockaddr *) ve->addr, + ve->addrlen), + ve->transport_name); #endif - GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello); - n = find_neighbor (&pid); - if (NULL != n) - { - try_transmission_to_peer (n); - } - GNUNET_free (hello); - while (NULL != (va = pos->addresses)) - { - pos->addresses = va->next; - GNUNET_free (va->transport_name); - GNUNET_free (va); - } - GNUNET_free (pos); - if (prev == NULL) - pos = pending_validations; - else - pos = prev->next; - continue; - } - prev = pos; - pos = pos->next; - } - - /* finally, reschedule cleanup if needed; list is - ordered by timeout, so we need the last element... */ - if (NULL != pending_validations) - { - first = pending_validations->timeout; - pos = pending_validations; - while (pos != NULL) - { - first = GNUNET_TIME_absolute_min (first, pos->timeout); - pos = pos->next; - } - if (tc->reason != GNUNET_SCHEDULER_REASON_SHUTDOWN) - { - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_absolute_get_remaining - (first), &cleanup_validation, NULL); - } - } + GNUNET_STATISTICS_update (stats, + gettext_noop ("# address validation successes"), + 1, + GNUNET_NO); + /* create the updated HELLO */ + GNUNET_CRYPTO_hash (&ve->publicKey, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &target.hashPubKey); + avac.done = GNUNET_NO; + avac.ve = ve; + hello = GNUNET_HELLO_create (&ve->publicKey, + &add_validated_address, + &avac); + GNUNET_PEERINFO_add_peer (cfg, sched, + &target, + hello); + GNUNET_free (hello); + n = find_neighbour (&target); + if (n != NULL) + { + fal = add_peer_address (n, + ve->transport_name, + ve->addr, + ve->addrlen); + GNUNET_assert (fal != NULL); + fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); + fal->validated = GNUNET_YES; + fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time); + periodic_validation_context = GNUNET_malloc(sizeof(struct PeriodicValidationContext)); + periodic_validation_context->foreign_address = fal; + periodic_validation_context->transport = strdup(ve->transport_name); + memcpy(&periodic_validation_context->publicKey, + &ve->publicKey, + sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); + /* FIXME: this causes all of the revalidation PINGs for the same HELLO + to be transmitted in bulk, which is not nice; also, + triggering these HERE means that revalidations do NOT happen AT ALL + for HELLOs a previous instance of this process validated (since + there is no "initial" validation PING => no revalidation => BUG! */ + fal->revalidate_task = GNUNET_SCHEDULER_add_delayed(sched, + TRANSPORT_DEFAULT_REVALIDATION, + &send_periodic_ping, + periodic_validation_context); + if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value) + n->latency = fal->latency; + else + n->latency.value = (fal->latency.value + n->latency.value) / 2; + n->distance = fal->distance; + if (GNUNET_NO == n->received_pong) + { + notify_clients_connect (&target, n->latency, n->distance); + n->received_pong = GNUNET_YES; + } + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, + n->retry_task); + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); + } + } + + /* clean up validation entry */ + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (validation_map, + key, + ve)); + GNUNET_SCHEDULER_cancel (sched, + ve->timeout_task); + GNUNET_free (ve->transport_name); + GNUNET_free (ve); + return GNUNET_NO; } @@ -1694,11 +1908,11 @@ cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * (otherwise we may be seeing a MiM attack). * * @param cls closure - * @param name name of the transport that generated the address + * @param message the pong message * @param peer who responded to our challenge - * @param challenge the challenge number we presumably used - * @param sender_addr string describing our sender address (as observed - * by the other peer in human-readable format) + * @param sender_address string describing our sender address (as observed + * by the other peer in binary format) + * @param sender_address_len number of bytes in 'sender_address' */ static void handle_pong (void *cls, const struct GNUNET_MessageHeader *message, @@ -1706,256 +1920,239 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message, const char *sender_address, size_t sender_address_len) { - unsigned int not_done; - int matched; - struct ValidationList *pos; - struct ValidationAddress *va; - struct GNUNET_PeerIdentity id; - const struct TransportPongMessage *pong = (const struct TransportPongMessage *)message; - int count = 0; - unsigned int challenge = ntohl(pong->challenge); - pos = pending_validations; - - while (pos != NULL) - { - GNUNET_CRYPTO_hash (&pos->publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &id.hashPubKey); - if (0 == memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity))) - break; - pos = pos->next; - count++; - } - if (pos == NULL) - { - /* TODO: call statistics (unmatched PONG) */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Received validation response but have no record of any validation request for `%4s' (out of %d). Ignoring.\n"), - GNUNET_i2s (peer), count); - return; - } - not_done = 0; - matched = GNUNET_NO; - va = pos->addresses; - while (va != NULL) - { - if (va->challenge == challenge) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Confirmed validity of address, peer `%4s' has address `%s'.\n", - GNUNET_i2s (peer), - GNUNET_a2s ((const struct sockaddr *) va->peer_address->addr, - va->peer_address->addrlen)); -#endif - GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, - _ - ("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"), - GNUNET_a2s ((const struct sockaddr *) &pong[1], - ntohs(pong->addrlen)), va->transport_name); - va->ok = GNUNET_YES; - va->expiration = - GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); - matched = GNUNET_YES; - va->peer_address->connected = GNUNET_YES; - va->peer_address->latency = GNUNET_TIME_absolute_get_difference(va->send_time, GNUNET_TIME_absolute_get()); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Confirmed validity of address, peer `%4s' has address `%s', latency of %llu\n", - GNUNET_i2s (peer), - GNUNET_a2s ((const struct sockaddr *) va->peer_address->addr, - va->peer_address->addrlen), (unsigned long long)va->peer_address->latency.value); +#if DEBUG_TRANSPORT > 1 + /* we get tons of these that just get discarded, only log + if we are quite verbose */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Receiving `%s' message from `%4s'.\n", "PONG", + GNUNET_i2s (peer)); #endif - va->peer_address->transmit_ready = GNUNET_YES; - va->peer_address->expires = GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - } - if (va->ok != GNUNET_YES) - not_done++; - va = va->next; - } - if (GNUNET_NO == matched) - { - /* TODO: call statistics (unmatched PONG) */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"), - "PONG", "PING"); - } - if (0 == not_done) - { -#if DEBUG_TRANSPORT + GNUNET_STATISTICS_update (stats, + gettext_noop ("# PONG messages received"), + 1, + GNUNET_NO); + if (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap_get_multiple (validation_map, + &peer->hashPubKey, + &check_pending_validation, + (void*) message)) + { + /* This is *expected* to happen a lot since we send + PONGs to *all* known addresses of the sender of + the PING, so most likely we get multiple PONGs + per PING, and all but the first PONG will end up + here. So really we should not print anything here + unless we want to be very, very verbose... */ +#if DEBUG_TRANSPORT > 2 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "All addresses validated, will now construct `%s' for `%4s'.\n", - "HELLO", GNUNET_i2s (peer)); + "Received `%s' message from `%4s' but have no record of a matching `%s' message. Ignoring.\n", + "PONG", + GNUNET_i2s (peer), + "PING"); #endif - pos->timeout.value = 0; - GNUNET_SCHEDULER_add_with_priority (sched, - GNUNET_SCHEDULER_PRIORITY_IDLE, - &cleanup_validation, NULL); + return; } +#if 0 + /* FIXME: add given address to potential pool of our addresses + (for voting) */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, + _("Another peer saw us using the address `%s' via `%s'.\n"), + GNUNET_a2s ((const struct sockaddr *) &pong[1], + ntohs(pong->addrlen)), + va->transport_name); +#endif } -/** - * Add an entry for each of our transport plugins - * (that are able to send) to the list of plugins - * for this neighbor. - * - * @param neighbor to initialize - */ -static void -add_plugins (struct NeighborList *neighbor) -{ - struct TransportPlugin *tp; - struct ReadyList *rl; - - neighbor->retry_plugins_time - = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY); - tp = plugins; - while (tp != NULL) - { - if (tp->api->send != NULL) - { - rl = GNUNET_malloc (sizeof (struct ReadyList)); - rl->next = neighbor->plugins; - neighbor->plugins = rl; - rl->plugin = tp; - rl->neighbor = neighbor; - rl->addresses = NULL; - } - tp = tp->next; - } -} static void -neighbor_timeout_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +neighbour_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct NeighborList *n = cls; + struct NeighbourList *n = cls; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Neighbor `%4s' has timed out!\n", GNUNET_i2s (&n->id)); + "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id)); #endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# disconnects due to timeout"), + 1, + GNUNET_NO); n->timeout_task = GNUNET_SCHEDULER_NO_TASK; - disconnect_neighbor (n, GNUNET_NO); + disconnect_neighbour (n, GNUNET_NO); } + /** - * Create a fresh entry in our neighbor list for the given peer. - * Will try to transmit our current HELLO to the new neighbor. Also + * Create a fresh entry in our neighbour list for the given peer. + * Will try to transmit our current HELLO to the new neighbour. Also * notifies our clients about the new "connection". * * @param peer the peer for which we create the entry - * @return the new neighbor list entry + * @return the new neighbour list entry */ -static struct NeighborList * -setup_new_neighbor (const struct GNUNET_PeerIdentity *peer) +static struct NeighbourList * +setup_new_neighbour (const struct GNUNET_PeerIdentity *peer) { - struct NeighborList *n; + struct NeighbourList *n; + struct TransportPlugin *tp; + struct ReadyList *rl; GNUNET_assert (our_hello != NULL); - n = GNUNET_malloc (sizeof (struct NeighborList)); - n->next = neighbors; - neighbors = n; + n = GNUNET_malloc (sizeof (struct NeighbourList)); + n->next = neighbours; + neighbours = n; n->id = *peer; - n->last_quota_update = GNUNET_TIME_absolute_get (); n->peer_timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); - add_plugins (n); + GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, + GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, + MAX_BANDWIDTH_CARRY_S); + tp = plugins; + while (tp != NULL) + { + if (tp->api->send != NULL) + { + rl = GNUNET_malloc (sizeof (struct ReadyList)); + rl->next = n->plugins; + n->plugins = rl; + rl->plugin = tp; + rl->addresses = NULL; + } + tp = tp->next; + } + n->latency = GNUNET_TIME_UNIT_FOREVER_REL; + n->distance = -1; n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbor_timeout_task, n); + &neighbour_timeout_task, n); transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, (const char *) our_hello, GNUNET_HELLO_size(our_hello), - GNUNET_YES, n); - notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_NO, n); return n; } -static struct PeerAddressList * -add_peer_address(struct NeighborList *neighbor, const char *addr, size_t addrlen) + +/** + * Closure for 'check_address_exists'. + */ +struct CheckAddressExistsClosure { - /* FIXME: should return a list of PeerAddressLists, support for multiple transports! */ - struct ReadyList *head = neighbor->plugins; - struct PeerAddressList * new_address; + /** + * Address to check for. + */ + const void *addr; + + /** + * Name of the transport. + */ + const char *tname; - GNUNET_assert(addr != NULL); + /** + * Length of addr. + */ + size_t addrlen; - new_address = NULL; - while (head != NULL) - { - new_address = GNUNET_malloc(sizeof(struct PeerAddressList)); - new_address->addr = GNUNET_malloc(addrlen); - memcpy(new_address->addr, addr, addrlen); - new_address->addrlen = addrlen; - new_address->connect_attempts = 0; - new_address->connected = GNUNET_YES; /* Set connected to GNUNET_YES, assuming that we're good */ - new_address->expires = GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - new_address->latency = GNUNET_TIME_relative_get_forever(); - new_address->neighbor = neighbor; - new_address->plugin = head->plugin; - new_address->transmit_ready = GNUNET_YES; - new_address->timeout = GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); /* FIXME: Do we need this? */ - new_address->ready_list = head; - new_address->next = head->addresses; - head->addresses = new_address; - head = head->next; - } + /** + * Set to GNUNET_YES if the address exists. + */ + int exists; +}; - return new_address; + +/** + * Iterator over hash map entries. Checks if the given + * validation entry is for the same address as what is given + * in the closure. + * + * @param cls the 'struct CheckAddressExistsClosure*' + * @param key current key code (ignored) + * @param value value in the hash map ('struct ValidationEntry') + * @return GNUNET_YES if we should continue to + * iterate (mismatch), GNUNET_NO if not (entry matched) + */ +static int +check_address_exists (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct CheckAddressExistsClosure *caec = cls; + struct ValidationEntry *ve = value; + if ( (0 == strcmp (caec->tname, + ve->transport_name)) && + (caec->addrlen == ve->addrlen) && + (0 == memcmp (caec->addr, + ve->addr, + caec->addrlen)) ) + { + caec->exists = GNUNET_YES; + return GNUNET_NO; + } + return GNUNET_YES; } -static struct PeerAddressList * -find_peer_address(struct NeighborList *neighbor, const char *addr, size_t addrlen) + +/** + * HELLO validation cleanup task (validation failed). + * + * @param cls the 'struct ValidationEntry' that failed + * @param tc scheduler context (unused) + */ +static void +timeout_hello_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct ReadyList *head = neighbor->plugins; - struct PeerAddressList *address_head; - while (head != NULL) - { - address_head = head->addresses; - while ((address_head != NULL) && - (address_head->addrlen != addrlen) && - (memcmp(address_head->addr, addr, addrlen) != 0)) - { - address_head = address_head->next; - } - if (address_head != NULL) - return address_head; + struct ValidationEntry *va = cls; + struct GNUNET_PeerIdentity pid; - head = head->next; - } - return NULL; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# address validation timeouts"), + 1, + GNUNET_NO); + GNUNET_CRYPTO_hash (&va->publicKey, + sizeof (struct + GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &pid.hashPubKey); + GNUNET_CONTAINER_multihashmap_remove (validation_map, + &pid.hashPubKey, + va); + GNUNET_free (va->transport_name); + GNUNET_free (va); } + /** - * Append the given address to the list of entries - * that need to be validated. + * Check if the given address is already being validated; if not, + * append the given address to the list of entries that are being be + * validated and initiate validation. + * + * @param cls closure ('struct PeriodicValidationContext *') + * @param tname name of the transport + * @param expiration expiration time + * @param addr the address + * @param addrlen length of the address + * @return GNUNET_OK (always) */ static int -run_validation (void *cls, +rerun_validation (void *cls, const char *tname, struct GNUNET_TIME_Absolute expiration, const void *addr, size_t addrlen) { - struct ValidationList *e = cls; - struct TransportPlugin *tp; - struct ValidationAddress *va; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey = cls; struct GNUNET_PeerIdentity id; - struct NeighborList *neighbor; - struct PeerAddressList *peer_address; - int sent; - struct TransportPingMessage *ping; + struct TransportPlugin *tp; + struct ValidationEntry *va; + struct NeighbourList *neighbour; + struct ForeignAddressList *peer_address; + struct TransportPingMessage ping; + /*struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;*/ + struct CheckAddressExistsClosure caec; char * message_buf; - int hello_size; - int tsize; + uint16_t hello_size; + size_t tsize; tp = find_transport (tname); if (tp == NULL) @@ -1967,181 +2164,396 @@ run_validation (void *cls, tname); return GNUNET_OK; } - GNUNET_CRYPTO_hash (&e->publicKey, + + GNUNET_CRYPTO_hash (publicKey, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), &id.hashPubKey); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling validation of address `%s' via `%s' for `%4s'\n", - GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id)); + caec.addr = addr; + caec.addrlen = addrlen; + caec.tname = tname; + caec.exists = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_iterate (validation_map, + &check_address_exists, + &caec); + if (caec.exists == GNUNET_YES) + { + /* During validation attempts we will likely trigger the other + peer trying to validate our address which in turn will cause + it to send us its HELLO, so we expect to hit this case rather + frequently. Only print something if we are very verbose. */ +#if DEBUG_TRANSPORT > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Some validation of address `%s' via `%s' for peer `%4s' already in progress.\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&id)); #endif - va = GNUNET_malloc (sizeof (struct ValidationAddress)); - va->next = e->addresses; - e->addresses = va; + return GNUNET_OK; + } + va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen); va->transport_name = GNUNET_strdup (tname); va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, (unsigned int) -1); va->send_time = GNUNET_TIME_absolute_get(); + va->addr = (const void*) &va[1]; + memcpy (&va[1], addr, addrlen); + va->addrlen = addrlen; + memcpy(&va->publicKey, publicKey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); + va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, + HELLO_VERIFICATION_TIMEOUT, + &timeout_hello_validation, + va); + GNUNET_CONTAINER_multihashmap_put (validation_map, + &id.hashPubKey, + va, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + neighbour = find_neighbour(&id); + if (neighbour == NULL) + neighbour = setup_new_neighbour(&id); + peer_address = add_peer_address(neighbour, tname, addr, addrlen); + GNUNET_assert(peer_address != NULL); + hello_size = GNUNET_HELLO_size(our_hello); + tsize = sizeof(struct TransportPingMessage) + hello_size; + message_buf = GNUNET_malloc(tsize); + ping.challenge = htonl(va->challenge); + ping.header.size = htons(sizeof(struct TransportPingMessage)); + ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING); + memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity)); + memcpy(message_buf, our_hello, hello_size); + memcpy(&message_buf[hello_size], + &ping, + sizeof(struct TransportPingMessage)); +#if DEBUG_TRANSPORT_REVALIDATION + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Performing re-validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&id), + "HELLO", hello_size, + "PING", sizeof (struct TransportPingMessage)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# PING messages sent for re-validation"), + 1, + GNUNET_NO); + transmit_to_peer (NULL, peer_address, + GNUNET_SCHEDULER_PRIORITY_DEFAULT, + HELLO_VERIFICATION_TIMEOUT, + message_buf, tsize, + GNUNET_YES, neighbour); + GNUNET_free(message_buf); + return GNUNET_OK; +} - neighbor = find_neighbor(&id); - if (neighbor == NULL) - neighbor = setup_new_neighbor(&id); +/** + * Send periodic ping messages to a give foreign address. + * + * cls closure, can be safely cast to ForeignAddressList + * tc task context + * + * FIXME: Since a _billion_ pongs are sent for every ping, + * maybe this should be a special message type or something + * that gets discarded on the other side instead of initiating + * a flood. + */ +static void +send_periodic_ping (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeriodicValidationContext *periodic_validation_context = cls; - peer_address = find_peer_address(neighbor, addr, addrlen); - if (peer_address == NULL) + if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) { - peer_address = add_peer_address(neighbor, addr, addrlen); + GNUNET_free(periodic_validation_context->transport); + GNUNET_free(periodic_validation_context); + return; /* We have been shutdown, don't do anything! */ } + rerun_validation(&periodic_validation_context->publicKey, + periodic_validation_context->transport, + periodic_validation_context->foreign_address->expires, + periodic_validation_context->foreign_address->addr, + periodic_validation_context->foreign_address->addrlen); + GNUNET_free(periodic_validation_context->transport); + GNUNET_free(periodic_validation_context); +} - GNUNET_assert(peer_address != NULL); - va->peer_address = peer_address; /* Back pointer FIXME: remove this nonsense! */ - peer_address->validation = va; +/** + * Check if the given address is already being validated; if not, + * append the given address to the list of entries that are being be + * validated and initiate validation. + * + * @param cls closure ('struct CheckHelloValidatedContext *') + * @param tname name of the transport + * @param expiration expiration time + * @param addr the address + * @param addrlen length of the address + * @return GNUNET_OK (always) + */ +static int +run_validation (void *cls, + const char *tname, + struct GNUNET_TIME_Absolute expiration, + const void *addr, size_t addrlen) +{ + struct CheckHelloValidatedContext *chvc = cls; + struct GNUNET_PeerIdentity id; + struct TransportPlugin *tp; + struct ValidationEntry *va; + struct NeighbourList *neighbour; + struct ForeignAddressList *peer_address; + struct TransportPingMessage ping; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk; + struct CheckAddressExistsClosure caec; + char * message_buf; + uint16_t hello_size; + size_t tsize; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# peer addresses scheduled for validation"), + 1, + GNUNET_NO); + tp = find_transport (tname); + if (tp == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO | + GNUNET_ERROR_TYPE_BULK, + _ + ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"), + tname); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# peer addresses not validated (no applicable transport plugin available)"), + 1, + GNUNET_NO); + return GNUNET_OK; + } + GNUNET_HELLO_get_key (chvc->hello, &pk); + GNUNET_CRYPTO_hash (&pk, + sizeof (struct + GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &id.hashPubKey); + caec.addr = addr; + caec.addrlen = addrlen; + caec.tname = tname; + caec.exists = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_iterate (validation_map, + &check_address_exists, + &caec); + if (caec.exists == GNUNET_YES) + { + /* During validation attempts we will likely trigger the other + peer trying to validate our address which in turn will cause + it to send us its HELLO, so we expect to hit this case rather + frequently. Only print something if we are very verbose. */ +#if DEBUG_TRANSPORT > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Validation of address `%s' via `%s' for peer `%4s' already in progress.\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&id)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# peer addresses not validated (in progress)"), + 1, + GNUNET_NO); + return GNUNET_OK; + } + va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen); + va->transport_name = GNUNET_strdup (tname); + va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + (unsigned int) -1); + va->send_time = GNUNET_TIME_absolute_get(); + va->addr = (const void*) &va[1]; + memcpy (&va[1], addr, addrlen); + va->addrlen = addrlen; + GNUNET_HELLO_get_key (chvc->hello, + &va->publicKey); + va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, + HELLO_VERIFICATION_TIMEOUT, + &timeout_hello_validation, + va); + GNUNET_CONTAINER_multihashmap_put (validation_map, + &id.hashPubKey, + va, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + neighbour = find_neighbour(&id); + if (neighbour == NULL) + neighbour = setup_new_neighbour(&id); + peer_address = add_peer_address(neighbour, tname, addr, addrlen); + GNUNET_assert(peer_address != NULL); hello_size = GNUNET_HELLO_size(our_hello); tsize = sizeof(struct TransportPingMessage) + hello_size; - message_buf = GNUNET_malloc(tsize); - - ping = GNUNET_malloc(sizeof(struct TransportPingMessage)); - ping->challenge = htonl(va->challenge); - ping->header.size = htons(sizeof(struct TransportPingMessage)); - ping->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING); - memcpy(&ping->target, &id, sizeof(struct GNUNET_PeerIdentity)); - -#if DEBUG_TRANSPORT - GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "hello size is %d, ping size is %d, total size is %d\n", hello_size, sizeof(struct TransportPingMessage), tsize); -#endif + ping.challenge = htonl(va->challenge); + ping.header.size = htons(sizeof(struct TransportPingMessage)); + ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING); + memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity)); memcpy(message_buf, our_hello, hello_size); - memcpy(&message_buf[hello_size], ping, sizeof(struct TransportPingMessage)); - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending ping message of size %d to address `%s' via `%s' for `%4s'\n", - tsize, GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id)); -#endif - sent = transmit_to_peer(NULL, peer_address, GNUNET_SCHEDULER_PRIORITY_DEFAULT, - message_buf, tsize, GNUNET_NO, neighbor); - + memcpy(&message_buf[hello_size], + &ping, + sizeof(struct TransportPingMessage)); #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport returned %d from send!\n", sent); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Performing validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&id), + "HELLO", hello_size, + "PING", sizeof (struct TransportPingMessage)); #endif - - GNUNET_free(ping); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# PING messages sent for initial validation"), + 1, + GNUNET_NO); + transmit_to_peer (NULL, peer_address, + GNUNET_SCHEDULER_PRIORITY_DEFAULT, + HELLO_VERIFICATION_TIMEOUT, + message_buf, tsize, + GNUNET_YES, neighbour); GNUNET_free(message_buf); return GNUNET_OK; } -#if WHY -/* - * @param cls handle to the plugin (for sending) - * @param target the peer identity of the peer we are sending to - * @param challenge the challenge number - * @param timeout how long to await validation? - * @param addr the address to validate - * @param addrlen the length of the address - * - * Perform address validation, which means sending a PING PONG to - * the address via the transport plugin. If not validated, then - * do not count this as a good peer/address... + +/** + * Add the given address to the list of foreign addresses + * available for the given peer (check for duplicates). * - * Currently this function is not used, ping/pongs get sent from the - * run_validation function. Haven't decided yet how to do this. + * @param cls the respective 'struct NeighbourList' to update + * @param tname name of the transport + * @param expiration expiration time + * @param addr the address + * @param addrlen length of the address + * @return GNUNET_OK (always) */ -static void -validate_address (void *cls, struct ValidationAddress *va, - const struct GNUNET_PeerIdentity *target, - struct GNUNET_TIME_Relative timeout, - const void *addr, size_t addrlen) +static int +add_to_foreign_address_list (void *cls, + const char *tname, + struct GNUNET_TIME_Absolute expiration, + const void *addr, size_t addrlen) { - /* struct Plugin *plugin = cls; - int challenge = va->challenge; */ + struct NeighbourList *n = cls; + struct ForeignAddressList *fal; + int try; - - return; -} + try = GNUNET_NO; + fal = find_peer_address (n, tname, addr, addrlen); + if (fal == NULL) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&n->id), + expiration.value); #endif + fal = add_peer_address (n, tname, addr, addrlen); + try = GNUNET_YES; + } + if (fal == NULL) + return GNUNET_OK; + fal->expires = GNUNET_TIME_absolute_max (expiration, + fal->expires); + fal->validated = GNUNET_YES; + if (try == GNUNET_YES) + try_transmission_to_peer (n); + return GNUNET_OK; +} + /** * Check if addresses in validated hello "h" overlap with - * those in "chvc->hello" and update "chvc->hello" accordingly, - * removing those addresses that have already been validated. + * those in "chvc->hello" and validate the rest. + * + * @param cls closure + * @param peer id of the peer, NULL for last call + * @param h hello message for the peer (can be NULL) + * @param trust amount of trust we have in the peer (not used) */ static void check_hello_validated (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Message *h, uint32_t trust) + const struct GNUNET_HELLO_Message *h, + uint32_t trust) { struct CheckHelloValidatedContext *chvc = cls; - struct ValidationAddress *va; - struct TransportPlugin *tp; - int first_call; - int count; - struct GNUNET_PeerIdentity apeer; + struct GNUNET_HELLO_Message *plain_hello; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk; + struct GNUNET_PeerIdentity target; + struct NeighbourList *n; - first_call = GNUNET_NO; - if (chvc->e == NULL) + if (peer == NULL) { chvc->piter = NULL; - first_call = GNUNET_YES; - chvc->e = GNUNET_malloc (sizeof (struct ValidationList)); - GNUNET_assert (GNUNET_OK == - GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello, - &chvc->e->publicKey)); - chvc->e->timeout = - GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT); - chvc->e->next = pending_validations; - pending_validations = chvc->e; - } - - if (h != NULL) - { - GNUNET_HELLO_iterate_new_addresses (chvc->hello, - h, - GNUNET_TIME_absolute_get (), - &run_validation, chvc->e); - } - else if (GNUNET_YES == first_call) - { - /* no existing HELLO, all addresses are new */ - GNUNET_HELLO_iterate_addresses (chvc->hello, - GNUNET_NO, &run_validation, chvc->e); - } - - if (h != NULL) - return; /* wait for next call */ - /* finally, transmit validation attempts */ - GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_id (chvc->hello, &apeer)); - - va = chvc->e->addresses; - count = 0; - while (va != NULL) - { + GNUNET_CONTAINER_DLL_remove (chvc_head, + chvc_tail, + chvc); + if (GNUNET_NO == chvc->hello_known) + { + /* notify PEERINFO about the peer now, so that we at least + have the public key if some other component needs it */ + GNUNET_HELLO_get_key (chvc->hello, &pk); + GNUNET_CRYPTO_hash (&pk, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &target.hashPubKey); + plain_hello = GNUNET_HELLO_create (&pk, + NULL, + NULL); + GNUNET_PEERINFO_add_peer (cfg, sched, &target, plain_hello); + GNUNET_free (plain_hello); #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Establishing `%s' connection to validate `%s' address `%s' of `%4s'\n", - va->transport_name, - "HELLO", - GNUNET_a2s ((const struct sockaddr *) va->peer_address->addr, - va->peer_address->addrlen), GNUNET_i2s (&apeer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peerinfo had no `%s' message for peer `%4s', full validation needed.\n", + "HELLO", + GNUNET_i2s (&target)); #endif - tp = find_transport (va->transport_name); - GNUNET_assert (tp != NULL); - /* This validation should happen inside the transport, not from the plugin! */ - va->ok = GNUNET_SYSERR; - va = va->next; - count++; - } - - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_absolute_get_remaining (chvc-> - e->timeout), - &cleanup_validation, NULL); - GNUNET_free (chvc); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# new HELLOs requiring full validation"), + 1, + GNUNET_NO); + GNUNET_HELLO_iterate_addresses (chvc->hello, + GNUNET_NO, + &run_validation, + chvc); + } + else + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# duplicate HELLO (peer known)"), + 1, + GNUNET_NO); + } + GNUNET_free (chvc); + return; + } + if (h == NULL) + return; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peerinfo had `%s' message for peer `%4s', validating only new addresses.\n", + "HELLO", + GNUNET_i2s (peer)); +#endif + chvc->hello_known = GNUNET_YES; + n = find_neighbour (peer); + if (n != NULL) + { + GNUNET_HELLO_iterate_addresses (h, + GNUNET_NO, + &add_to_foreign_address_list, + n); + try_transmission_to_peer (n); + } + GNUNET_HELLO_iterate_new_addresses (chvc->hello, + h, + GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME), + &run_validation, + chvc); } - /** * Process HELLO-message. * @@ -2153,7 +2565,6 @@ static int process_hello (struct TransportPlugin *plugin, const struct GNUNET_MessageHeader *message) { - struct ValidationList *e; uint16_t hsize; struct GNUNET_PeerIdentity target; const struct GNUNET_HELLO_Message *hello; @@ -2168,9 +2579,13 @@ process_hello (struct TransportPlugin *plugin, return GNUNET_SYSERR; } /* first, check if load is too high */ - if (GNUNET_OS_load_cpu_get (cfg) > 100) + if (GNUNET_SCHEDULER_get_load (sched, + GNUNET_SCHEDULER_PRIORITY_BACKGROUND) > MAX_HELLO_LOAD) { - /* TODO: call to stats? */ + GNUNET_STATISTICS_update (stats, + gettext_noop ("# HELLOs ignored due to high load"), + 1, + GNUNET_NO); return GNUNET_OK; } hello = (const struct GNUNET_HELLO_Message *) message; @@ -2182,42 +2597,29 @@ process_hello (struct TransportPlugin *plugin, GNUNET_CRYPTO_hash (&publicKey, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), &target.hashPubKey); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processing `%s' message for `%4s' of size %d (hsize is %d)\n", - "HELLO", GNUNET_i2s (&target), GNUNET_HELLO_size(hello), hsize); -#endif - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Notifying peerinfo about peer %s\n", - GNUNET_i2s (&target)); -#endif - - /* check if a HELLO for this peer is already on the validation list */ - e = pending_validations; - while (e != NULL) + if (0 == memcmp (&my_identity, + &target, + sizeof (struct GNUNET_PeerIdentity))) { - if (0 == memcmp (&e->publicKey, - &publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded))) - { - /* TODO: call to stats? */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s' message for peer `%4s' is already pending; ignoring new message\n", - "HELLO", GNUNET_i2s (&target)); -#endif - return GNUNET_OK; - } - e = e->next; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# HELLOs ignored for validation (is my own HELLO)"), + 1, + GNUNET_NO); + return GNUNET_OK; } +#if DEBUG_TRANSPORT > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Processing `%s' message for `%4s' of size %u\n", + "HELLO", + GNUNET_i2s (&target), + GNUNET_HELLO_size(hello)); +#endif chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize); - chvc->plugin = plugin; - chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1]; - chvc->e = NULL; - memcpy (chvc->hello, hello, hsize); + chvc->hello = (const struct GNUNET_HELLO_Message *) &chvc[1]; + memcpy (&chvc[1], hello, hsize); + GNUNET_CONTAINER_DLL_insert (chvc_head, + chvc_tail, + chvc); /* finally, check if HELLO was previously validated (continuation will then schedule actual validation) */ chvc->piter = GNUNET_PEERINFO_iterate (cfg, @@ -2231,40 +2633,27 @@ process_hello (struct TransportPlugin *plugin, /** - * The peer specified by the given neighbor has timed-out or a plugin + * The peer specified by the given neighbour has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins * still up), or trigger a full disconnect and clean up. This * function updates our state and does the necessary notifications. - * Also notifies our clients that the neighbor is now officially + * Also notifies our clients that the neighbour is now officially * gone. * - * @param n the neighbor list entry for the peer + * @param n the neighbour list entry for the peer * @param check should we just check if all plugins * disconnected or must we ask all plugins to * disconnect? */ static void -disconnect_neighbor (struct NeighborList *current_handle, int check) +disconnect_neighbour (struct NeighbourList *n, int check) { struct ReadyList *rpos; - struct NeighborList *npos; - struct NeighborList *nprev; - struct NeighborList *n; + struct NeighbourList *npos; + struct NeighbourList *nprev; struct MessageQueue *mq; - struct PeerAddressList *peer_addresses; - struct PeerAddressList *peer_pos; - - if (neighbors == NULL) - return; /* We don't have any neighbors, so client has an already removed handle! */ - - npos = neighbors; - while ((npos != NULL) && (current_handle != npos)) - npos = npos->next; - - if (npos == NULL) - return; /* Couldn't find neighbor in existing list, must have been already removed! */ - else - n = npos; + struct ForeignAddressList *peer_addresses; + struct ForeignAddressList *peer_pos; if (GNUNET_YES == check) { @@ -2281,14 +2670,14 @@ disconnect_neighbor (struct NeighborList *current_handle, int check) rpos = rpos->next; } } - #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id)); + "Disconnecting from `%4s'\n", + GNUNET_i2s (&n->id)); #endif - /* remove n from neighbors list */ + /* remove n from neighbours list */ nprev = NULL; - npos = neighbors; + npos = neighbours; while ((npos != NULL) && (npos != n)) { nprev = npos; @@ -2296,72 +2685,85 @@ disconnect_neighbor (struct NeighborList *current_handle, int check) } GNUNET_assert (npos != NULL); if (nprev == NULL) - neighbors = n->next; + neighbours = n->next; else nprev->next = n->next; /* notify all clients about disconnect */ - notify_clients_disconnect (&n->id); + if (GNUNET_YES == n->received_pong) + notify_clients_disconnect (&n->id); /* clean up all plugins, cancel connections and pending transmissions */ while (NULL != (rpos = n->plugins)) { n->plugins = rpos->next; - GNUNET_assert (rpos->neighbor == n); - if (GNUNET_YES == rpos->connected) - rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id); + rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id); while (rpos->addresses != NULL) { peer_pos = rpos->addresses; rpos->addresses = peer_pos->next; - GNUNET_free(peer_pos->addr); GNUNET_free(peer_pos); } GNUNET_free (rpos); } /* free all messages on the queue */ - while (NULL != (mq = n->messages)) - { - n->messages = mq->next; - GNUNET_assert (0 == memcmp(mq->neighbor_id, &n->id, sizeof(struct GNUNET_PeerIdentity))); - GNUNET_free (mq->neighbor_id); + while (NULL != (mq = n->messages_head)) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes in message queue for other peers"), + -mq->message_buf_size, + GNUNET_NO); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes discarded due to disconnect"), + mq->message_buf_size, + GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (n->messages_head, + n->messages_tail, + mq); + GNUNET_assert (0 == memcmp(&mq->neighbour_id, + &n->id, + sizeof(struct GNUNET_PeerIdentity))); GNUNET_free (mq); } if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + { + GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, n->retry_task); + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + } /* finally, free n itself */ GNUNET_free (n); } -/* +/** * We have received a PING message from someone. Need to send a PONG message - * in response to the peer by any means necessary. Of course, with something - * like TCP where a connection exists, we may want to send it that way. But - * we may not be able to make that distinction... - */ -static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const char *sender_address, - size_t sender_address_len) + * in response to the peer by any means necessary. + * + * FIXME: With something like TCP where a connection exists, we may + * want to send it that way. But the current API does not seem to + * allow us to do so (can't tell this to the transport!) + */ +static int +handle_ping(void *cls, const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *sender_address, + size_t sender_address_len) { struct TransportPlugin *plugin = cls; struct TransportPingMessage *ping; struct TransportPongMessage *pong; - struct PeerAddressList *peer_address; - uint16_t msize; - struct NeighborList *n; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Processing `%s' from `%s'\n", - "PING", GNUNET_a2s ((const struct sockaddr *)sender_address, sender_address_len)); -#endif + struct NeighbourList *n; + struct ReadyList *rl; + struct ForeignAddressList *fal; - msize = ntohs (message->size); - if (msize < sizeof (struct TransportPingMessage)) + if (ntohs (message->size) != sizeof (struct TransportPingMessage)) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -2372,12 +2774,21 @@ static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message, sizeof (struct GNUNET_PeerIdentity))) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Received `%s' message not destined for me!\n"), "PING"); + _("Received `%s' message not destined for me!\n"), + "PING"); return GNUNET_SYSERR; } - - msize -= sizeof (struct TransportPingMessage); - +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Processing `%s' from `%s'\n", + "PING", + GNUNET_a2s ((const struct sockaddr *)sender_address, + sender_address_len)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# PING messages received"), + 1, + GNUNET_NO); pong = GNUNET_malloc (sizeof (struct TransportPongMessage) + sender_address_len); pong->header.size = htons (sizeof (struct TransportPongMessage) + sender_address_len); pong->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_PONG); @@ -2388,33 +2799,40 @@ static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message, pong->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_TCP_PING); pong->challenge = ping->challenge; pong->addrlen = htons(sender_address_len); - - memcpy(&pong->signer, &my_public_key, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); + memcpy(&pong->signer, + &my_public_key, + sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); memcpy (&pong[1], sender_address, sender_address_len); GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_rsa_sign (my_private_key, &pong->purpose, &pong->signature)); - n = find_neighbor(peer); + n = find_neighbour(peer); if (n == NULL) - n = setup_new_neighbor(peer); - - peer_address = find_peer_address(n, sender_address, sender_address_len); - if (peer_address == NULL) - peer_address = add_peer_address(n, sender_address, sender_address_len); - - peer_address->timeout = GNUNET_TIME_relative_to_absolute(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - - /* We don't use the peer_address because the address we received the message from may not - * be a reliable way to send it back! We add it to the list which should queue up a separate - * ping to determine if the address is viable. - */ - transmit_to_peer(NULL, NULL, TRANSPORT_DEFAULT_PRIORITY, (char *)pong, ntohs(pong->header.size), GNUNET_NO, n); - + n = setup_new_neighbour(peer); + /* broadcast 'PONG' to all available addresses */ + rl = n->plugins; + while (rl != NULL) + { + fal = rl->addresses; + while (fal != NULL) + { + transmit_to_peer(NULL, fal, + TRANSPORT_PONG_PRIORITY, + HELLO_VERIFICATION_TIMEOUT, + (const char *)pong, + ntohs(pong->header.size), + GNUNET_YES, + n); + fal = fal->next; + } + rl = rl->next; + } GNUNET_free(pong); return GNUNET_OK; } + /** * Function called by the plugin for each received message. * Update data volumes, possibly notify plugins about @@ -2422,18 +2840,16 @@ static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message, * and generally forward to our receive callback. * * @param cls the "struct TransportPlugin *" we gave to the plugin - * @param message the message, NULL if peer was disconnected - * @param distance the transport cost to this peer (not latency!) - * @param sender_address the address that the sender reported - * (opaque to transport service) - * @param sender_address_len the length of the sender address * @param peer (claimed) identity of the other peer - * @return the new service_context that the plugin should use - * for future receive calls for messages from this - * particular peer - * - */ -static void + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again + * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param sender_address binary address of the sender (if observed) + * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) + */ +static struct GNUNET_TIME_Relative plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, unsigned int distance, const char *sender_address, @@ -2443,136 +2859,128 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, struct TransportPlugin *plugin = cls; struct TransportClient *cpos; struct InboundMessage *im; - struct PeerAddressList *peer_address; + struct ForeignAddressList *peer_address; uint16_t msize; - struct NeighborList *n; + struct NeighbourList *n; + struct GNUNET_TIME_Relative ret; - n = find_neighbor (peer); + n = find_neighbour (peer); if (n == NULL) - { - if (message == NULL) - return; /* disconnect of peer already marked down */ - n = setup_new_neighbor (peer); - - } - - peer_address = find_peer_address(n, sender_address, sender_address_len); - if (peer_address == NULL) - peer_address = add_peer_address(n, sender_address, sender_address_len); - + n = setup_new_neighbour (peer); service_context = n->plugins; while ((service_context != NULL) && (plugin != service_context->plugin)) service_context = service_context->next; GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); - if (message == NULL) - { + if (message != NULL) + { + peer_address = add_peer_address(n, + plugin->short_name, + sender_address, + sender_address_len); + if (peer_address != NULL) + { + peer_address->distance = distance; + if (peer_address->connected == GNUNET_NO) + { + peer_address->connected = GNUNET_YES; + peer_address->connect_attempts++; + } + peer_address->timeout + = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + } + /* update traffic received amount ... */ + msize = ntohs (message->size); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bytes received from other peers"), + msize, + GNUNET_NO); + n->distance = distance; + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_SCHEDULER_cancel (sched, + n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + /* dropping message due to frequent inbound volume violations! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING | + GNUNET_ERROR_TYPE_BULK, + _ + ("Dropping incoming message due to repeated bandwidth quota (%u b/s) violations (total of %u).\n"), + n->in_tracker.available_bytes_per_s__, + n->quota_violation_count); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# bandwidth quota violations by other peers"), + 1, + GNUNET_NO); + return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */ + } + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_HELLO: + GNUNET_STATISTICS_update (stats, + gettext_noop ("# HELLO messages received from other peers"), + 1, + GNUNET_NO); + process_hello (plugin, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: + handle_ping(plugin, message, peer, sender_address, sender_address_len); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: + handle_pong(plugin, message, peer, sender_address, sender_address_len); + break; + default: #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Receive failed from `%4s', triggering disconnect\n", - GNUNET_i2s (&n->id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s', sending to all clients.\n", + ntohs (message->type), GNUNET_i2s (peer)); #endif - /* TODO: call stats */ - if (service_context != NULL) - service_context->connected = GNUNET_NO; - disconnect_neighbor (n, GNUNET_YES); - return; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Processing message of type `%u' received by plugin...\n", - ntohs (message->type)); -#endif - if (service_context != NULL) - { - if (service_context->connected == GNUNET_NO) - { - /*service_context->connected = GNUNET_YES;*/ - /* FIXME: What to do here? Should we use these as well, to specify some Address - * in the AddressList should be available? - */ - peer_address->transmit_ready = GNUNET_YES; - peer_address->connect_attempts++; - } - peer_address->timeout - = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - } - /* update traffic received amount ... */ - msize = ntohs (message->size); - n->last_received += msize; - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); - n->peer_timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbor_timeout_task, n); - update_quota (n); - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - /* dropping message due to frequent inbound volume violations! */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING | - GNUNET_ERROR_TYPE_BULK, - _ - ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count); - /* TODO: call stats */ - GNUNET_assert ((service_context == NULL) || - (NULL != service_context->neighbor)); - - return; - } - switch (ntohs (message->type)) + if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, + msize)) + n->quota_violation_count++; + else + n->quota_violation_count = 0; /* back within limits */ + GNUNET_STATISTICS_update (stats, + gettext_noop ("# payload received from other peers"), + msize, + GNUNET_NO); + /* transmit message to all clients */ + im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); + im->header.size = htons (sizeof (struct InboundMessage) + msize); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->latency = GNUNET_TIME_relative_hton (n->latency); + im->peer = *peer; + memcpy (&im[1], message, msize); + cpos = clients; + while (cpos != NULL) + { + transmit_to_client (cpos, &im->header, GNUNET_YES); + cpos = cpos->next; + } + GNUNET_free (im); + } + } + ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0); + if (ret.value > 0) { - case GNUNET_MESSAGE_TYPE_HELLO: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving `%s' message from `%4s'.\n", "HELLO", - GNUNET_i2s (peer)); -#endif - process_hello (plugin, message); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving `%s' message from `%4s'.\n", "PING", - GNUNET_i2s (peer)); -#endif - handle_ping(plugin, message, peer, sender_address, sender_address_len); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving `%s' message from `%4s'.\n", "PONG", - GNUNET_i2s (peer)); -#endif - handle_pong(plugin, message, peer, sender_address, sender_address_len); - break; - default: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received REAL MESSAGE type %u from `%4s', sending to all clients.\n", - ntohs (message->type), GNUNET_i2s (peer)); -#endif - /* transmit message to all clients */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); - im->header.size = htons (sizeof (struct InboundMessage) + msize); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = n->latency; - im->peer = *peer; - memcpy (&im[1], message, msize); - - cpos = clients; - while (cpos != NULL) - { - transmit_to_client (cpos, &im->header, GNUNET_YES); - cpos = cpos->next; - } - GNUNET_free (im); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Throttling read (%llu bytes excess at %u b/s), waiting %llums before reading more.\n", + (unsigned long long) n->in_tracker.consumption_since_last_update__, + (unsigned int) n->in_tracker.available_bytes_per_s__, + (unsigned long long) ret.value); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# ms throttling suggested"), + (int64_t) ret.value, + GNUNET_NO); } - GNUNET_assert ((service_context == NULL) || - (NULL != service_context->neighbor)); + return ret; } @@ -2591,9 +2999,7 @@ handle_start (void *cls, { struct TransportClient *c; struct ConnectInfoMessage cim; - struct NeighborList *n; - struct InboundMessage *im; - struct GNUNET_MessageHeader *ack; + struct NeighbourList *n; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2627,33 +3033,18 @@ handle_start (void *cls, /* tell new client about all existing connections */ cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = - htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); - cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.size = htons (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ - ack = (struct GNUNET_MessageHeader *) &im[1]; - ack->size = htons (sizeof (struct GNUNET_MessageHeader)); - ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK); - for (n = neighbors; n != NULL; n = n->next) - { - cim.id = n->id; - transmit_to_client (c, &cim.header, GNUNET_NO); - if (n->received_pong) - { - im->peer = n->id; - transmit_to_client (c, &im->header, GNUNET_NO); + n = neighbours; + while (n != NULL) + { + if (GNUNET_YES == n->received_pong) + { + cim.id = n->id; + cim.latency = GNUNET_TIME_relative_hton (n->latency); + cim.distance = htonl (n->distance); + transmit_to_client (c, &cim.header, GNUNET_NO); } + n = n->next; } - GNUNET_free (im); - } - else - { - fprintf(stderr, "Our hello is NULL!\n"); } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2673,10 +3064,10 @@ handle_hello (void *cls, { int ret; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client\n", "HELLO"); -#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# HELLOs received from clients"), + 1, + GNUNET_NO); ret = process_hello (NULL, message); GNUNET_SERVER_receive_done (client, ret); } @@ -2695,7 +3086,7 @@ handle_send (void *cls, const struct GNUNET_MessageHeader *message) { struct TransportClient *tc; - struct NeighborList *n; + struct NeighbourList *n; const struct OutboundMessage *obm; const struct GNUNET_MessageHeader *obmm; uint16_t size; @@ -2709,6 +3100,10 @@ handle_send (void *cls, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } + GNUNET_STATISTICS_update (stats, + gettext_noop ("# payload received for other peers"), + size, + GNUNET_NO); obm = (const struct OutboundMessage *) message; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2723,9 +3118,9 @@ handle_send (void *cls, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - n = find_neighbor (&obm->peer); + n = find_neighbour (&obm->peer); if (n == NULL) - n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */ + n = setup_new_neighbour (&obm->peer); tc = clients; while ((tc != NULL) && (tc->client != client)) tc = tc->next; @@ -2736,7 +3131,10 @@ handle_send (void *cls, ntohs (obmm->size), ntohs (obmm->type), GNUNET_i2s (&obm->peer)); #endif - transmit_to_peer (tc, NULL, ntohl (obm->priority), (char *)obmm, ntohs (obmm->size), GNUNET_NO, n); + transmit_to_peer (tc, NULL, ntohl (obm->priority), + GNUNET_TIME_relative_ntoh (obm->timeout), + (char *)obmm, + ntohs (obmm->size), GNUNET_NO, n); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2755,78 +3153,38 @@ handle_set_quota (void *cls, { const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; - struct NeighborList *n; - struct TransportPlugin *p; - struct ReadyList *rl; - - n = find_neighbor (&qsm->peer); + struct NeighbourList *n; + + GNUNET_STATISTICS_update (stats, + gettext_noop ("# SET QUOTA messages received"), + 1, + GNUNET_NO); + n = find_neighbour (&qsm->peer); if (n == NULL) { GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_STATISTICS_update (stats, + gettext_noop ("# SET QUOTA messages ignored (no such peer)"), + 1, + GNUNET_NO); return; } - #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n", - "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer)); + "SET_QUOTA", + (unsigned int) ntohl (qsm->quota.value__), + (unsigned int) n->in_tracker.available_bytes_per_s__, + GNUNET_i2s (&qsm->peer)); #endif - - update_quota (n); - if (n->quota_in < ntohl (qsm->quota_in)) - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_in = ntohl (qsm->quota_in); - rl = n->plugins; - while (rl != NULL) - { - p = rl->plugin; - p->api->set_receive_quota (p->api->cls, - &qsm->peer, ntohl (qsm->quota_in)); - rl = rl->next; - } + GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, + qsm->quota); + if (0 == ntohl (qsm->quota.value__)) + disconnect_neighbour (n, GNUNET_NO); GNUNET_SERVER_receive_done (client, GNUNET_OK); } -/** - * Handle TRY_CONNECT-message. - * - * @param cls closure (always NULL) - * @param client identification of the client - * @param message the actual message - */ -static void -handle_try_connect (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct TryConnectMessage *tcm; - struct NeighborList *neighbor; - tcm = (const struct TryConnectMessage *) message; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client %p asking to connect to `%4s'\n", - "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer)); -#endif - neighbor = find_neighbor(&tcm->peer); - - if (neighbor == NULL) - setup_new_neighbor (&tcm->peer); - else - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client asked to connect to `%4s', but connection already exists\n", - "TRY_CONNECT", GNUNET_i2s (&tcm->peer)); -#endif - transmit_to_peer (NULL, NULL, 0, - (const char *) our_hello, GNUNET_HELLO_size(our_hello), - GNUNET_YES, neighbor); - notify_clients_connect (&tcm->peer, GNUNET_TIME_UNIT_FOREVER_REL); - } - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - static void transmit_address_to_client (void *cls, const char *address) { @@ -2843,6 +3201,7 @@ transmit_address_to_client (void *cls, const char *address) GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); } + /** * Handle AddressLookup-message. * @@ -2919,9 +3278,6 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0}, {&handle_set_quota, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)}, - {&handle_try_connect, NULL, - GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT, - sizeof (struct TryConnectMessage)}, {&handle_address_lookup, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP, 0}, @@ -2941,9 +3297,8 @@ create_environment (struct TransportPlugin *plug) plug->env.cls = plug; plug->env.receive = &plugin_env_receive; plug->env.notify_address = &plugin_env_notify_address; - plug->env.default_quota_in = - (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); plug->env.max_connections = max_connect_per_transport; + plug->env.stats = stats; } @@ -3010,10 +3365,12 @@ client_disconnect_notification (void *cls, return; while (NULL != (mqe = pos->message_queue_head)) { - pos->message_queue_head = mqe->next; + GNUNET_CONTAINER_DLL_remove (pos->message_queue_head, + pos->message_queue_tail, + mqe); + pos->message_count--; GNUNET_free (mqe); } - pos->message_queue_head = NULL; if (prev == NULL) clients = pos->next; else @@ -3023,22 +3380,54 @@ client_disconnect_notification (void *cls, pos->client = NULL; return; } + if (pos->th != NULL) + { + GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); + pos->th = NULL; + } + GNUNET_break (0 == pos->message_count); GNUNET_free (pos); } /** - * Function called when the service shuts down. Unloads our plugins. + * Iterator to free entries in the validation_map. + * + * @param cls closure (unused) + * @param key current key code + * @param value value in the hash map (validation to abort) + * @return GNUNET_YES (always) + */ +static int +abort_validation (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct ValidationEntry *va = value; + + GNUNET_SCHEDULER_cancel (sched, va->timeout_task); + GNUNET_free (va->transport_name); + GNUNET_free (va); + return GNUNET_YES; +} + + +/** + * Function called when the service shuts down. Unloads our plugins + * and cancels pending validations. * * @param cls closure, unused * @param tc task context (unused) */ static void -unload_plugins (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct TransportPlugin *plug; - struct AddressList *al; + struct OwnAddressList *al; + struct CheckHelloValidatedContext *chvc; + while (neighbours != NULL) + disconnect_neighbour (neighbours, GNUNET_NO); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport service is unloading plugins...\n"); @@ -3059,6 +3448,26 @@ unload_plugins (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if (my_private_key != NULL) GNUNET_CRYPTO_rsa_key_free (my_private_key); GNUNET_free_non_null (our_hello); + + /* free 'chvc' data structure */ + while (NULL != (chvc = chvc_head)) + { + chvc_head = chvc->next; + GNUNET_PEERINFO_iterate_cancel (chvc->piter); + GNUNET_free (chvc); + } + chvc_tail = NULL; + + GNUNET_CONTAINER_multihashmap_iterate (validation_map, + &abort_validation, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (validation_map); + validation_map = NULL; + if (stats != NULL) + { + GNUNET_STATISTICS_destroy (stats, GNUNET_NO); + stats = NULL; + } } @@ -3084,6 +3493,8 @@ run (void *cls, sched = s; cfg = c; + stats = GNUNET_STATISTICS_create (sched, "transport", cfg); + validation_map = GNUNET_CONTAINER_multihashmap_create (64); /* parse configuration */ if ((GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, @@ -3099,6 +3510,13 @@ run (void *cls, _ ("Transport service is lacking key configuration settings. Exiting.\n")); GNUNET_SCHEDULER_shutdown (s); + if (stats != NULL) + { + GNUNET_STATISTICS_destroy (stats, GNUNET_NO); + stats = NULL; + } + GNUNET_CONTAINER_multihashmap_destroy (validation_map); + validation_map = NULL; return; } max_connect_per_transport = (uint32_t) tneigh; @@ -3110,6 +3528,13 @@ run (void *cls, _ ("Transport service could not access hostkey. Exiting.\n")); GNUNET_SCHEDULER_shutdown (s); + if (stats != NULL) + { + GNUNET_STATISTICS_destroy (stats, GNUNET_NO); + stats = NULL; + } + GNUNET_CONTAINER_multihashmap_destroy (validation_map); + validation_map = NULL; return; } GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key); @@ -3138,7 +3563,7 @@ run (void *cls, } GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_UNIT_FOREVER_REL, - &unload_plugins, NULL); + &shutdown_task, NULL); if (no_transports) refresh_hello ();