X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fgnunet-service-transport.c;h=d654fcc8cfc89c55cd5fe4c60f31f11d682b3d7b;hb=9d2cd5be986a0732b9e5a8fcdf5acd1c7225ef2f;hp=ca1d376da494b234a28799016c5ad7fd45727256;hpb=45f0267ad9a0e19c0445e0a2e228aab438bf517f;p=oweals%2Fgnunet.git diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index ca1d376da..d654fcc8c 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -23,12 +23,15 @@ * @brief low-level P2P messaging * @author Christian Grothoff * - * TODO: - * - if we do not receive an ACK in response to our - * HELLO, retransmit HELLO! + * NOTE: + * - This code uses 'GNUNET_a2s' for debug printing in many places, + * which is technically wrong since it assumes we have IP+Port + * (v4/v6) addresses. Once we add transports like http or smtp + * this will have to be changed! */ #include "platform.h" #include "gnunet_client_lib.h" +#include "gnunet_container_lib.h" #include "gnunet_constants.h" #include "gnunet_getopt_lib.h" #include "gnunet_hello_lib.h" @@ -41,6 +44,12 @@ #include "plugin_transport.h" #include "transport.h" +/** + * Should we do some additional checks (to validate behavior + * of clients)? + */ +#define EXTRA_CHECKS GNUNET_YES + /** * How many messages can we have pending for a given client process * before we start to drop incoming messages? We typically should @@ -59,6 +68,13 @@ */ #define MAX_CONNECT_RETRY 3 +/** + * Limit on the number of ready-to-run tasks when validating + * HELLOs. If more tasks are ready to run, we will drop + * HELLOs instead of validating them. + */ +#define MAX_HELLO_LOAD 4 + /** * How often must a peer violate bandwidth quotas before we start * to simply drop its messages? @@ -73,14 +89,17 @@ * Besides, if a single request to an address takes a long time, * then the peer is unlikely worthwhile anyway. */ -#define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3) +#define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) /** * How long will we allow sending of a ping to be delayed? */ #define TRANSPORT_DEFAULT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15) -#define TRANSPORT_DEFAULT_PRIORITY 4 /* Tired of remembering arbitrary priority names */ +/** + * Priority to use for PONG messages. + */ +#define TRANSPORT_PONG_PRIORITY 4 /** * How often do we re-add (cheaper) plugins to our list of plugins @@ -89,29 +108,120 @@ #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15) /** - * After how long do we expire an address in a HELLO - * that we just validated? This value is also used - * for our own addresses when we create a HELLO. + * After how long do we expire an address in a HELLO that we just + * validated? This value is also used for our own addresses when we + * create a HELLO. */ #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12) /** - * Entry in linked list of network addresses. + * How long before an existing address expires should we again try to + * validate it? Must be (significantly) smaller than + * HELLO_ADDRESS_EXPIRATION. + */ +#define HELLO_REVALIDATION_START_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 1) + + +/** + * List of addresses of other peers + */ +struct ForeignAddressList +{ + /** + * This is a linked list. + */ + struct ForeignAddressList *next; + + /** + * Which ready list does this entry belong to. + */ + struct ReadyList *ready_list; + + /** + * How long until we auto-expire this address (unless it is + * re-confirmed by the transport)? + */ + struct GNUNET_TIME_Absolute expires; + + /** + * Length of addr. + */ + size_t addrlen; + + /** + * The address. + */ + const void *addr; + + /** + * What was the last latency observed for this plugin + * and peer? Invalid if connected is GNUNET_NO. + */ + struct GNUNET_TIME_Relative latency; + + /** + * If we did not successfully transmit a message to the given peer + * via this connection during the specified time, we should consider + * the connection to be dead. This is used in the case that a TCP + * transport simply stalls writing to the stream but does not + * formerly get a signal that the other peer died. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Are we currently connected via this address? The first time we + * successfully transmit or receive data to a peer via a particular + * address, we set this to GNUNET_YES. If we later get an error + * (disconnect notification, transmission failure, timeout), we set + * it back to GNUNET_NO. + */ + int connected; + + /** + * Is this plugin currently busy transmitting to the specific target? + * GNUNET_NO if not (initial, default state is GNUNET_NO). Internal + * messages do not count as 'in transmit'. + */ + int in_transmit; + + /** + * Has this address been validated yet? + */ + int validated; + + /** + * How often have we tried to connect using this plugin? Used to + * discriminate against addresses that do not work well. + * FIXME: not yet used, but should be! + */ + unsigned int connect_attempts; + + /** + * DV distance to this peer (1 if no DV is used). + * FIXME: need to set this from transport plugins! + */ + uint32_t distance; + +}; + + +/** + * Entry in linked list of network addresses for ourselves. */ -struct AddressList +struct OwnAddressList { /** * This is a linked list. */ - struct AddressList *next; + struct OwnAddressList *next; /** * The address, actually a pointer to the end * of this struct. Do not free! */ - void *addr; - + const void *addr; + /** * How long until we auto-expire this address (unless it is * re-confirmed by the transport)? @@ -156,7 +266,7 @@ struct TransportPlugin /** * List of our known addresses for this transport. */ - struct AddressList *addresses; + struct OwnAddressList *addresses; /** * Environment this transport service is using @@ -169,7 +279,6 @@ struct TransportPlugin */ GNUNET_SCHEDULER_TaskIdentifier address_update_task; - /** * Set to GNUNET_YES if we need to scrap the existing * list of "addresses" and start fresh when we receive @@ -178,38 +287,55 @@ struct TransportPlugin * to the list and wait for the commit call. */ int rebuild; + }; -struct NeighborList; +struct NeighbourList; /** - * For each neighbor we keep a list of messages - * that we still want to transmit to the neighbor. + * For each neighbour we keep a list of messages + * that we still want to transmit to the neighbour. */ struct MessageQueue { /** - * This is a linked list. + * This is a doubly linked list. */ struct MessageQueue *next; /** - * The message we want to transmit. + * This is a doubly linked list. + */ + struct MessageQueue *prev; + + /** + * The message(s) we want to transmit, GNUNET_MessageHeader(s) + * stuck together in memory. Allocated at the end of this struct. + */ + const char *message_buf; + + /** + * Size of the message buf */ - struct GNUNET_MessageHeader *message; + size_t message_buf_size; /** * Client responsible for queueing the message; - * used to check that a client has not two messages + * used to check that a client has no two messages * pending for the same target. Can be NULL. */ struct TransportClient *client; /** - * Neighbor this entry belongs to. + * Using which specific address should we send this message? + */ + struct ForeignAddressList *specific_address; + + /** + * Peer ID of the Neighbour this entry belongs to. */ - struct NeighborList *neighbor; + struct GNUNET_PeerIdentity neighbour_id; /** * Plugin that we used for the transmission. @@ -217,6 +343,11 @@ struct MessageQueue */ struct TransportPlugin *plugin; + /** + * At what time should we fail? + */ + struct GNUNET_TIME_Absolute timeout; + /** * Internal message of the transport system that should not be * included in the usual SEND-SEND_OK transmission confirmation @@ -235,12 +366,11 @@ struct MessageQueue /** - * For a given Neighbor, which plugins are available + * For a given Neighbour, which plugins are available * to talk to this peer and what are their costs? */ struct ReadyList { - /** * This is a linked list. */ @@ -253,65 +383,24 @@ struct ReadyList struct TransportPlugin *plugin; /** - * Neighbor this entry belongs to. - */ - struct NeighborList *neighbor; - - /** - * What was the last latency observed for this plugin - * and peer? Invalid if connected is GNUNET_NO. - */ - struct GNUNET_TIME_Relative latency; - - /** - * If we did not successfully transmit a message to the given peer - * via this connection during the specified time, we should consider - * the connection to be dead. This is used in the case that a TCP - * transport simply stalls writing to the stream but does not - * formerly get a signal that the other peer died. - */ - struct GNUNET_TIME_Absolute timeout; - - /** - * Is this plugin currently connected? The first time - * we transmit or send data to a peer via a particular - * plugin, we set this to GNUNET_YES. If we later get - * an error (disconnect notification or transmission - * failure), we set it back to GNUNET_NO. Each time the - * value is set to GNUNET_YES, we increment the - * "connect_attempts" counter. If that one reaches a - * particular threshold, we consider the plugin to not - * be working properly at this time for the given peer - * and remove it from the eligible list. - */ - int connected; - - /** - * How often have we tried to connect using this plugin? - */ - unsigned int connect_attempts; - - /** - * Is this plugin ready to transmit to the specific target? - * GNUNET_NO if not. Initially, all plugins are marked ready. If a - * transmission is in progress, "transmit_ready" is set to - * GNUNET_NO. + * Transport addresses, latency, and readiness for + * this particular plugin. */ - int transmit_ready; + struct ForeignAddressList *addresses; }; /** - * Entry in linked list of all of our current neighbors. + * Entry in linked list of all of our current neighbours. */ -struct NeighborList +struct NeighbourList { /** * This is a linked list. */ - struct NeighborList *next; + struct NeighbourList *next; /** * Which of our transports is connected to this peer @@ -320,25 +409,21 @@ struct NeighborList struct ReadyList *plugins; /** - * List of messages we would like to send to this peer; + * Head of list of messages we would like to send to this peer; * must contain at most one message per client. */ - struct MessageQueue *messages; + struct MessageQueue *messages_head; /** - * Identity of this neighbor. - */ - struct GNUNET_PeerIdentity id; - - /* - * Opaque addr of this peer, only known to the plugin + * Tail of list of messages we would like to send to this peer; must + * contain at most one message per client. */ - char *addr; + struct MessageQueue *messages_tail; - /* - * Size of addr + /** + * Identity of this neighbour. */ - size_t addr_len; + struct GNUNET_PeerIdentity id; /** * ID of task scheduled to run when this peer is about to @@ -346,6 +431,12 @@ struct NeighborList */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** + * ID of task scheduled to run when we should retry transmitting + * the head of the message queue. + */ + GNUNET_SCHEDULER_TaskIdentifier retry_task; + /** * How long until we should consider this peer dead * (if we don't receive another message in the @@ -359,10 +450,22 @@ struct NeighborList struct GNUNET_TIME_Absolute last_quota_update; /** - * At what time should we try to again add plugins to - * our ready list? + * The latency we have seen for this particular address for + * this particular peer. This latency may have been calculated + * over multiple transports. This value reflects how long it took + * us to receive a response when SENDING via this particular + * transport/neighbour/address combination! + * + * FIXME: we need to periodically send PINGs to update this + * latency (at least more often than the current "huge" (11h?) + * update interval). + */ + struct GNUNET_TIME_Relative latency; + + /** + * DV distance to this peer (1 if no DV is used). */ - struct GNUNET_TIME_Absolute retry_plugins_time; + uint32_t distance; /** * How many bytes have we received since the "last_quota_update" @@ -371,7 +474,7 @@ struct NeighborList uint64_t last_received; /** - * Global quota for inbound traffic for the neighbor in bytes/ms. + * Global quota for inbound traffic for the neighbour in bytes/ms. */ uint32_t quota_in; @@ -384,19 +487,10 @@ struct NeighborList unsigned int quota_violation_count; /** - * Have we seen an ACK from this neighbor in the past? - * (used to make up a fake ACK for clients connecting after - * the neighbor connected to us). - */ - int saw_ack; - - /* The latency we have seen for this particular address for - * this particular peer. This latency may have been calculated - * over multiple transports. This value reflects how long it took - * us to receive a response when SENDING via this particular - * transport/neighbor/address combination! + * Have we seen an PONG from this neighbour in the past (and + * not had a disconnect since)? */ - struct GNUNET_TIME_RelativeNBO latency; + int received_pong; }; @@ -472,19 +566,29 @@ struct TransportPongMessage */ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded signer; + /** + * Size of address appended to this message + */ + size_t addrlen; + }; + /** - * Linked list of messages to be transmitted to - * the client. Each entry is followed by the - * actual message. + * Linked list of messages to be transmitted to the client. Each + * entry is followed by the actual message. */ struct ClientMessageQueueEntry { /** - * This is a linked list. + * This is a doubly-linked list. */ struct ClientMessageQueueEntry *next; + + /** + * This is a doubly-linked list. + */ + struct ClientMessageQueueEntry *prev; }; @@ -516,6 +620,11 @@ struct TransportClient */ struct ClientMessageQueueEntry *message_queue_tail; + /** + * Current transmit request handle. + */ + struct GNUNET_CONNECTION_TransmitHandle *th; + /** * Is a call to "transmit_send_continuation" pending? If so, we * must not free this struct (even if the corresponding client @@ -533,15 +642,16 @@ struct TransportClient /** - * For each HELLO, we may have to validate multiple addresses; - * each address gets its own request entry. + * Entry in map of all HELLOs awaiting validation. */ -struct ValidationAddress +struct ValidationEntry { + /** - * This is a linked list. + * The address, actually a pointer to the end + * of this struct. Do not free! */ - struct ValidationAddress *next; + const void *addr; /** * Name of the transport. @@ -549,74 +659,78 @@ struct ValidationAddress char *transport_name; /** - * When should this validated address expire? + * The public key of the peer. */ - struct GNUNET_TIME_Absolute expiration; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; /** - * Length of the address we are validating. + * ID of task that will clean up this entry if we don't succeed + * with the validation first. */ - size_t addr_len; + GNUNET_SCHEDULER_TaskIdentifier timeout_task; /** - * Challenge number we used. + * At what time did we send this validation? */ - uint32_t challenge; + struct GNUNET_TIME_Absolute send_time; + + /** + * Length of addr. + */ + size_t addrlen; /** - * Set to GNUNET_YES if the challenge was met, - * GNUNET_SYSERR if we know it failed, GNUNET_NO - * if we are waiting on a response. + * Challenge number we used. */ - int ok; + uint32_t challenge; + }; /** - * Entry in linked list of all HELLOs awaiting validation. + * Context of currently active requests to peerinfo + * for validation of HELLOs. */ -struct ValidationList +struct CheckHelloValidatedContext { /** - * This is a linked list. + * This is a doubly-linked list. */ - struct ValidationList *next; + struct CheckHelloValidatedContext *next; /** - * Linked list with one entry per address from the HELLO - * that needs to be validated. + * This is a doubly-linked list. */ - struct ValidationAddress *addresses; + struct CheckHelloValidatedContext *prev; /** - * The public key of the peer. + * Hello that we are validating. */ - struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; + const struct GNUNET_HELLO_Message *hello; /** - * When does this record time-out? (assuming the - * challenge goes unanswered) + * Context for peerinfo iteration. + * NULL after we are done processing peerinfo's information. */ - struct GNUNET_TIME_Absolute timeout; + struct GNUNET_PEERINFO_IteratorContext *piter; + + /** + * Was a HELLO known for this peer to peerinfo? + */ + int hello_known; }; -/** - * HELLOs awaiting validation. - */ -static struct ValidationList *pending_validations; - /** * Our HELLO message. */ static struct GNUNET_HELLO_Message *our_hello; /** - * "version" of "our_hello". Used to see if a given - * neighbor has already been sent the latest version - * of our HELLO message. + * "version" of "our_hello". Used to see if a given neighbour has + * already been sent the latest version of our HELLO message. */ static unsigned int our_hello_version; @@ -661,43 +775,73 @@ static struct TransportPlugin *plugins; static struct GNUNET_SERVER_Handle *server; /** - * All known neighbors and their HELLOs. + * All known neighbours and their HELLOs. */ -static struct NeighborList *neighbors; +static struct NeighbourList *neighbours; /** - * Number of neighbors we'd like to have. + * Number of neighbours we'd like to have. */ static uint32_t max_connect_per_transport; +/** + * Head of linked list. + */ +static struct CheckHelloValidatedContext *chvc_head; + +/** + * Tail of linked list. + */ +static struct CheckHelloValidatedContext *chvc_tail; + + +/** + * Map of PeerIdentities to 'struct ValidationEntry*'s (addresses + * of the given peer that we are currently validating). + */ +static struct GNUNET_CONTAINER_MultiHashMap *validation_map; + + +/** + * The peer specified by the given neighbour has timed-out or a plugin + * has disconnected. We may either need to do nothing (other plugins + * still up), or trigger a full disconnect and clean up. This + * function updates our state and do the necessary notifications. + * Also notifies our clients that the neighbour is now officially + * gone. + * + * @param n the neighbour list entry for the peer + * @param check should we just check if all plugins + * disconnected or must we ask all plugins to + * disconnect? + */ +static void disconnect_neighbour (struct NeighbourList *n, int check); + +/** + * Check the ready list for the given neighbour and if a plugin is + * ready for transmission (and if we have a message), do so! + * + * @param neighbour target peer for which to transmit + */ +static void try_transmission_to_peer (struct NeighbourList *neighbour); + /** - * Find an entry in the neighbor list for a particular peer. + * Find an entry in the neighbour list for a particular peer. * if sender_address is not specified (NULL) then return the * first matching entry. If sender_address is specified, then * make sure that the address and address_len also matches. * * @return NULL if not found. */ -static struct NeighborList * -find_neighbor (const struct GNUNET_PeerIdentity *key, const char *sender_address, - size_t sender_address_len) +static struct NeighbourList * +find_neighbour (const struct GNUNET_PeerIdentity *key) { - struct NeighborList *head = neighbors; - if (sender_address == NULL) - { - while ((head != NULL) && - (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity)))) - head = head->next; - } - else - { - while ((head != NULL) && - (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))) && - (sender_address_len != head->addr_len) && - (0 != memcmp (sender_address, &head->addr, head->addr_len))) - head = head->next; - } + struct NeighbourList *head = neighbours; + + while ((head != NULL) && + (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity)))) + head = head->next; return head; } @@ -718,19 +862,27 @@ find_transport (const char *short_name) /** - * Update the quota values for the given neighbor now. + * Update the quota values for the given neighbour now. + * + * @param n neighbour to update + * @param force GNUNET_YES to force recalculation now */ static void -update_quota (struct NeighborList *n) +update_quota (struct NeighbourList *n, + int force) { - struct GNUNET_TIME_Relative delta; + struct GNUNET_TIME_Absolute now; + unsigned long long delta; uint64_t allowed; uint64_t remaining; - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (delta.value < MIN_QUOTA_REFRESH_TIME) - return; /* not enough time passed for doing quota update */ - allowed = delta.value * n->quota_in; + now = GNUNET_TIME_absolute_get (); + delta = now.value - n->last_quota_update.value; + allowed = n->quota_in * delta; + if ( (delta < MIN_QUOTA_REFRESH_TIME) && + (!force) && + (allowed < 32 * 1024) ) + return; /* too early, not enough data */ if (n->last_received < allowed) { remaining = allowed - n->last_received; @@ -741,7 +893,7 @@ update_quota (struct NeighborList *n) if (remaining > MAX_BANDWIDTH_CARRY) remaining = MAX_BANDWIDTH_CARRY; n->last_received = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; n->last_quota_update.value -= remaining; if (n->quota_violation_count > 0) n->quota_violation_count--; @@ -749,10 +901,10 @@ update_quota (struct NeighborList *n) else { n->last_received -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; if (n->last_received > allowed) { - /* more than twice the allowed rate! */ + /* much more than the allowed rate! */ n->quota_violation_count += 10; } } @@ -760,10 +912,9 @@ update_quota (struct NeighborList *n) /** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Function called to notify a client about the socket being ready to + * queue more data. "buf" will be NULL and "size" zero if the socket + * was closed for writing in the meantime. * * @param cls closure * @param size number of bytes available in buf @@ -778,9 +929,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) uint16_t msize; size_t tsize; const struct GNUNET_MessageHeader *msg; - struct GNUNET_CONNECTION_TransmitHandle *th; char *cbuf; + client->th = NULL; if (buf == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -788,10 +939,11 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) /* fatal error with client, free message queue! */ while (NULL != (q = client->message_queue_head)) { - client->message_queue_head = q->next; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); GNUNET_free (q); } - client->message_queue_tail = NULL; client->message_count = 0; return 0; } @@ -808,9 +960,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) "Transmitting message of type %u to client.\n", ntohs (msg->type)); #endif - client->message_queue_head = q->next; - if (q->next == NULL) - client->message_queue_tail = NULL; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); memcpy (&cbuf[tsize], msg, msize); tsize += msize; GNUNET_free (q); @@ -819,12 +971,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) if (NULL != q) { GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } return tsize; } @@ -846,7 +998,6 @@ transmit_to_client (struct TransportClient *client, { struct ClientMessageQueueEntry *q; uint16_t msize; - struct GNUNET_CONNECTION_TransmitHandle *th; if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop)) { @@ -857,89 +1008,51 @@ transmit_to_client (struct TransportClient *client, /* TODO: call to statistics... */ return; } - client->message_count++; msize = ntohs (msg->size); GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize); memcpy (&q[1], msg, msize); - /* append to message queue */ - if (client->message_queue_tail == NULL) - { - client->message_queue_tail = q; - } - else + GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head, + client->message_queue_tail, + client->message_queue_tail, + q); + client->message_count++; + if (client->th == NULL) { - client->message_queue_tail->next = q; - client->message_queue_tail = q; - } - if (client->message_queue_head == NULL) - { - client->message_queue_head = q; - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } } /** - * Find alternative plugins for communication. + * Transmit a 'SEND_OK' notification to the given client for the + * given neighbour. * - * @param neighbor for which neighbor should we try to find - * more plugins? + * @param client who to notify + * @param n neighbour to notify about + * @param result status code for the transmission request */ static void -try_alternative_plugins (struct NeighborList *neighbor) +transmit_send_ok (struct TransportClient *client, + struct NeighbourList *n, + int result) { - struct ReadyList *rl; - - if ((neighbor->plugins != NULL) && - (neighbor->retry_plugins_time.value > - GNUNET_TIME_absolute_get ().value)) - return; /* don't try right now */ - neighbor->retry_plugins_time - = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY); - - rl = neighbor->plugins; - while (rl != NULL) - { - if (rl->connect_attempts > 0) - rl->connect_attempts--; /* amnesty */ - rl = rl->next; - } + struct SendOkMessage send_ok_msg; + send_ok_msg.header.size = htons (sizeof (send_ok_msg)); + send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + send_ok_msg.success = htonl (result); + send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency); + send_ok_msg.peer = n->id; + transmit_to_client (client, &send_ok_msg.header, GNUNET_NO); } -/** - * The peer specified by the given neighbor has timed-out or a plugin - * has disconnected. We may either need to do nothing (other plugins - * still up), or trigger a full disconnect and clean up. This - * function updates our state and do the necessary notifications. - * Also notifies our clients that the neighbor is now officially - * gone. - * - * @param n the neighbor list entry for the peer - * @param check should we just check if all plugins - * disconnected or must we ask all plugins to - * disconnect? - */ -static void disconnect_neighbor (struct NeighborList *n, int check); - - -/** - * Check the ready list for the given neighbor and - * if a plugin is ready for transmission (and if we - * have a message), do so! - * - * @param neighbor target peer for which to check the plugins - */ -static void try_transmission_to_peer (struct NeighborList *neighbor); - - /** * Function called by the GNUNET_TRANSPORT_TransmitFunction * upon "completion" of a send request. This tells the API @@ -960,139 +1073,198 @@ transmit_send_continuation (void *cls, int result) { struct MessageQueue *mq = cls; - struct ReadyList *rl; - struct SendOkMessage send_ok_msg; - struct NeighborList *n; + struct NeighbourList *n; - GNUNET_assert (mq != NULL); - n = mq->neighbor; + n = find_neighbour(&mq->neighbour_id); GNUNET_assert (n != NULL); - GNUNET_assert (0 == - memcmp (&n->id, target, - sizeof (struct GNUNET_PeerIdentity))); - rl = n->plugins; - while ((rl != NULL) && (rl->plugin != mq->plugin)) - rl = rl->next; - GNUNET_assert (rl != NULL); - if (result == GNUNET_OK) - { - rl->timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission to peer `%s' failed, marking connection as down.\n", - GNUNET_i2s (target)); - rl->connected = GNUNET_NO; + if (mq->specific_address != NULL) + { + if (result == GNUNET_OK) + { + mq->specific_address->timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + mq->specific_address->connected = GNUNET_YES; + } + else + { + mq->specific_address->connected = GNUNET_NO; + } + if (! mq->internal_msg) + mq->specific_address->in_transmit = GNUNET_NO; } - if (!mq->internal_msg) - rl->transmit_ready = GNUNET_YES; if (mq->client != NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Notifying client %p about failed transission to peer `%4s'.\n", - mq->client, GNUNET_i2s (target)); - send_ok_msg.header.size = htons (sizeof (send_ok_msg)); - send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - send_ok_msg.success = htonl (result); - send_ok_msg.peer = n->id; - transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO); - } - GNUNET_free (mq->message); + transmit_send_ok (mq->client, n, result); GNUNET_free (mq); - /* one plugin just became ready again, try transmitting - another message (if available) */ - if (result == GNUNET_OK) - try_transmission_to_peer (n); - else - disconnect_neighbor (n, GNUNET_YES); + try_transmission_to_peer (n); + if (result != GNUNET_OK) + disconnect_neighbour (n, GNUNET_YES); } /** - * Check the ready list for the given neighbor and - * if a plugin is ready for transmission (and if we - * have a message), do so! + * Find an address in any of the available transports for + * the given neighbour that would be good for message + * transmission. This is essentially the transport selection + * routine. + * + * @param neighbour for whom to select an address + * @return selected address, NULL if we have none */ -static void -try_transmission_to_peer (struct NeighborList *neighbor) +struct ForeignAddressList * +find_ready_address(struct NeighbourList *neighbour) { - struct ReadyList *pos; - struct GNUNET_TIME_Relative min_latency; - struct ReadyList *rl; - struct MessageQueue *mq; - struct GNUNET_TIME_Absolute now; + struct ReadyList *head = neighbour->plugins; + struct ForeignAddressList *addresses; + struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); + struct ForeignAddressList *best_address; - if (neighbor->messages == NULL) - return; /* nothing to do */ - try_alternative_plugins (neighbor); - min_latency = GNUNET_TIME_UNIT_FOREVER_REL; - rl = NULL; - mq = neighbor->messages; - now = GNUNET_TIME_absolute_get (); - pos = neighbor->plugins; - while (pos != NULL) + best_address = NULL; + while (head != NULL) { - /* set plugins that are inactive for a long time back to disconnected */ - if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES)) + addresses = head->addresses; + while (addresses != NULL) { + if ( (addresses->timeout.value < now.value) && + (addresses->connected == GNUNET_YES) ) + { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Marking long-time inactive connection to `%4s' as down.\n", - GNUNET_i2s (&neighbor->id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Marking long-time inactive connection to `%4s' as down.\n", + GNUNET_i2s (&neighbour->id)); #endif - pos->connected = GNUNET_NO; + addresses->connected = GNUNET_NO; + } + addresses = addresses->next; } - if (((GNUNET_YES == pos->transmit_ready) || - (mq->internal_msg)) && - (pos->connect_attempts < MAX_CONNECT_RETRY) && - ((rl == NULL) || (min_latency.value > pos->latency.value))) + + addresses = head->addresses; + while (addresses != NULL) { - rl = pos; - min_latency = pos->latency; + if ( ( (best_address == NULL) || + (addresses->connected == GNUNET_YES) || + (best_address->connected == GNUNET_NO) ) && + (addresses->in_transmit == GNUNET_NO) && + ( (best_address == NULL) || + (addresses->latency.value < best_address->latency.value)) ) + best_address = addresses; + /* FIXME: also give lower-latency addresses that are not + connected a chance some times... */ + addresses = addresses->next; } - pos = pos->next; + head = head->next; } - if (rl == NULL) - { #if DEBUG_TRANSPORT + if (best_address != NULL) + { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No plugin ready to transmit message\n"); -#endif - return; /* nobody ready */ + "Best address found has latency of %llu ms.\n", + best_address->latency.value); } - if (GNUNET_NO == rl->connected) - { - rl->connect_attempts++; - rl->connected = GNUNET_YES; +#endif + return best_address; + +} + + +/** + * We should re-try transmitting to the given peer, + * hopefully we've learned something in the meantime. + */ +static void +retry_transmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourList *n = cls; + + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); +} + + +/** + * Check the ready list for the given neighbour and if a plugin is + * ready for transmission (and if we have a message), do so! + * + * @param neighbour target peer for which to transmit + */ +static void +try_transmission_to_peer (struct NeighbourList *neighbour) +{ + struct ReadyList *rl; + struct MessageQueue *mq; + struct GNUNET_TIME_Relative timeout; + + if (neighbour->messages_head == NULL) + return; /* nothing to do */ + rl = NULL; + mq = neighbour->messages_head; + /* FIXME: support bi-directional use of TCP */ + if (mq->specific_address == NULL) + mq->specific_address = find_ready_address(neighbour); + if (mq->specific_address == NULL) + { + timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); + if (timeout.value == 0) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No destination address available to transmit message of size %u to peer `%4s'\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id)); +#endif + if (mq->client != NULL) + transmit_send_ok (mq->client, neighbour, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, + neighbour->messages_tail, + mq); + GNUNET_free (mq); + return; /* nobody ready */ + } + if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, + neighbour->retry_task); + neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched, + timeout, + &retry_transmission_task, + neighbour); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Establishing fresh connection with `%4s' via plugin `%s'\n", - GNUNET_i2s (&neighbor->id), rl->plugin->short_name); + "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id), + timeout.value); #endif - } - neighbor->messages = mq->next; + return; + } + GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, + neighbour->messages_tail, + mq); + if (mq->specific_address->connected == GNUNET_NO) + mq->specific_address->connect_attempts++; + rl = mq->specific_address->ready_list; mq->plugin = rl->plugin; if (!mq->internal_msg) - rl->transmit_ready = GNUNET_NO; + mq->specific_address->in_transmit = GNUNET_YES; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Giving message of type `%u' for `%4s' to plugin `%s'\n", - ntohs (mq->message->type), - GNUNET_i2s (&neighbor->id), rl->plugin->short_name); + "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n", + mq->message_buf_size, + GNUNET_i2s (&neighbour->id), + GNUNET_a2s (mq->specific_address->addr, + mq->specific_address->addrlen), + rl->plugin->short_name); #endif - rl->plugin->api->send (rl->plugin->api->cls, - &neighbor->id, - mq->message, - mq->priority, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - rl->neighbor->addr, - rl->neighbor->addr_len, - GNUNET_NO, - &transmit_send_continuation, mq); + &mq->neighbour_id, + mq->message_buf, + mq->message_buf_size, + mq->priority, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + mq->specific_address->addr, + mq->specific_address->addrlen, + GNUNET_YES /* FIXME: sometimes, we want to be more tolerant here! */, + &transmit_send_continuation, mq); } @@ -1100,30 +1272,31 @@ try_transmission_to_peer (struct NeighborList *neighbor) * Send the specified message to the specified peer. * * @param client source of the transmission request (can be NULL) + * @param peer_address ForeignAddressList where we should send this message * @param priority how important is the message - * @param msg message to send - * @param is_internal is this an internal message - * @param neighbor handle to the neighbor for transmission + * @param timeout how long do we have to transmit? + * @param message_buf message(s) to send GNUNET_MessageHeader(s) + * @param message_buf_size total size of all messages in message_buf + * @param is_internal is this an internal message; these are pre-pended and + * also do not count for plugins being "ready" to transmit + * @param neighbour handle to the neighbour for transmission */ static void transmit_to_peer (struct TransportClient *client, + struct ForeignAddressList *peer_address, unsigned int priority, - const struct GNUNET_MessageHeader *msg, - int is_internal, struct NeighborList *neighbor) + struct GNUNET_TIME_Relative timeout, + const char *message_buf, + size_t message_buf_size, + int is_internal, struct NeighbourList *neighbour) { struct MessageQueue *mq; - struct MessageQueue *mqe; - struct GNUNET_MessageHeader *m; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Sending message of type %u to peer `%4s'\n"), - ntohs (msg->type), GNUNET_i2s (&neighbor->id)); -#endif +#if EXTRA_CHECKS if (client != NULL) { /* check for duplicate submission */ - mq = neighbor->messages; + mq = neighbour->messages_head; while (NULL != mq) { if (mq->client == client) @@ -1136,31 +1309,27 @@ transmit_to_peer (struct TransportClient *client, mq = mq->next; } } - mq = GNUNET_malloc (sizeof (struct MessageQueue)); +#endif + mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size); + mq->specific_address = peer_address; mq->client = client; - m = GNUNET_malloc (ntohs (msg->size)); - memcpy (m, msg, ntohs (msg->size)); - mq->message = m; - mq->neighbor = neighbor; + memcpy (&mq[1], message_buf, message_buf_size); + mq->message_buf = (const char*) &mq[1]; + mq->message_buf_size = message_buf_size; + memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity)); mq->internal_msg = is_internal; mq->priority = priority; - - /* find tail */ - mqe = neighbor->messages; - if (mqe != NULL) - while (mqe->next != NULL) - mqe = mqe->next; - if (mqe == NULL) - { - /* new head */ - neighbor->messages = mq; - } + mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); + if (is_internal) + GNUNET_CONTAINER_DLL_insert (neighbour->messages_head, + neighbour->messages_tail, + mq); else - { - /* append */ - mqe->next = mq; - } - try_transmission_to_peer (neighbor); + GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head, + neighbour->messages_tail, + neighbour->messages_tail, + mq); + try_transmission_to_peer (neighbour); } @@ -1170,7 +1339,7 @@ transmit_to_peer (struct TransportClient *client, struct GeneratorContext { struct TransportPlugin *plug_pos; - struct AddressList *addr_pos; + struct OwnAddressList *addr_pos; struct GNUNET_TIME_Absolute expiration; }; @@ -1190,7 +1359,10 @@ address_generator (void *cls, size_t max, void *buf) gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL; } if (NULL == gc->plug_pos) - return 0; + { + + return 0; + } ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name, gc->expiration, gc->addr_pos->addr, @@ -1209,17 +1381,17 @@ refresh_hello () { struct GNUNET_HELLO_Message *hello; struct TransportClient *cpos; - struct NeighborList *npos; + struct NeighbourList *npos; struct GeneratorContext gc; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Refreshing my `%s'\n", "HELLO"); -#endif gc.plug_pos = plugins; gc.addr_pos = plugins != NULL ? plugins->addresses : NULL; gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Refreshed my `%s', new size is %d\n", "HELLO", GNUNET_HELLO_size(hello)); +#endif cpos = clients; while (cpos != NULL) { @@ -1233,17 +1405,19 @@ refresh_hello () our_hello = hello; our_hello_version++; GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello); - npos = neighbors; + npos = neighbours; while (npos != NULL) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Transmitting updated `%s' to neighbor `%4s'\n", + "Transmitting updated `%s' to neighbour `%4s'\n", "HELLO", GNUNET_i2s (&npos->id)); #endif - transmit_to_peer (NULL, 0, - (const struct GNUNET_MessageHeader *) our_hello, - GNUNET_YES, npos); + transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, + (const char *) our_hello, + GNUNET_HELLO_size(our_hello), + GNUNET_NO, npos); npos = npos->next; } } @@ -1275,9 +1449,9 @@ update_addresses (struct TransportPlugin *plugin, int fresh) struct GNUNET_TIME_Relative min_remaining; struct GNUNET_TIME_Relative remaining; struct GNUNET_TIME_Absolute now; - struct AddressList *pos; - struct AddressList *prev; - struct AddressList *next; + struct OwnAddressList *pos; + struct OwnAddressList *prev; + struct OwnAddressList *next; int expired; if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK) @@ -1356,7 +1530,7 @@ plugin_env_notify_address (void *cls, struct GNUNET_TIME_Relative expires) { struct TransportPlugin *p = cls; - struct AddressList *al; + struct OwnAddressList *al; struct GNUNET_TIME_Absolute abex; abex = GNUNET_TIME_relative_to_absolute (expires); @@ -1373,12 +1547,8 @@ plugin_env_notify_address (void *cls, } al = al->next; } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Plugin `%s' informs us about a new address `%s'\n", name, - GNUNET_a2s (addr, addrlen)); -#endif - al = GNUNET_malloc (sizeof (struct AddressList) + addrlen); + + al = GNUNET_malloc (sizeof (struct OwnAddressList) + addrlen); al->addr = &al[1]; al->next = p->addresses; p->addresses = al; @@ -1394,19 +1564,20 @@ plugin_env_notify_address (void *cls, */ static void notify_clients_connect (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_TIME_Relative latency) + struct GNUNET_TIME_Relative latency, + uint32_t distance) { struct ConnectInfoMessage cim; struct TransportClient *cpos; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Informing clients about peer `%4s' connecting to us\n", - GNUNET_i2s (peer)); + "Notifying clients about connection from `%s'\n", + GNUNET_i2s (peer)); #endif cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); + cim.distance = htonl (distance); cim.latency = GNUNET_TIME_relative_hton (latency); memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity)); cpos = clients; @@ -1429,8 +1600,8 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Informing clients about peer `%4s' disconnecting\n", - GNUNET_i2s (peer)); + "Notifying clients about lost connection to `%s'\n", + GNUNET_i2s (peer)); #endif dim.header.size = htons (sizeof (struct DisconnectInfoMessage)); dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); @@ -1446,126 +1617,229 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) /** - * Copy any validated addresses to buf. + * Find a ForeignAddressList entry for the given neighbour + * that matches the given address and transport. * - * @return 0 once all addresses have been - * returned - */ -static size_t -list_validated_addresses (void *cls, size_t max, void *buf) + * @param neighbour which peer we care about + * @param tname name of the transport plugin + * @param addr binary address + * @param addrlen length of addr + * @return NULL if no such entry exists + */ +static struct ForeignAddressList * +find_peer_address(struct NeighbourList *neighbour, + const char *tname, + const char *addr, + size_t addrlen) { - struct ValidationAddress **va = cls; - size_t ret; + struct ReadyList *head; + struct ForeignAddressList *address_head; - while ((NULL != *va) && ((*va)->ok != GNUNET_YES)) - *va = (*va)->next; - if (NULL == *va) - return 0; - ret = GNUNET_HELLO_add_address ((*va)->transport_name, - (*va)->expiration, - &(*va)[1], (*va)->addr_len, buf, max); - *va = (*va)->next; + head = neighbour->plugins; + while (head != NULL) + { + if (0 == strcmp (tname, head->plugin->short_name)) + break; + head = head->next; + } + if (head == NULL) + return NULL; + + address_head = head->addresses; + while ( (address_head != NULL) && + ( (address_head->addrlen != addrlen) || + (memcmp(address_head->addr, addr, addrlen) != 0) ) ) + address_head = address_head->next; + return address_head; +} + + +/** + * Get the peer address struct for the given neighbour and + * address. If it doesn't yet exist, create it. + * + * @param neighbour which peer we care about + * @param tname name of the transport plugin + * @param addr binary address + * @param addrlen length of addr + * @return NULL if we do not have a transport plugin for 'tname' + */ +static struct ForeignAddressList * +add_peer_address(struct NeighbourList *neighbour, + const char *tname, + const char *addr, + size_t addrlen) +{ + struct ReadyList *head; + struct ForeignAddressList *ret; + + ret = find_peer_address (neighbour, tname, addr, addrlen); + if (ret != NULL) + return ret; + head = neighbour->plugins; + while (head != NULL) + { + if (0 == strcmp (tname, head->plugin->short_name)) + break; + head = head->next; + } + if (head == NULL) + return NULL; + ret = GNUNET_malloc(sizeof(struct ForeignAddressList) + addrlen); + ret->addr = (const char*) &ret[1]; + memcpy (&ret[1], addr, addrlen); + ret->addrlen = addrlen; + ret->expires = GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + ret->latency = GNUNET_TIME_relative_get_forever(); + ret->distance = -1; + ret->timeout = GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + ret->ready_list = head; + ret->next = head->addresses; + head->addresses = ret; return ret; } /** - * HELLO validation cleanup task. + * Closure for 'add_validated_address'. */ -static void -cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +struct AddValidatedAddressContext { - struct ValidationAddress *va; - struct ValidationList *pos; - struct ValidationList *prev; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Absolute first; - struct GNUNET_HELLO_Message *hello; - struct GNUNET_PeerIdentity pid; - struct NeighborList *n; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "HELLO validation cleanup background task running...\n"); -#endif - now = GNUNET_TIME_absolute_get (); - prev = NULL; - pos = pending_validations; - while (pos != NULL) - { - if (pos->timeout.value < now.value) - { - if (prev == NULL) - pending_validations = pos->next; - else - prev->next = pos->next; - va = pos->addresses; - hello = GNUNET_HELLO_create (&pos->publicKey, - &list_validated_addresses, &va); - GNUNET_CRYPTO_hash (&pos->publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &pid.hashPubKey); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n", - "HELLO", GNUNET_i2s (&pid)); -#endif - GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello); - n = find_neighbor (&pid, NULL, 0); - if (NULL != n) - try_transmission_to_peer (n); - GNUNET_free (hello); - while (NULL != (va = pos->addresses)) - { - pos->addresses = va->next; - GNUNET_free (va->transport_name); - GNUNET_free (va); - } - GNUNET_free (pos); - if (prev == NULL) - pos = pending_validations; - else - pos = prev->next; - continue; - } - prev = pos; - pos = pos->next; - } + /** + * Entry that has been validated. + */ + const struct ValidationEntry *ve; - /* finally, reschedule cleanup if needed; list is - ordered by timeout, so we need the last element... */ - if (NULL != pending_validations) - { - first = pending_validations->timeout; - pos = pending_validations; - while (pos != NULL) - { - first = GNUNET_TIME_absolute_min (first, pos->timeout); - pos = pos->next; - } - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_absolute_get_remaining - (first), &cleanup_validation, NULL); - } -} + /** + * Flag set after we have added the address so + * that we terminate the iteration next time. + */ + int done; +}; -static struct GNUNET_MessageHeader * -createPingMessage (struct GNUNET_PeerIdentity * target, struct ValidationAddress *va) +/** + * Callback function used to fill a buffer of max bytes with a list of + * addresses in the format used by HELLOs. Should use + * "GNUNET_HELLO_add_address" as a helper function. + * + * @param cls the 'struct AddValidatedAddressContext' with the validated address + * @param max maximum number of bytes that can be written to buf + * @param buf where to write the address information + * @return number of bytes written, 0 to signal the + * end of the iteration. + */ +static size_t +add_validated_address (void *cls, + size_t max, void *buf) { + struct AddValidatedAddressContext *avac = cls; + const struct ValidationEntry *ve = avac->ve; - struct TransportPingMessage *ping; - ping = GNUNET_malloc(sizeof(struct TransportPingMessage)); + if (GNUNET_YES == avac->done) + return 0; + avac->done = GNUNET_YES; + return GNUNET_HELLO_add_address (ve->transport_name, + GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION), + ve->addr, + ve->addrlen, + buf, + max); +} - ping->challenge = htonl(va->challenge); - ping->header.size = sizeof(struct TransportPingMessage); - ping->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING); - memcpy(&ping->target, target, sizeof(struct GNUNET_PeerIdentity)); - return &ping->header; +/** + * Iterator over hash map entries. Checks if the given + * validation entry is for the same challenge as what + * is given in the PONG. + * + * @param cls the 'struct TransportPongMessage*' + * @param key peer identity + * @param value value in the hash map ('struct ValidationEntry') + * @return GNUNET_YES if we should continue to + * iterate (mismatch), GNUNET_NO if not (entry matched) + */ +static int +check_pending_validation (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + const struct TransportPongMessage *pong = cls; + struct ValidationEntry *ve = value; + struct AddValidatedAddressContext avac; + unsigned int challenge = ntohl(pong->challenge); + struct GNUNET_HELLO_Message *hello; + struct GNUNET_PeerIdentity target; + struct NeighbourList *n; + struct ForeignAddressList *fal; + + if (ve->challenge != challenge) + return GNUNET_YES; + +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Confirmed validity of address, peer `%4s' has address `%s' (%s).\n", + GNUNET_h2s (key), + GNUNET_a2s ((const struct sockaddr *) ve->addr, + ve->addrlen), + ve->transport_name); +#endif + /* create the updated HELLO */ + GNUNET_CRYPTO_hash (&ve->publicKey, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &target.hashPubKey); + avac.done = GNUNET_NO; + avac.ve = ve; + hello = GNUNET_HELLO_create (&ve->publicKey, + &add_validated_address, + &avac); + GNUNET_PEERINFO_add_peer (cfg, sched, + &target, + hello); + GNUNET_free (hello); + n = find_neighbour (&target); + if (n != NULL) + { + fal = add_peer_address (n, ve->transport_name, + ve->addr, + ve->addrlen); + fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); + fal->validated = GNUNET_YES; + fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time); + if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value) + n->latency = fal->latency; + else + n->latency.value = (fal->latency.value + n->latency.value) / 2; + n->distance = fal->distance; + if (GNUNET_NO == n->received_pong) + { + notify_clients_connect (&target, n->latency, n->distance); + n->received_pong = GNUNET_YES; + } + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, + n->retry_task); + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); + } + } + + /* clean up validation entry */ + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (validation_map, + key, + ve)); + GNUNET_SCHEDULER_cancel (sched, + ve->timeout_task); + GNUNET_free (ve->transport_name); + GNUNET_free (ve); + return GNUNET_NO; } + /** * Function that will be called if we receive a validation * of an address challenge that we transmitted to another @@ -1575,11 +1849,11 @@ createPingMessage (struct GNUNET_PeerIdentity * target, struct ValidationAddress * (otherwise we may be seeing a MiM attack). * * @param cls closure - * @param name name of the transport that generated the address + * @param message the pong message * @param peer who responded to our challenge - * @param challenge the challenge number we presumably used - * @param sender_addr string describing our sender address (as observed - * by the other peer in human-readable format) + * @param sender_address string describing our sender address (as observed + * by the other peer in binary format) + * @param sender_address_len number of bytes in 'sender_address' */ static void handle_pong (void *cls, const struct GNUNET_MessageHeader *message, @@ -1587,122 +1861,207 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message, const char *sender_address, size_t sender_address_len) { - unsigned int not_done; - int matched; - struct ValidationList *pos; - struct ValidationAddress *va; - struct GNUNET_PeerIdentity id; - struct TransportPongMessage *pong = (struct TransportPongMessage *)message; - - unsigned int challenge = ntohl(pong->challenge); - pos = pending_validations; - while (pos != NULL) - { - GNUNET_CRYPTO_hash (&pos->publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &id.hashPubKey); - if (0 == memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity))) - break; - pos = pos->next; - } - if (pos == NULL) - { - /* TODO: call statistics (unmatched PONG) */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"), - GNUNET_i2s (peer)); - return; - } - not_done = 0; - matched = GNUNET_NO; - va = pos->addresses; - while (va != NULL) - { - if (va->challenge == challenge) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Confirmed validity of address, peer `%4s' has address `%s'.\n", - GNUNET_i2s (peer), - GNUNET_a2s ((const struct sockaddr *) sender_address, - sender_address_len)); +#if DEBUG_TRANSPORT > 1 + /* we get tons of these that just get discarded, only log + if we are quite verbose */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Receiving `%s' message from `%4s'.\n", "PONG", + GNUNET_i2s (peer)); #endif - GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, - _ - ("Another peer saw us using the address `%s' via `FIXME'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"), - GNUNET_a2s ((const struct sockaddr *) &va[1], - va->addr_len)); - va->ok = GNUNET_YES; - va->expiration = - GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); - matched = GNUNET_YES; - } - if (va->ok != GNUNET_YES) - not_done++; - va = va->next; - } - if (GNUNET_NO == matched) - { - /* TODO: call statistics (unmatched PONG) */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"), - "PONG", "PING"); - } - if (0 == not_done) - { -#if DEBUG_TRANSPORT + if (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap_get_multiple (validation_map, + &peer->hashPubKey, + &check_pending_validation, + (void*) message)) + { + /* This is *expected* to happen a lot since we send + PONGs to *all* known addresses of the sender of + the PING, so most likely we get multiple PONGs + per PING, and all but the first PONG will end up + here. So really we should not print anything here + unless we want to be very, very verbose... */ +#if DEBUG_TRANSPORT > 2 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "All addresses validated, will now construct `%s' for `%4s'.\n", - "HELLO", GNUNET_i2s (peer)); + "Received `%s' message from `%4s' but have no record of a matching `%s' message. Ignoring.\n", + "PONG", + GNUNET_i2s (peer), + "PING"); #endif - pos->timeout.value = 0; - GNUNET_SCHEDULER_add_with_priority (sched, - GNUNET_SCHEDULER_PRIORITY_IDLE, - &cleanup_validation, NULL); + return; } - else - { + +#if 0 + /* FIXME: add given address to potential pool of our addresses + (for voting) */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, + _("Another peer saw us using the address `%s' via `%s'.\n"), + GNUNET_a2s ((const struct sockaddr *) &pong[1], + ntohs(pong->addrlen)), + va->transport_name); +#endif +} + + +static void +neighbour_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourList *n = cls; + #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Still waiting for %u additional `%s' messages before constructing `%s' for `%4s'.\n", - not_done, "PONG", "HELLO", GNUNET_i2s (peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id)); #endif + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + disconnect_neighbour (n, GNUNET_NO); +} + + +/** + * Create a fresh entry in our neighbour list for the given peer. + * Will try to transmit our current HELLO to the new neighbour. Also + * notifies our clients about the new "connection". + * + * @param peer the peer for which we create the entry + * @return the new neighbour list entry + */ +static struct NeighbourList * +setup_new_neighbour (const struct GNUNET_PeerIdentity *peer) +{ + struct NeighbourList *n; + struct TransportPlugin *tp; + struct ReadyList *rl; + + GNUNET_assert (our_hello != NULL); + n = GNUNET_malloc (sizeof (struct NeighbourList)); + n->next = neighbours; + neighbours = n; + n->id = *peer; + n->last_quota_update = GNUNET_TIME_absolute_get (); + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); + tp = plugins; + while (tp != NULL) + { + if (tp->api->send != NULL) + { + rl = GNUNET_malloc (sizeof (struct ReadyList)); + rl->next = n->plugins; + n->plugins = rl; + rl->plugin = tp; + rl->addresses = NULL; + } + tp = tp->next; } + n->latency = GNUNET_TIME_UNIT_FOREVER_REL; + n->distance = -1; + n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, + (const char *) our_hello, GNUNET_HELLO_size(our_hello), + GNUNET_NO, n); + return n; } -struct CheckHelloValidatedContext +/** + * Closure for 'check_address_exists'. + */ +struct CheckAddressExistsClosure { /** - * Plugin for which we are validating. + * Address to check for. */ - struct TransportPlugin *plugin; + const void *addr; /** - * Hello that we are validating. + * Name of the transport. */ - struct GNUNET_HELLO_Message *hello; + const char *tname; /** - * Validation list being build. + * Length of addr. */ - struct ValidationList *e; + size_t addrlen; /** - * Context for peerinfo iteration. - * NULL after we are done processing peerinfo's information. + * Set to GNUNET_YES if the address exists. */ - struct GNUNET_PEERINFO_IteratorContext *piter; - + int exists; }; /** - * Append the given address to the list of entries - * that need to be validated. + * Iterator over hash map entries. Checks if the given + * validation entry is for the same address as what is given + * in the closure. + * + * @param cls the 'struct CheckAddressExistsClosure*' + * @param key current key code (ignored) + * @param value value in the hash map ('struct ValidationEntry') + * @return GNUNET_YES if we should continue to + * iterate (mismatch), GNUNET_NO if not (entry matched) + */ +static int +check_address_exists (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct CheckAddressExistsClosure *caec = cls; + struct ValidationEntry *ve = value; + if ( (0 == strcmp (caec->tname, + ve->transport_name)) && + (caec->addrlen == ve->addrlen) && + (0 == memcmp (caec->addr, + ve->addr, + caec->addrlen)) ) + { + caec->exists = GNUNET_YES; + return GNUNET_NO; + } + return GNUNET_YES; +} + + +/** + * HELLO validation cleanup task (validation failed). + * + * @param cls the 'struct ValidationEntry' that failed + * @param tc scheduler context (unused) + */ +static void +timeout_hello_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct ValidationEntry *va = cls; + struct GNUNET_PeerIdentity pid; + + GNUNET_CRYPTO_hash (&va->publicKey, + sizeof (struct + GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &pid.hashPubKey); + GNUNET_CONTAINER_multihashmap_remove (validation_map, + &pid.hashPubKey, + va); + GNUNET_free (va->transport_name); + GNUNET_free (va); +} + + +/** + * Check if the given address is already being validated; if not, + * append the given address to the list of entries that are being be + * validated and initiate validation. + * + * @param cls closure ('struct CheckHelloValidatedContext *') + * @param tname name of the transport + * @param expiration expiration time + * @param addr the address + * @param addrlen length of the address + * @return GNUNET_OK (always) */ static int run_validation (void *cls, @@ -1710,12 +2069,19 @@ run_validation (void *cls, struct GNUNET_TIME_Absolute expiration, const void *addr, size_t addrlen) { - struct ValidationList *e = cls; - struct TransportPlugin *tp; - struct ValidationAddress *va; + struct CheckHelloValidatedContext *chvc = cls; struct GNUNET_PeerIdentity id; - struct GNUNET_MessageHeader *pingMessage; - int sent; + struct TransportPlugin *tp; + struct ValidationEntry *va; + struct NeighbourList *neighbour; + struct ForeignAddressList *peer_address; + struct TransportPingMessage ping; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk; + struct CheckAddressExistsClosure caec; + char * message_buf; + uint16_t hello_size; + size_t tsize; + tp = find_transport (tname); if (tp == NULL) { @@ -1726,146 +2092,201 @@ run_validation (void *cls, tname); return GNUNET_OK; } - GNUNET_CRYPTO_hash (&e->publicKey, + GNUNET_HELLO_get_key (chvc->hello, &pk); + GNUNET_CRYPTO_hash (&pk, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), &id.hashPubKey); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling validation of address `%s' via `%s' for `%4s'\n", - GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id)); - - va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen); - va->next = e->addresses; - e->addresses = va; + caec.addr = addr; + caec.addrlen = addrlen; + caec.tname = tname; + caec.exists = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_iterate (validation_map, + &check_address_exists, + &caec); + if (caec.exists == GNUNET_YES) + { + /* During validation attempts we will likely trigger the other + peer trying to validate our address which in turn will cause + it to send us its HELLO, so we expect to hit this case rather + frequently. Only print something if we are very verbose. */ +#if DEBUG_TRANSPORT > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Validation of address `%s' via `%s' for peer `%4s' already in progress.\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&id)); +#endif + return GNUNET_OK; + } + va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen); va->transport_name = GNUNET_strdup (tname); - va->addr_len = addrlen; va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, (unsigned int) -1); + va->send_time = GNUNET_TIME_absolute_get(); + va->addr = (const void*) &va[1]; memcpy (&va[1], addr, addrlen); - - pingMessage = createPingMessage(&id, va); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending ping message to address `%s' via `%s' for `%4s'\n", - GNUNET_a2s (addr, addrlen), tname, GNUNET_i2s (&id)); - - - sent = tp->api->send(tp->api->cls, &id, pingMessage, GNUNET_SCHEDULER_PRIORITY_DEFAULT, - TRANSPORT_DEFAULT_TIMEOUT, addr, addrlen, GNUNET_YES, NULL, NULL); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport returned %d from send!\n", sent); - - GNUNET_free(pingMessage); - + va->addrlen = addrlen; + GNUNET_HELLO_get_key (chvc->hello, + &va->publicKey); + va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, + HELLO_VERIFICATION_TIMEOUT, + &timeout_hello_validation, + va); + GNUNET_CONTAINER_multihashmap_put (validation_map, + &id.hashPubKey, + va, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + neighbour = find_neighbour(&id); + if (neighbour == NULL) + neighbour = setup_new_neighbour(&id); + peer_address = add_peer_address(neighbour, tname, addr, addrlen); + GNUNET_assert(peer_address != NULL); + hello_size = GNUNET_HELLO_size(our_hello); + tsize = sizeof(struct TransportPingMessage) + hello_size; + message_buf = GNUNET_malloc(tsize); + ping.challenge = htonl(va->challenge); + ping.header.size = htons(sizeof(struct TransportPingMessage)); + ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING); + memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity)); + memcpy(message_buf, our_hello, hello_size); + memcpy(&message_buf[hello_size], + &ping, + sizeof(struct TransportPingMessage)); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Performing validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&id), + "HELLO", hello_size, + "PING", sizeof (struct TransportPingMessage)); +#endif + transmit_to_peer (NULL, peer_address, + GNUNET_SCHEDULER_PRIORITY_DEFAULT, + HELLO_VERIFICATION_TIMEOUT, + message_buf, tsize, + GNUNET_YES, neighbour); + GNUNET_free(message_buf); return GNUNET_OK; } -/* - * @param cls handle to the plugin (for sending) - * @param target the peer identity of the peer we are sending to - * @param challenge the challenge number - * @param timeout how long to await validation? - * @param addr the address to validate - * @param addrlen the length of the address - * - * Perform address validation, which means sending a PING PONG to - * the address via the transport plugin. If not validated, then - * do not count this as a good peer/address... + +/** + * Add the given address to the list of foreign addresses + * available for the given peer (check for duplicates). * + * @param cls the respective 'struct NeighbourList' to update + * @param tname name of the transport + * @param expiration expiration time + * @param addr the address + * @param addrlen length of the address + * @return GNUNET_OK (always) */ -static void -validate_address (void *cls, struct ValidationAddress *va, - const struct GNUNET_PeerIdentity *target, - struct GNUNET_TIME_Relative timeout, - const void *addr, size_t addrlen) +static int +add_to_foreign_address_list (void *cls, + const char *tname, + struct GNUNET_TIME_Absolute expiration, + const void *addr, size_t addrlen) { - /* struct Plugin *plugin = cls; - int challenge = va->challenge; */ + struct NeighbourList *n = cls; + struct ForeignAddressList *fal; - - return; + fal = find_peer_address (n, tname, addr, addrlen); + if (fal == NULL) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&n->id), + expiration.value); +#endif + fal = add_peer_address (n, tname, addr, addrlen); + } + if (fal == NULL) + return GNUNET_OK; + fal->expires = GNUNET_TIME_absolute_max (expiration, + fal->expires); + fal->validated = GNUNET_YES; + return GNUNET_OK; } /** * Check if addresses in validated hello "h" overlap with - * those in "chvc->hello" and update "chvc->hello" accordingly, - * removing those addresses that have already been validated. + * those in "chvc->hello" and validate the rest. + * + * @param cls closure + * @param peer id of the peer, NULL for last call + * @param h hello message for the peer (can be NULL) + * @param trust amount of trust we have in the peer (not used) */ static void check_hello_validated (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Message *h, uint32_t trust) + const struct GNUNET_HELLO_Message *h, + uint32_t trust) { struct CheckHelloValidatedContext *chvc = cls; - struct ValidationAddress *va; - struct TransportPlugin *tp; - int first_call; - struct GNUNET_PeerIdentity apeer; + struct GNUNET_HELLO_Message *plain_hello; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk; + struct GNUNET_PeerIdentity target; + struct NeighbourList *n; - first_call = GNUNET_NO; - if (chvc->e == NULL) + if (peer == NULL) { chvc->piter = NULL; - first_call = GNUNET_YES; - chvc->e = GNUNET_malloc (sizeof (struct ValidationList)); - GNUNET_assert (GNUNET_OK == - GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello, - &chvc->e->publicKey)); - chvc->e->timeout = - GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT); - chvc->e->next = pending_validations; - pending_validations = chvc->e; - } - if (h != NULL) - { - GNUNET_HELLO_iterate_new_addresses (chvc->hello, - h, - GNUNET_TIME_absolute_get (), - &run_validation, chvc->e); - } - else if (GNUNET_YES == first_call) - { - /* no existing HELLO, all addresses are new */ - GNUNET_HELLO_iterate_addresses (chvc->hello, - GNUNET_NO, &run_validation, chvc->e); - } - if (h != NULL) - return; /* wait for next call */ - /* finally, transmit validation attempts */ - GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_id (chvc->hello, &apeer)); + GNUNET_CONTAINER_DLL_remove (chvc_head, + chvc_tail, + chvc); + if (GNUNET_NO == chvc->hello_known) + { + /* notify PEERINFO about the peer now, so that we at least + have the public key if some other component needs it */ + GNUNET_HELLO_get_key (chvc->hello, &pk); + GNUNET_CRYPTO_hash (&pk, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &target.hashPubKey); + plain_hello = GNUNET_HELLO_create (&pk, + NULL, + NULL); + GNUNET_PEERINFO_add_peer (cfg, sched, &target, plain_hello); + GNUNET_free (plain_hello); #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ready to validate addresses from `%s' message for peer `%4s'\n", - "HELLO", GNUNET_i2s (&apeer)); -#endif - va = chvc->e->addresses; - while (va != NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Establishing `%s' connection to validate `%s' address `%s' of `%4s'\n", - va->transport_name, - "HELLO", - GNUNET_a2s ((const struct sockaddr *) &va[1], - va->addr_len), GNUNET_i2s (&apeer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peerinfo had no `%s' message for peer `%4s', full validation needed.\n", + "HELLO", + GNUNET_i2s (&target)); #endif - tp = find_transport (va->transport_name); - GNUNET_assert (tp != NULL); - /* This validation should happen inside the transport, not from the plugin! */ - validate_address (tp->api->cls, va, &apeer, - HELLO_VERIFICATION_TIMEOUT, - &va[1], va->addr_len); - /* va->ok = GNUNET_SYSERR; will be set by validate_address! */ - va = va->next; + GNUNET_HELLO_iterate_addresses (chvc->hello, + GNUNET_NO, + &run_validation, + chvc); + } + GNUNET_free (chvc); + return; } - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_absolute_get_remaining (chvc-> - e->timeout), - &cleanup_validation, NULL); - GNUNET_free (chvc); + if (h == NULL) + return; + chvc->hello_known = GNUNET_YES; + n = find_neighbour (peer); + if (n != NULL) + { + GNUNET_HELLO_iterate_addresses (h, + GNUNET_NO, + &add_to_foreign_address_list, + n); + try_transmission_to_peer (n); + } + GNUNET_HELLO_iterate_new_addresses (chvc->hello, + h, + GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME), + &run_validation, + chvc); } - /** * Process HELLO-message. * @@ -1877,7 +2298,6 @@ static int process_hello (struct TransportPlugin *plugin, const struct GNUNET_MessageHeader *message) { - struct ValidationList *e; uint16_t hsize; struct GNUNET_PeerIdentity target; const struct GNUNET_HELLO_Message *hello; @@ -1892,7 +2312,8 @@ process_hello (struct TransportPlugin *plugin, return GNUNET_SYSERR; } /* first, check if load is too high */ - if (GNUNET_OS_load_cpu_get (cfg) > 100) + if (GNUNET_SCHEDULER_get_load (sched, + GNUNET_SCHEDULER_PRIORITY_BACKGROUND) > MAX_HELLO_LOAD) { /* TODO: call to stats? */ return GNUNET_OK; @@ -1908,32 +2329,18 @@ process_hello (struct TransportPlugin *plugin, &target.hashPubKey); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processing `%s' message for `%4s'\n", - "HELLO", GNUNET_i2s (&target)); -#endif - /* check if a HELLO for this peer is already on the validation list */ - e = pending_validations; - while (e != NULL) - { - if (0 == memcmp (&e->publicKey, - &publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded))) - { - /* TODO: call to stats? */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s' message for peer `%4s' is already pending; ignoring new message\n", - "HELLO", GNUNET_i2s (&target)); + "Processing `%s' message for `%4s' of size %u\n", + "HELLO", + GNUNET_i2s (&target), + GNUNET_HELLO_size(hello)); #endif - return GNUNET_OK; - } - e = e->next; - } + chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize); - chvc->plugin = plugin; - chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1]; - memcpy (chvc->hello, hello, hsize); + chvc->hello = (const struct GNUNET_HELLO_Message *) &chvc[1]; + memcpy (&chvc[1], hello, hsize); + GNUNET_CONTAINER_DLL_insert (chvc_head, + chvc_tail, + chvc); /* finally, check if HELLO was previously validated (continuation will then schedule actual validation) */ chvc->piter = GNUNET_PEERINFO_iterate (cfg, @@ -1947,44 +2354,51 @@ process_hello (struct TransportPlugin *plugin, /** - * The peer specified by the given neighbor has timed-out or a plugin + * The peer specified by the given neighbour has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins * still up), or trigger a full disconnect and clean up. This - * function updates our state and do the necessary notifications. - * Also notifies our clients that the neighbor is now officially + * function updates our state and does the necessary notifications. + * Also notifies our clients that the neighbour is now officially * gone. * - * @param n the neighbor list entry for the peer + * @param n the neighbour list entry for the peer * @param check should we just check if all plugins * disconnected or must we ask all plugins to * disconnect? */ static void -disconnect_neighbor (struct NeighborList *n, int check) +disconnect_neighbour (struct NeighbourList *n, int check) { struct ReadyList *rpos; - struct NeighborList *npos; - struct NeighborList *nprev; + struct NeighbourList *npos; + struct NeighbourList *nprev; struct MessageQueue *mq; + struct ForeignAddressList *peer_addresses; + struct ForeignAddressList *peer_pos; if (GNUNET_YES == check) { rpos = n->plugins; while (NULL != rpos) { - if (GNUNET_YES == rpos->connected) - return; /* still connected */ + peer_addresses = rpos->addresses; + while (peer_addresses != NULL) + { + if (GNUNET_YES == peer_addresses->connected) + return; /* still connected */ + peer_addresses = peer_addresses->next; + } rpos = rpos->next; } } - #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id)); + "Disconnecting from `%4s'\n", + GNUNET_i2s (&n->id)); #endif - /* remove n from neighbors list */ + /* remove n from neighbours list */ nprev = NULL; - npos = neighbors; + npos = neighbours; while ((npos != NULL) && (npos != n)) { nprev = npos; @@ -1992,146 +2406,76 @@ disconnect_neighbor (struct NeighborList *n, int check) } GNUNET_assert (npos != NULL); if (nprev == NULL) - neighbors = n->next; + neighbours = n->next; else nprev->next = n->next; /* notify all clients about disconnect */ - notify_clients_disconnect (&n->id); + if (GNUNET_YES == n->received_pong) + notify_clients_disconnect (&n->id); /* clean up all plugins, cancel connections and pending transmissions */ while (NULL != (rpos = n->plugins)) { n->plugins = rpos->next; - GNUNET_assert (rpos->neighbor == n); - if (GNUNET_YES == rpos->connected) - rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id); + rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id); + + while (rpos->addresses != NULL) + { + peer_pos = rpos->addresses; + rpos->addresses = peer_pos->next; + GNUNET_free(peer_pos); + } GNUNET_free (rpos); } /* free all messages on the queue */ - while (NULL != (mq = n->messages)) - { - n->messages = mq->next; - GNUNET_assert (mq->neighbor == n); + while (NULL != (mq = n->messages_head)) + { + GNUNET_CONTAINER_DLL_remove (n->messages_head, + n->messages_tail, + mq); + GNUNET_assert (0 == memcmp(&mq->neighbour_id, + &n->id, + sizeof(struct GNUNET_PeerIdentity))); GNUNET_free (mq); } if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); - /* finally, free n itself */ - GNUNET_free (n); -} - - -/** - * Add an entry for each of our transport plugins - * (that are able to send) to the list of plugins - * for this neighbor. - * - * @param neighbor to initialize - */ -static void -add_plugins (struct NeighborList *neighbor) -{ - struct TransportPlugin *tp; - struct ReadyList *rl; - - neighbor->retry_plugins_time - = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY); - tp = plugins; - while (tp != NULL) { - if (tp->api->send != NULL) - { - rl = GNUNET_malloc (sizeof (struct ReadyList)); - rl->next = neighbor->plugins; - neighbor->plugins = rl; - rl->plugin = tp; - rl->neighbor = neighbor; - rl->transmit_ready = GNUNET_YES; - } - tp = tp->next; + GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; } -} - - -static void -neighbor_timeout_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NeighborList *n = cls; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Neighbor `%4s' has timed out!\n", GNUNET_i2s (&n->id)); -#endif - n->timeout_task = GNUNET_SCHEDULER_NO_TASK; - disconnect_neighbor (n, GNUNET_NO); + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, n->retry_task); + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + } + /* finally, free n itself */ + GNUNET_free (n); } /** - * Create a fresh entry in our neighbor list for the given peer. - * Will try to transmit our current HELLO to the new neighbor. Also - * notifies our clients about the new "connection". - * - * @param peer the peer for which we create the entry - * @return the new neighbor list entry - */ -static struct NeighborList * -setup_new_neighbor (const struct GNUNET_PeerIdentity *peer, const char *addr, size_t sender_address_len) -{ - struct NeighborList *n; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Setting up new neighbor `%4s', sending our HELLO to introduce ourselves\n", - GNUNET_i2s (peer)); -#endif - GNUNET_assert (our_hello != NULL); - n = GNUNET_malloc (sizeof (struct NeighborList)); - n->next = neighbors; - neighbors = n; - n->id = *peer; - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->peer_timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); - add_plugins (n); - n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbor_timeout_task, n); - transmit_to_peer (NULL, 0, - (const struct GNUNET_MessageHeader *) our_hello, - GNUNET_YES, n); - notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL); - return n; -} - -/* * We have received a PING message from someone. Need to send a PONG message - * in response to the peer by any means necessary. Of course, with something - * like TCP where a connection exists, we may want to send it that way. But - * we may not be able to make that distinction... - */ -static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const char *sender_address, - size_t sender_address_len) + * in response to the peer by any means necessary. + * + * FIXME: With something like TCP where a connection exists, we may + * want to send it that way. But the current API does not seem to + * allow us to do so (can't tell this to the transport!) + */ +static int +handle_ping(void *cls, const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *sender_address, + size_t sender_address_len) { struct TransportPlugin *plugin = cls; struct TransportPingMessage *ping; struct TransportPongMessage *pong; uint16_t msize; - - pong = GNUNET_malloc(sizeof(struct TransportPongMessage)); - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Processing `%s' from `%s'\n", - "PING", GNUNET_a2s ((const struct sockaddr *)sender_address, sender_address_len)); -#endif + struct NeighbourList *n; + struct ReadyList *rl; + struct ForeignAddressList *fal; msize = ntohs (message->size); if (msize < sizeof (struct TransportPingMessage)) @@ -2145,26 +2489,18 @@ static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message, sizeof (struct GNUNET_PeerIdentity))) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Received `%s' message not destined for me!\n"), "PING"); + _("Received `%s' message not destined for me!\n"), + "PING"); return GNUNET_SYSERR; } - +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Processing `%s' from `%s'\n", + "PING", + GNUNET_a2s ((const struct sockaddr *)sender_address, + sender_address_len)); +#endif msize -= sizeof (struct TransportPingMessage); -/* - * if (GNUNET_OK != tcp_plugin_address_suggested (plugin, &vcm[1], msize)) - { - GNUNET_break_op (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - - if (GNUNET_OK != GNUNET_SERVER_client_get_address (client, &addr, &addrlen)) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } -*/ pong = GNUNET_malloc (sizeof (struct TransportPongMessage) + sender_address_len); pong->header.size = htons (sizeof (struct TransportPongMessage) + sender_address_len); pong->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_PONG); @@ -2174,21 +2510,82 @@ static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + sender_address_len); pong->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_TCP_PING); pong->challenge = ping->challenge; - - memcpy(&pong->signer, &my_public_key, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); + pong->addrlen = htons(sender_address_len); + memcpy(&pong->signer, + &my_public_key, + sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); memcpy (&pong[1], sender_address, sender_address_len); GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_rsa_sign (my_private_key, &pong->purpose, &pong->signature)); - transmit_to_peer(NULL, TRANSPORT_DEFAULT_PRIORITY, &pong->header, GNUNET_NO, find_neighbor(peer, NULL, 0)); - /* plugin->api->send(); */ /* We can't directly send back along received address, because - the port we saw for the peer (for TCP especially) will not - likely be the open port on the other side! */ + n = find_neighbour(peer); + if (n == NULL) + n = setup_new_neighbour(peer); + /* broadcast 'PONG' to all available addresses */ + rl = n->plugins; + while (rl != NULL) + { + fal = rl->addresses; + while (fal != NULL) + { + transmit_to_peer(NULL, fal, + TRANSPORT_PONG_PRIORITY, + HELLO_VERIFICATION_TIMEOUT, + (const char *)pong, + ntohs(pong->header.size), + GNUNET_YES, + n); + fal = fal->next; + } + rl = rl->next; + } GNUNET_free(pong); return GNUNET_OK; } + +/** + * Calculate how long we should delay reading from the TCP socket to + * ensure that we stay within our bandwidth limits (push back). + * + * @param n for which neighbour should this be calculated + * @return how long to delay receiving more data + */ +static struct GNUNET_TIME_Relative +calculate_throttle_delay (struct NeighbourList *n) +{ + struct GNUNET_TIME_Relative ret; + struct GNUNET_TIME_Absolute now; + uint64_t del; + uint64_t avail; + uint64_t excess; + + now = GNUNET_TIME_absolute_get (); + del = now.value - n->last_quota_update.value; + if (del > MAX_BANDWIDTH_CARRY) + { + update_quota (n, GNUNET_YES); + del = now.value - n->last_quota_update.value; + GNUNET_assert (del <= MAX_BANDWIDTH_CARRY); + } + if (n->quota_in == 0) + n->quota_in = 1; /* avoid divison by zero */ + avail = del * n->quota_in; + if (avail > n->last_received) + return GNUNET_TIME_UNIT_ZERO; /* can receive right now */ + excess = n->last_received - avail; + ret.value = excess / n->quota_in; + if (ret.value > 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n", + (unsigned long long) excess, + (unsigned long long) n->quota_in, + (unsigned long long) ret.value); + return ret; +} + + /** * Function called by the plugin for each received message. * Update data volumes, possibly notify plugins about @@ -2196,151 +2593,112 @@ static int handle_ping(void *cls, const struct GNUNET_MessageHeader *message, * and generally forward to our receive callback. * * @param cls the "struct TransportPlugin *" we gave to the plugin - * @param message the message, NULL if peer was disconnected - * @param distance the transport cost to this peer (not latency!) - * @param sender_address the address that the sender reported - * (opaque to transport service) - * @param sender_address_len the length of the sender address * @param peer (claimed) identity of the other peer - * @return the new service_context that the plugin should use - * for future receive calls for messages from this - * particular peer - * - */ -static void + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again + * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param sender_address binary address of the sender (if observed) + * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) + */ +static struct GNUNET_TIME_Relative plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, unsigned int distance, const char *sender_address, size_t sender_address_len) { - const struct GNUNET_MessageHeader ack = { - htons (sizeof (struct GNUNET_MessageHeader)), - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK) - }; struct ReadyList *service_context; struct TransportPlugin *plugin = cls; struct TransportClient *cpos; struct InboundMessage *im; + struct ForeignAddressList *peer_address; uint16_t msize; - struct NeighborList *n; + struct NeighbourList *n; - n = find_neighbor (peer, sender_address, sender_address_len); + n = find_neighbour (peer); if (n == NULL) - { - if (message == NULL) - return; /* disconnect of peer already marked down */ - n = setup_new_neighbor (peer, sender_address, sender_address_len); - } + n = setup_new_neighbour (peer); + update_quota (n, GNUNET_NO); service_context = n->plugins; while ((service_context != NULL) && (plugin != service_context->plugin)) service_context = service_context->next; GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); - if (message == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Receive failed from `%4s', triggering disconnect\n", - GNUNET_i2s (&n->id)); -#endif - /* TODO: call stats */ - if (service_context != NULL) - service_context->connected = GNUNET_NO; - disconnect_neighbor (n, GNUNET_YES); - return; - } + if (message != NULL) + { + peer_address = add_peer_address(n, + plugin->short_name, + sender_address, + sender_address_len); + if (peer_address != NULL) + { + peer_address->distance = distance; + if (peer_address->connected == GNUNET_NO) + { + peer_address->connected = GNUNET_YES; + peer_address->connect_attempts++; + } + peer_address->timeout + = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + } + /* update traffic received amount ... */ + msize = ntohs (message->size); + n->distance = distance; + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_SCHEDULER_cancel (sched, + n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + /* dropping message due to frequent inbound volume violations! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING | + GNUNET_ERROR_TYPE_BULK, + _ + ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), + n->quota_violation_count); + return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */ + } + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_HELLO: + process_hello (plugin, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: + handle_ping(plugin, message, peer, sender_address, sender_address_len); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: + handle_pong(plugin, message, peer, sender_address, sender_address_len); + break; + default: #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Processing message of type `%u' received by plugin...\n", - ntohs (message->type)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s', sending to all clients.\n", + ntohs (message->type), GNUNET_i2s (peer)); #endif - if (service_context != NULL) - { - if (service_context->connected == GNUNET_NO) - { - service_context->connected = GNUNET_YES; - service_context->transmit_ready = GNUNET_YES; - service_context->connect_attempts++; - } - service_context->timeout - = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - /* service_context->latency = latency; */ /* This value should be set by us! */ - } - /* update traffic received amount ... */ - msize = ntohs (message->size); - n->last_received += msize; - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); - n->peer_timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbor_timeout_task, n); - update_quota (n); - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - /* dropping message due to frequent inbound volume violations! */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING | - GNUNET_ERROR_TYPE_BULK, - _ - ("Dropping incoming message due to repeated bandwidth quota violations.\n")); - /* TODO: call stats */ - GNUNET_assert ((service_context == NULL) || - (NULL != service_context->neighbor)); - return; - } - switch (ntohs (message->type)) - { - case GNUNET_MESSAGE_TYPE_HELLO: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving `%s' message from `%4s'.\n", "HELLO", - GNUNET_i2s (peer)); -#endif - process_hello (plugin, message); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending `%s' message to connecting peer `%4s'.\n", "ACK", - GNUNET_i2s (peer)); -#endif - transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: - handle_ping(plugin, message, peer, sender_address, sender_address_len); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: - handle_pong(plugin, message, peer, sender_address, sender_address_len); - break; - //plugin_env_notify_validation(); - case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK: - n->saw_ack = GNUNET_YES; - /* intentional fall-through! */ - default: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s', sending to all clients.\n", - ntohs (message->type), GNUNET_i2s (peer)); -#endif - /* transmit message to all clients */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); - im->header.size = htons (sizeof (struct InboundMessage) + msize); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = n->latency; - im->peer = *peer; - memcpy (&im[1], message, msize); - - cpos = clients; - while (cpos != NULL) - { - transmit_to_client (cpos, &im->header, GNUNET_YES); - cpos = cpos->next; - } - GNUNET_free (im); - } - GNUNET_assert ((service_context == NULL) || - (NULL != service_context->neighbor)); + /* transmit message to all clients */ + im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); + im->header.size = htons (sizeof (struct InboundMessage) + msize); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->latency = GNUNET_TIME_relative_hton (n->latency); + im->peer = *peer; + memcpy (&im[1], message, msize); + cpos = clients; + while (cpos != NULL) + { + transmit_to_client (cpos, &im->header, GNUNET_YES); + cpos = cpos->next; + } + GNUNET_free (im); + } + } + return calculate_throttle_delay (n); } @@ -2359,9 +2717,7 @@ handle_start (void *cls, { struct TransportClient *c; struct ConnectInfoMessage cim; - struct NeighborList *n; - struct InboundMessage *im; - struct GNUNET_MessageHeader *ack; + struct NeighbourList *n; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2395,33 +2751,18 @@ handle_start (void *cls, /* tell new client about all existing connections */ cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = - htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); - cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.size = htons (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ - ack = (struct GNUNET_MessageHeader *) &im[1]; - ack->size = htons (sizeof (struct GNUNET_MessageHeader)); - ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK); - for (n = neighbors; n != NULL; n = n->next) - { - cim.id = n->id; - transmit_to_client (c, &cim.header, GNUNET_NO); - if (n->saw_ack) - { - im->peer = n->id; - transmit_to_client (c, &im->header, GNUNET_NO); + n = neighbours; + while (n != NULL) + { + if (GNUNET_YES == n->received_pong) + { + cim.id = n->id; + cim.latency = GNUNET_TIME_relative_hton (n->latency); + cim.distance = htonl (n->distance); + transmit_to_client (c, &cim.header, GNUNET_NO); } + n = n->next; } - GNUNET_free (im); - } - else - { - fprintf(stderr, "Our hello is NULL!\n"); } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2441,10 +2782,6 @@ handle_hello (void *cls, { int ret; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client\n", "HELLO"); -#endif ret = process_hello (NULL, message); GNUNET_SERVER_receive_done (client, ret); } @@ -2463,7 +2800,7 @@ handle_send (void *cls, const struct GNUNET_MessageHeader *message) { struct TransportClient *tc; - struct NeighborList *n; + struct NeighbourList *n; const struct OutboundMessage *obm; const struct GNUNET_MessageHeader *obmm; uint16_t size; @@ -2491,9 +2828,9 @@ handle_send (void *cls, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - n = find_neighbor (&obm->peer, NULL, 0); + n = find_neighbour (&obm->peer); if (n == NULL) - n = setup_new_neighbor (&obm->peer, NULL, 0); + n = setup_new_neighbour (&obm->peer); tc = clients; while ((tc != NULL) && (tc->client != client)) tc = tc->next; @@ -2504,7 +2841,10 @@ handle_send (void *cls, ntohs (obmm->size), ntohs (obmm->type), GNUNET_i2s (&obm->peer)); #endif - transmit_to_peer (tc, ntohl (obm->priority), obmm, GNUNET_NO, n); + transmit_to_peer (tc, NULL, ntohl (obm->priority), + GNUNET_TIME_relative_ntoh (obm->timeout), + (char *)obmm, + ntohs (obmm->size), GNUNET_NO, n); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2523,74 +2863,29 @@ handle_set_quota (void *cls, { const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; - struct NeighborList *n; - struct TransportPlugin *p; - struct ReadyList *rl; + struct NeighbourList *n; + uint32_t qin; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client for peer `%4s'\n", - "SET_QUOTA", GNUNET_i2s (&qsm->peer)); -#endif - n = find_neighbor (&qsm->peer, NULL, 0); + n = find_neighbour (&qsm->peer); if (n == NULL) { GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } - update_quota (n); - if (n->quota_in < ntohl (qsm->quota_in)) - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_in = ntohl (qsm->quota_in); - rl = n->plugins; - while (rl != NULL) - { - p = rl->plugin; - p->api->set_receive_quota (p->api->cls, - &qsm->peer, ntohl (qsm->quota_in)); - rl = rl->next; - } - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** - * Handle TRY_CONNECT-message. - * - * @param cls closure (always NULL) - * @param client identification of the client - * @param message the actual message - */ -static void -handle_try_connect (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct TryConnectMessage *tcm; - - tcm = (const struct TryConnectMessage *) message; + qin = ntohl (qsm->quota_in); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client %p asking to connect to `%4s'\n", - "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer)); -#endif - if (NULL == find_neighbor (&tcm->peer, NULL, 0)) - setup_new_neighbor (&tcm->peer, NULL, 0); /* Can we set up a truly _new_ neighbor without - knowing its address? Should we ask the plugin - for more information about this peer? I don't - think we can... Or set up new peer should only - happen when transport notifies us of an address, - and this setup should check for an address in - the existing list only */ -#if DEBUG_TRANSPORT - else - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Client asked to connect to `%4s', but connection already exists\n", - "TRY_CONNECT", GNUNET_i2s (&tcm->peer)); + "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n", + "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer)); #endif + update_quota (n, GNUNET_YES); + if (n->quota_in < qin) + n->last_quota_update = GNUNET_TIME_absolute_get (); + n->quota_in = qin; GNUNET_SERVER_receive_done (client, GNUNET_OK); } + static void transmit_address_to_client (void *cls, const char *address) { @@ -2607,6 +2902,7 @@ transmit_address_to_client (void *cls, const char *address) GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); } + /** * Handle AddressLookup-message. * @@ -2683,9 +2979,6 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0}, {&handle_set_quota, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)}, - {&handle_try_connect, NULL, - GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT, - sizeof (struct TryConnectMessage)}, {&handle_address_lookup, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP, 0}, @@ -2705,8 +2998,6 @@ create_environment (struct TransportPlugin *plug) plug->env.cls = plug; plug->env.receive = &plugin_env_receive; plug->env.notify_address = &plugin_env_notify_address; - plug->env.default_quota_in = - (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); plug->env.max_connections = max_connect_per_transport; } @@ -2757,7 +3048,7 @@ client_disconnect_notification (void *cls, struct TransportClient *prev; struct ClientMessageQueueEntry *mqe; - if (client == NULL) + if (client == NULL) return; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, @@ -2774,10 +3065,12 @@ client_disconnect_notification (void *cls, return; while (NULL != (mqe = pos->message_queue_head)) { - pos->message_queue_head = mqe->next; + GNUNET_CONTAINER_DLL_remove (pos->message_queue_head, + pos->message_queue_tail, + mqe); + pos->message_count--; GNUNET_free (mqe); } - pos->message_queue_head = NULL; if (prev == NULL) clients = pos->next; else @@ -2787,22 +3080,54 @@ client_disconnect_notification (void *cls, pos->client = NULL; return; } + if (pos->th != NULL) + { + GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); + pos->th = NULL; + } + GNUNET_break (0 == pos->message_count); GNUNET_free (pos); } /** - * Function called when the service shuts down. Unloads our plugins. + * Iterator to free entries in the validation_map. + * + * @param cls closure (unused) + * @param key current key code + * @param value value in the hash map (validation to abort) + * @return GNUNET_YES (always) + */ +static int +abort_validation (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct ValidationEntry *va = value; + + GNUNET_SCHEDULER_cancel (sched, va->timeout_task); + GNUNET_free (va->transport_name); + GNUNET_free (va); + return GNUNET_YES; +} + + +/** + * Function called when the service shuts down. Unloads our plugins + * and cancels pending validations. * * @param cls closure, unused * @param tc task context (unused) */ static void -unload_plugins (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct TransportPlugin *plug; - struct AddressList *al; + struct OwnAddressList *al; + struct CheckHelloValidatedContext *chvc; + while (neighbours != NULL) + disconnect_neighbour (neighbours, GNUNET_NO); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport service is unloading plugins...\n"); @@ -2823,6 +3148,20 @@ unload_plugins (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if (my_private_key != NULL) GNUNET_CRYPTO_rsa_key_free (my_private_key); GNUNET_free_non_null (our_hello); + + /* free 'chvc' data structure */ + while (NULL != (chvc = chvc_head)) + { + chvc_head = chvc->next; + GNUNET_PEERINFO_iterate_cancel (chvc->piter); + GNUNET_free (chvc); + } + chvc_tail = NULL; + + GNUNET_CONTAINER_multihashmap_iterate (validation_map, + &abort_validation, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (validation_map); } @@ -2848,6 +3187,7 @@ run (void *cls, sched = s; cfg = c; + validation_map = GNUNET_CONTAINER_multihashmap_create (64); /* parse configuration */ if ((GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, @@ -2902,7 +3242,7 @@ run (void *cls, } GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_UNIT_FOREVER_REL, - &unload_plugins, NULL); + &shutdown_task, NULL); if (no_transports) refresh_hello ();