X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fgnunet-service-transport.c;h=d654fcc8cfc89c55cd5fe4c60f31f11d682b3d7b;hb=9d2cd5be986a0732b9e5a8fcdf5acd1c7225ef2f;hp=292f38039129306095367cf6ae6d1c0a80da96b9;hpb=697aa41b1e22503e2eb2d8802bdaf627b3931997;p=oweals%2Fgnunet.git diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index 292f38039..d654fcc8c 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -23,12 +23,15 @@ * @brief low-level P2P messaging * @author Christian Grothoff * - * TODO: - * - if we do not receive an ACK in response to our - * HELLO, retransmit HELLO! + * NOTE: + * - This code uses 'GNUNET_a2s' for debug printing in many places, + * which is technically wrong since it assumes we have IP+Port + * (v4/v6) addresses. Once we add transports like http or smtp + * this will have to be changed! */ #include "platform.h" #include "gnunet_client_lib.h" +#include "gnunet_container_lib.h" #include "gnunet_constants.h" #include "gnunet_getopt_lib.h" #include "gnunet_hello_lib.h" @@ -41,6 +44,12 @@ #include "plugin_transport.h" #include "transport.h" +/** + * Should we do some additional checks (to validate behavior + * of clients)? + */ +#define EXTRA_CHECKS GNUNET_YES + /** * How many messages can we have pending for a given client process * before we start to drop incoming messages? We typically should @@ -59,6 +68,13 @@ */ #define MAX_CONNECT_RETRY 3 +/** + * Limit on the number of ready-to-run tasks when validating + * HELLOs. If more tasks are ready to run, we will drop + * HELLOs instead of validating them. + */ +#define MAX_HELLO_LOAD 4 + /** * How often must a peer violate bandwidth quotas before we start * to simply drop its messages? @@ -73,7 +89,17 @@ * Besides, if a single request to an address takes a long time, * then the peer is unlikely worthwhile anyway. */ -#define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3) +#define HELLO_VERIFICATION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) + +/** + * How long will we allow sending of a ping to be delayed? + */ +#define TRANSPORT_DEFAULT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15) + +/** + * Priority to use for PONG messages. + */ +#define TRANSPORT_PONG_PRIORITY 4 /** * How often do we re-add (cheaper) plugins to our list of plugins @@ -82,29 +108,120 @@ #define PLUGIN_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15) /** - * After how long do we expire an address in a HELLO - * that we just validated? This value is also used - * for our own addresses when we create a HELLO. + * After how long do we expire an address in a HELLO that we just + * validated? This value is also used for our own addresses when we + * create a HELLO. */ #define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12) /** - * Entry in linked list of network addresses. + * How long before an existing address expires should we again try to + * validate it? Must be (significantly) smaller than + * HELLO_ADDRESS_EXPIRATION. + */ +#define HELLO_REVALIDATION_START_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 1) + + +/** + * List of addresses of other peers + */ +struct ForeignAddressList +{ + /** + * This is a linked list. + */ + struct ForeignAddressList *next; + + /** + * Which ready list does this entry belong to. + */ + struct ReadyList *ready_list; + + /** + * How long until we auto-expire this address (unless it is + * re-confirmed by the transport)? + */ + struct GNUNET_TIME_Absolute expires; + + /** + * Length of addr. + */ + size_t addrlen; + + /** + * The address. + */ + const void *addr; + + /** + * What was the last latency observed for this plugin + * and peer? Invalid if connected is GNUNET_NO. + */ + struct GNUNET_TIME_Relative latency; + + /** + * If we did not successfully transmit a message to the given peer + * via this connection during the specified time, we should consider + * the connection to be dead. This is used in the case that a TCP + * transport simply stalls writing to the stream but does not + * formerly get a signal that the other peer died. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Are we currently connected via this address? The first time we + * successfully transmit or receive data to a peer via a particular + * address, we set this to GNUNET_YES. If we later get an error + * (disconnect notification, transmission failure, timeout), we set + * it back to GNUNET_NO. + */ + int connected; + + /** + * Is this plugin currently busy transmitting to the specific target? + * GNUNET_NO if not (initial, default state is GNUNET_NO). Internal + * messages do not count as 'in transmit'. + */ + int in_transmit; + + /** + * Has this address been validated yet? + */ + int validated; + + /** + * How often have we tried to connect using this plugin? Used to + * discriminate against addresses that do not work well. + * FIXME: not yet used, but should be! + */ + unsigned int connect_attempts; + + /** + * DV distance to this peer (1 if no DV is used). + * FIXME: need to set this from transport plugins! + */ + uint32_t distance; + +}; + + +/** + * Entry in linked list of network addresses for ourselves. */ -struct AddressList +struct OwnAddressList { /** * This is a linked list. */ - struct AddressList *next; + struct OwnAddressList *next; /** * The address, actually a pointer to the end * of this struct. Do not free! */ - void *addr; - + const void *addr; + /** * How long until we auto-expire this address (unless it is * re-confirmed by the transport)? @@ -149,7 +266,7 @@ struct TransportPlugin /** * List of our known addresses for this transport. */ - struct AddressList *addresses; + struct OwnAddressList *addresses; /** * Environment this transport service is using @@ -162,7 +279,6 @@ struct TransportPlugin */ GNUNET_SCHEDULER_TaskIdentifier address_update_task; - /** * Set to GNUNET_YES if we need to scrap the existing * list of "addresses" and start fresh when we receive @@ -171,6 +287,7 @@ struct TransportPlugin * to the list and wait for the commit call. */ int rebuild; + }; struct NeighbourList; @@ -183,26 +300,42 @@ struct MessageQueue { /** - * This is a linked list. + * This is a doubly linked list. */ struct MessageQueue *next; /** - * The message we want to transmit. + * This is a doubly linked list. + */ + struct MessageQueue *prev; + + /** + * The message(s) we want to transmit, GNUNET_MessageHeader(s) + * stuck together in memory. Allocated at the end of this struct. + */ + const char *message_buf; + + /** + * Size of the message buf */ - struct GNUNET_MessageHeader *message; + size_t message_buf_size; /** * Client responsible for queueing the message; - * used to check that a client has not two messages + * used to check that a client has no two messages * pending for the same target. Can be NULL. */ struct TransportClient *client; /** - * Neighbour this entry belongs to. + * Using which specific address should we send this message? */ - struct NeighbourList *neighbour; + struct ForeignAddressList *specific_address; + + /** + * Peer ID of the Neighbour this entry belongs to. + */ + struct GNUNET_PeerIdentity neighbour_id; /** * Plugin that we used for the transmission. @@ -210,6 +343,11 @@ struct MessageQueue */ struct TransportPlugin *plugin; + /** + * At what time should we fail? + */ + struct GNUNET_TIME_Absolute timeout; + /** * Internal message of the transport system that should not be * included in the usual SEND-SEND_OK transmission confirmation @@ -223,7 +361,7 @@ struct MessageQueue * How important is the message? */ unsigned int priority; - + }; @@ -233,7 +371,6 @@ struct MessageQueue */ struct ReadyList { - /** * This is a linked list. */ @@ -246,52 +383,10 @@ struct ReadyList struct TransportPlugin *plugin; /** - * Neighbour this entry belongs to. - */ - struct NeighbourList *neighbour; - - /** - * What was the last latency observed for this plugin - * and peer? Invalid if connected is GNUNET_NO. - */ - struct GNUNET_TIME_Relative latency; - - /** - * If we did not successfully transmit a message to the - * given peer via this connection during the specified - * time, we should consider the connection to be dead. - * This is used in the case that a TCP transport simply - * stalls writing to the stream but does not formerly - * get a signal that the other peer died. - */ - struct GNUNET_TIME_Absolute timeout; - - /** - * Is this plugin currently connected? The first time - * we transmit or send data to a peer via a particular - * plugin, we set this to GNUNET_YES. If we later get - * an error (disconnect notification or transmission - * failure), we set it back to GNUNET_NO. Each time the - * value is set to GNUNET_YES, we increment the - * "connect_attempts" counter. If that one reaches a - * particular threshold, we consider the plugin to not - * be working properly at this time for the given peer - * and remove it from the eligible list. - */ - int connected; - - /** - * How often have we tried to connect using this plugin? - */ - unsigned int connect_attempts; - - /** - * Is this plugin ready to transmit to the specific - * target? GNUNET_NO if not. Initially, all plugins - * are marked ready. If a transmission is in progress, - * "transmit_ready" is set to GNUNET_NO. + * Transport addresses, latency, and readiness for + * this particular plugin. */ - int transmit_ready; + struct ForeignAddressList *addresses; }; @@ -314,10 +409,16 @@ struct NeighbourList struct ReadyList *plugins; /** - * List of messages we would like to send to this peer; + * Head of list of messages we would like to send to this peer; * must contain at most one message per client. */ - struct MessageQueue *messages; + struct MessageQueue *messages_head; + + /** + * Tail of list of messages we would like to send to this peer; must + * contain at most one message per client. + */ + struct MessageQueue *messages_tail; /** * Identity of this neighbour. @@ -330,6 +431,12 @@ struct NeighbourList */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** + * ID of task scheduled to run when we should retry transmitting + * the head of the message queue. + */ + GNUNET_SCHEDULER_TaskIdentifier retry_task; + /** * How long until we should consider this peer dead * (if we don't receive another message in the @@ -343,10 +450,22 @@ struct NeighbourList struct GNUNET_TIME_Absolute last_quota_update; /** - * At what time should we try to again add plugins to - * our ready list? + * The latency we have seen for this particular address for + * this particular peer. This latency may have been calculated + * over multiple transports. This value reflects how long it took + * us to receive a response when SENDING via this particular + * transport/neighbour/address combination! + * + * FIXME: we need to periodically send PINGs to update this + * latency (at least more often than the current "huge" (11h?) + * update interval). */ - struct GNUNET_TIME_Absolute retry_plugins_time; + struct GNUNET_TIME_Relative latency; + + /** + * DV distance to this peer (1 if no DV is used). + */ + uint32_t distance; /** * How many bytes have we received since the "last_quota_update" @@ -368,26 +487,108 @@ struct NeighbourList unsigned int quota_violation_count; /** - * Have we seen an ACK from this neighbour in the past? - * (used to make up a fake ACK for clients connecting after - * the neighbour connected to us). + * Have we seen an PONG from this neighbour in the past (and + * not had a disconnect since)? + */ + int received_pong; + +}; + +/** + * Message used to ask a peer to validate receipt (to check an address + * from a HELLO). Followed by the address used. Note that the + * recipients response does not affirm that he has this address, + * only that he got the challenge message. + */ +struct TransportPingMessage +{ + + /** + * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_PING + */ + struct GNUNET_MessageHeader header; + + /** + * Random challenge number (in network byte order). + */ + uint32_t challenge GNUNET_PACKED; + + /** + * Who is the intended recipient? + */ + struct GNUNET_PeerIdentity target; + +}; + + +/** + * Message used to validate a HELLO. The challenge is included in the + * confirmation to make matching of replies to requests possible. The + * signature signs the original challenge number, our public key, the + * sender's address (so that the sender can check that the address we + * saw is plausible for him and possibly detect a MiM attack) and a + * timestamp (to limit replay).

+ * + * This message is followed by the address of the + * client that we are observing (which is part of what + * is being signed). + */ +struct TransportPongMessage +{ + + /** + * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_PONG + */ + struct GNUNET_MessageHeader header; + + /** + * For padding, always zero. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * Signature. + */ + struct GNUNET_CRYPTO_RsaSignature signature; + + /** + * What are we signing and why? */ - int saw_ack; + struct GNUNET_CRYPTO_RsaSignaturePurpose purpose; + + /** + * Random challenge number (in network byte order). + */ + uint32_t challenge GNUNET_PACKED; + + /** + * Who signed this message? + */ + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded signer; + + /** + * Size of address appended to this message + */ + size_t addrlen; }; /** - * Linked list of messages to be transmitted to - * the client. Each entry is followed by the - * actual message. + * Linked list of messages to be transmitted to the client. Each + * entry is followed by the actual message. */ struct ClientMessageQueueEntry { /** - * This is a linked list. + * This is a doubly-linked list. */ struct ClientMessageQueueEntry *next; + + /** + * This is a doubly-linked list. + */ + struct ClientMessageQueueEntry *prev; }; @@ -419,6 +620,11 @@ struct TransportClient */ struct ClientMessageQueueEntry *message_queue_tail; + /** + * Current transmit request handle. + */ + struct GNUNET_CONNECTION_TransmitHandle *th; + /** * Is a call to "transmit_send_continuation" pending? If so, we * must not free this struct (even if the corresponding client @@ -436,15 +642,16 @@ struct TransportClient /** - * For each HELLO, we may have to validate multiple addresses; - * each address gets its own request entry. + * Entry in map of all HELLOs awaiting validation. */ -struct ValidationAddress +struct ValidationEntry { + /** - * This is a linked list. + * The address, actually a pointer to the end + * of this struct. Do not free! */ - struct ValidationAddress *next; + const void *addr; /** * Name of the transport. @@ -452,74 +659,78 @@ struct ValidationAddress char *transport_name; /** - * When should this validated address expire? + * The public key of the peer. */ - struct GNUNET_TIME_Absolute expiration; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; /** - * Length of the address we are validating. + * ID of task that will clean up this entry if we don't succeed + * with the validation first. */ - size_t addr_len; + GNUNET_SCHEDULER_TaskIdentifier timeout_task; /** - * Challenge number we used. + * At what time did we send this validation? */ - uint32_t challenge; + struct GNUNET_TIME_Absolute send_time; + + /** + * Length of addr. + */ + size_t addrlen; /** - * Set to GNUNET_YES if the challenge was met, - * GNUNET_SYSERR if we know it failed, GNUNET_NO - * if we are waiting on a response. + * Challenge number we used. */ - int ok; + uint32_t challenge; + }; /** - * Entry in linked list of all HELLOs awaiting validation. + * Context of currently active requests to peerinfo + * for validation of HELLOs. */ -struct ValidationList +struct CheckHelloValidatedContext { /** - * This is a linked list. + * This is a doubly-linked list. */ - struct ValidationList *next; + struct CheckHelloValidatedContext *next; /** - * Linked list with one entry per address from the HELLO - * that needs to be validated. + * This is a doubly-linked list. */ - struct ValidationAddress *addresses; + struct CheckHelloValidatedContext *prev; /** - * The public key of the peer. + * Hello that we are validating. */ - struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey; + const struct GNUNET_HELLO_Message *hello; /** - * When does this record time-out? (assuming the - * challenge goes unanswered) + * Context for peerinfo iteration. + * NULL after we are done processing peerinfo's information. */ - struct GNUNET_TIME_Absolute timeout; + struct GNUNET_PEERINFO_IteratorContext *piter; + + /** + * Was a HELLO known for this peer to peerinfo? + */ + int hello_known; }; -/** - * HELLOs awaiting validation. - */ -static struct ValidationList *pending_validations; - /** * Our HELLO message. */ static struct GNUNET_HELLO_Message *our_hello; /** - * "version" of "our_hello". Used to see if a given - * neighbour has already been sent the latest version - * of our HELLO message. + * "version" of "our_hello". Used to see if a given neighbour has + * already been sent the latest version of our HELLO message. */ static unsigned int our_hello_version; @@ -573,9 +784,53 @@ static struct NeighbourList *neighbours; */ static uint32_t max_connect_per_transport; +/** + * Head of linked list. + */ +static struct CheckHelloValidatedContext *chvc_head; + +/** + * Tail of linked list. + */ +static struct CheckHelloValidatedContext *chvc_tail; + + +/** + * Map of PeerIdentities to 'struct ValidationEntry*'s (addresses + * of the given peer that we are currently validating). + */ +static struct GNUNET_CONTAINER_MultiHashMap *validation_map; + + +/** + * The peer specified by the given neighbour has timed-out or a plugin + * has disconnected. We may either need to do nothing (other plugins + * still up), or trigger a full disconnect and clean up. This + * function updates our state and do the necessary notifications. + * Also notifies our clients that the neighbour is now officially + * gone. + * + * @param n the neighbour list entry for the peer + * @param check should we just check if all plugins + * disconnected or must we ask all plugins to + * disconnect? + */ +static void disconnect_neighbour (struct NeighbourList *n, int check); + +/** + * Check the ready list for the given neighbour and if a plugin is + * ready for transmission (and if we have a message), do so! + * + * @param neighbour target peer for which to transmit + */ +static void try_transmission_to_peer (struct NeighbourList *neighbour); + /** * Find an entry in the neighbour list for a particular peer. + * if sender_address is not specified (NULL) then return the + * first matching entry. If sender_address is specified, then + * make sure that the address and address_len also matches. * * @return NULL if not found. */ @@ -583,8 +838,9 @@ static struct NeighbourList * find_neighbour (const struct GNUNET_PeerIdentity *key) { struct NeighbourList *head = neighbours; + while ((head != NULL) && - (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity)))) + (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity)))) head = head->next; return head; } @@ -607,18 +863,26 @@ find_transport (const char *short_name) /** * Update the quota values for the given neighbour now. + * + * @param n neighbour to update + * @param force GNUNET_YES to force recalculation now */ static void -update_quota (struct NeighbourList *n) +update_quota (struct NeighbourList *n, + int force) { - struct GNUNET_TIME_Relative delta; + struct GNUNET_TIME_Absolute now; + unsigned long long delta; uint64_t allowed; uint64_t remaining; - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (delta.value < MIN_QUOTA_REFRESH_TIME) - return; /* not enough time passed for doing quota update */ - allowed = delta.value * n->quota_in; + now = GNUNET_TIME_absolute_get (); + delta = now.value - n->last_quota_update.value; + allowed = n->quota_in * delta; + if ( (delta < MIN_QUOTA_REFRESH_TIME) && + (!force) && + (allowed < 32 * 1024) ) + return; /* too early, not enough data */ if (n->last_received < allowed) { remaining = allowed - n->last_received; @@ -629,7 +893,7 @@ update_quota (struct NeighbourList *n) if (remaining > MAX_BANDWIDTH_CARRY) remaining = MAX_BANDWIDTH_CARRY; n->last_received = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; n->last_quota_update.value -= remaining; if (n->quota_violation_count > 0) n->quota_violation_count--; @@ -637,10 +901,10 @@ update_quota (struct NeighbourList *n) else { n->last_received -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; if (n->last_received > allowed) { - /* more than twice the allowed rate! */ + /* much more than the allowed rate! */ n->quota_violation_count += 10; } } @@ -648,10 +912,9 @@ update_quota (struct NeighbourList *n) /** - * Function called to notify a client about the socket - * being ready to queue more data. "buf" will be - * NULL and "size" zero if the socket was closed for - * writing in the meantime. + * Function called to notify a client about the socket being ready to + * queue more data. "buf" will be NULL and "size" zero if the socket + * was closed for writing in the meantime. * * @param cls closure * @param size number of bytes available in buf @@ -666,20 +929,21 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) uint16_t msize; size_t tsize; const struct GNUNET_MessageHeader *msg; - struct GNUNET_CONNECTION_TransmitHandle *th; char *cbuf; + client->th = NULL; if (buf == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission to client failed, closing connection.\n"); + "Transmission to client failed, closing connection.\n"); /* fatal error with client, free message queue! */ while (NULL != (q = client->message_queue_head)) { - client->message_queue_head = q->next; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); GNUNET_free (q); } - client->message_queue_tail = NULL; client->message_count = 0; return 0; } @@ -693,12 +957,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) break; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting message of type %u to client.\n", - ntohs (msg->type)); + "Transmitting message of type %u to client.\n", + ntohs (msg->type)); #endif - client->message_queue_head = q->next; - if (q->next == NULL) - client->message_queue_tail = NULL; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); memcpy (&cbuf[tsize], msg, msize); tsize += msize; GNUNET_free (q); @@ -707,12 +971,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) if (NULL != q) { GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } return tsize; } @@ -734,7 +998,6 @@ transmit_to_client (struct TransportClient *client, { struct ClientMessageQueueEntry *q; uint16_t msize; - struct GNUNET_CONNECTION_TransmitHandle *th; if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop)) { @@ -745,90 +1008,49 @@ transmit_to_client (struct TransportClient *client, /* TODO: call to statistics... */ return; } - client->message_count++; msize = ntohs (msg->size); GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize); memcpy (&q[1], msg, msize); - /* append to message queue */ - if (client->message_queue_tail == NULL) - { - client->message_queue_tail = q; - } - else + GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head, + client->message_queue_tail, + client->message_queue_tail, + q); + client->message_count++; + if (client->th == NULL) { - client->message_queue_tail->next = q; - client->message_queue_tail = q; - } - if (client->message_queue_head == NULL) - { - client->message_queue_head = q; - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } } /** - * Find alternative plugins for communication. + * Transmit a 'SEND_OK' notification to the given client for the + * given neighbour. * - * @param neighbour for which neighbour should we try to find - * more plugins? + * @param client who to notify + * @param n neighbour to notify about + * @param result status code for the transmission request */ static void -try_alternative_plugins (struct NeighbourList *neighbour) +transmit_send_ok (struct TransportClient *client, + struct NeighbourList *n, + int result) { - struct ReadyList *rl; + struct SendOkMessage send_ok_msg; - if ((neighbour->plugins != NULL) && - (neighbour->retry_plugins_time.value > - GNUNET_TIME_absolute_get ().value)) - return; /* don't try right now */ - neighbour->retry_plugins_time - = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY); - - rl = neighbour->plugins; - while (rl != NULL) - { - if (rl->connect_attempts > 0) - rl->connect_attempts--; /* amnesty */ - rl = rl->next; - } - -} - - -/** - * The peer specified by the given neighbour has timed-out or a plugin - * has disconnected. We may either need to do nothing (other plugins - * still up), or trigger a full disconnect and clean up. This - * function updates our state and do the necessary notifications. - * Also notifies our clients that the neighbour is now officially - * gone. - * - * @param n the neighbour list entry for the peer - * @param check should we just check if all plugins - * disconnected or must we ask all plugins to - * disconnect? - */ -static void -disconnect_neighbour (struct NeighbourList *n, - int check); - - -/** - * Check the ready list for the given neighbour and - * if a plugin is ready for transmission (and if we - * have a message), do so! - * - * @param neighbour target peer for which to check the plugins - */ -static void -try_transmission_to_peer (struct NeighbourList *neighbour); + send_ok_msg.header.size = htons (sizeof (send_ok_msg)); + send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + send_ok_msg.success = htonl (result); + send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency); + send_ok_msg.peer = n->id; + transmit_to_client (client, &send_ok_msg.header, GNUNET_NO); +} /** @@ -840,9 +1062,6 @@ try_transmission_to_peer (struct NeighbourList *neighbour); * @param cls closure, identifies the entry on the * message queue that was transmitted and the * client responsible for queueing the message - * @param rl identifies plugin used for the transmission for - * this neighbour; needs to be re-enabled for - * future transmissions * @param target the peer receiving the message * @param result GNUNET_OK on success, if the transmission * failed, we should not tell the client to transmit @@ -850,141 +1069,201 @@ try_transmission_to_peer (struct NeighbourList *neighbour); */ static void transmit_send_continuation (void *cls, - struct ReadyList *rl, const struct GNUNET_PeerIdentity *target, int result) { struct MessageQueue *mq = cls; - struct SendOkMessage send_ok_msg; struct NeighbourList *n; - GNUNET_assert (mq != NULL); - n = mq->neighbour; + n = find_neighbour(&mq->neighbour_id); GNUNET_assert (n != NULL); - GNUNET_assert (0 == - memcmp (&n->id, target, - sizeof (struct GNUNET_PeerIdentity))); - if (rl == NULL) - { - rl = n->plugins; - while ((rl != NULL) && (rl->plugin != mq->plugin)) - rl = rl->next; - GNUNET_assert (rl != NULL); - } - if (result == GNUNET_OK) + if (mq->specific_address != NULL) { - rl->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + if (result == GNUNET_OK) + { + mq->specific_address->timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + mq->specific_address->connected = GNUNET_YES; + } + else + { + mq->specific_address->connected = GNUNET_NO; + } + if (! mq->internal_msg) + mq->specific_address->in_transmit = GNUNET_NO; } - else + if (mq->client != NULL) + transmit_send_ok (mq->client, n, result); + GNUNET_free (mq); + try_transmission_to_peer (n); + if (result != GNUNET_OK) + disconnect_neighbour (n, GNUNET_YES); +} + + +/** + * Find an address in any of the available transports for + * the given neighbour that would be good for message + * transmission. This is essentially the transport selection + * routine. + * + * @param neighbour for whom to select an address + * @return selected address, NULL if we have none + */ +struct ForeignAddressList * +find_ready_address(struct NeighbourList *neighbour) +{ + struct ReadyList *head = neighbour->plugins; + struct ForeignAddressList *addresses; + struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); + struct ForeignAddressList *best_address; + + best_address = NULL; + while (head != NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission to peer `%s' failed, marking connection as down.\n", - GNUNET_i2s(target)); - rl->connected = GNUNET_NO; + addresses = head->addresses; + while (addresses != NULL) + { + if ( (addresses->timeout.value < now.value) && + (addresses->connected == GNUNET_YES) ) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Marking long-time inactive connection to `%4s' as down.\n", + GNUNET_i2s (&neighbour->id)); +#endif + addresses->connected = GNUNET_NO; + } + addresses = addresses->next; + } + + addresses = head->addresses; + while (addresses != NULL) + { + if ( ( (best_address == NULL) || + (addresses->connected == GNUNET_YES) || + (best_address->connected == GNUNET_NO) ) && + (addresses->in_transmit == GNUNET_NO) && + ( (best_address == NULL) || + (addresses->latency.value < best_address->latency.value)) ) + best_address = addresses; + /* FIXME: also give lower-latency addresses that are not + connected a chance some times... */ + addresses = addresses->next; + } + head = head->next; } - if (!mq->internal_msg) - rl->transmit_ready = GNUNET_YES; - if (mq->client != NULL) +#if DEBUG_TRANSPORT + if (best_address != NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Notifying client %p about failed transission to peer `%4s'.\n", - mq->client, - GNUNET_i2s(target)); - send_ok_msg.header.size = htons (sizeof (send_ok_msg)); - send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - send_ok_msg.success = htonl (result); - send_ok_msg.peer = n->id; - transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO); + "Best address found has latency of %llu ms.\n", + best_address->latency.value); } - GNUNET_free (mq->message); - GNUNET_free (mq); - /* one plugin just became ready again, try transmitting - another message (if available) */ - if (result == GNUNET_OK) - try_transmission_to_peer (n); - else - disconnect_neighbour (n, GNUNET_YES); +#endif + return best_address; + } /** - * Check the ready list for the given neighbour and - * if a plugin is ready for transmission (and if we - * have a message), do so! + * We should re-try transmitting to the given peer, + * hopefully we've learned something in the meantime. + */ +static void +retry_transmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourList *n = cls; + + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); +} + + +/** + * Check the ready list for the given neighbour and if a plugin is + * ready for transmission (and if we have a message), do so! + * + * @param neighbour target peer for which to transmit */ static void try_transmission_to_peer (struct NeighbourList *neighbour) { - struct ReadyList *pos; - struct GNUNET_TIME_Relative min_latency; struct ReadyList *rl; struct MessageQueue *mq; - struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Relative timeout; - if (neighbour->messages == NULL) + if (neighbour->messages_head == NULL) return; /* nothing to do */ - try_alternative_plugins (neighbour); - min_latency = GNUNET_TIME_UNIT_FOREVER_REL; rl = NULL; - mq = neighbour->messages; - now = GNUNET_TIME_absolute_get (); - pos = neighbour->plugins; - while (pos != NULL) - { - /* set plugins that are inactive for a long time back to disconnected */ - if ((pos->timeout.value < now.value) && (pos->connected == GNUNET_YES)) - { + mq = neighbour->messages_head; + /* FIXME: support bi-directional use of TCP */ + if (mq->specific_address == NULL) + mq->specific_address = find_ready_address(neighbour); + if (mq->specific_address == NULL) + { + timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); + if (timeout.value == 0) + { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Marking long-time inactive connection to `%4s' as down.\n", - GNUNET_i2s (&neighbour->id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No destination address available to transmit message of size %u to peer `%4s'\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id)); #endif - pos->connected = GNUNET_NO; - } - if (((GNUNET_YES == pos->transmit_ready) || - (mq->internal_msg)) && - (pos->connect_attempts < MAX_CONNECT_RETRY) && - ((rl == NULL) || (min_latency.value > pos->latency.value))) - { - rl = pos; - min_latency = pos->latency; - } - pos = pos->next; - } - if (rl == NULL) - { + if (mq->client != NULL) + transmit_send_ok (mq->client, neighbour, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, + neighbour->messages_tail, + mq); + GNUNET_free (mq); + return; /* nobody ready */ + } + if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, + neighbour->retry_task); + neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched, + timeout, + &retry_transmission_task, + neighbour); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No plugin ready to transmit message\n"); + "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id), + timeout.value); #endif - return; /* nobody ready */ - } - if (GNUNET_NO == rl->connected) - { - rl->connect_attempts++; - rl->connected = GNUNET_YES; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Establishing fresh connection with `%4s' via plugin `%s'\n", - GNUNET_i2s (&neighbour->id), rl->plugin->short_name); -#endif - } - neighbour->messages = mq->next; + return; + } + GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, + neighbour->messages_tail, + mq); + if (mq->specific_address->connected == GNUNET_NO) + mq->specific_address->connect_attempts++; + rl = mq->specific_address->ready_list; mq->plugin = rl->plugin; if (!mq->internal_msg) - rl->transmit_ready = GNUNET_NO; + mq->specific_address->in_transmit = GNUNET_YES; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Giving message of type `%u' for `%4s' to plugin `%s'\n", - ntohs (mq->message->type), - GNUNET_i2s (&neighbour->id), rl->plugin->short_name); + "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n", + mq->message_buf_size, + GNUNET_i2s (&neighbour->id), + GNUNET_a2s (mq->specific_address->addr, + mq->specific_address->addrlen), + rl->plugin->short_name); #endif rl->plugin->api->send (rl->plugin->api->cls, - rl, - &neighbour->id, + &mq->neighbour_id, + mq->message_buf, + mq->message_buf_size, mq->priority, - mq->message, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + mq->specific_address->addr, + mq->specific_address->addrlen, + GNUNET_YES /* FIXME: sometimes, we want to be more tolerant here! */, &transmit_send_continuation, mq); } @@ -993,30 +1272,31 @@ try_transmission_to_peer (struct NeighbourList *neighbour) * Send the specified message to the specified peer. * * @param client source of the transmission request (can be NULL) + * @param peer_address ForeignAddressList where we should send this message * @param priority how important is the message - * @param msg message to send - * @param is_internal is this an internal message + * @param timeout how long do we have to transmit? + * @param message_buf message(s) to send GNUNET_MessageHeader(s) + * @param message_buf_size total size of all messages in message_buf + * @param is_internal is this an internal message; these are pre-pended and + * also do not count for plugins being "ready" to transmit * @param neighbour handle to the neighbour for transmission */ static void transmit_to_peer (struct TransportClient *client, - unsigned int priority, - const struct GNUNET_MessageHeader *msg, + struct ForeignAddressList *peer_address, + unsigned int priority, + struct GNUNET_TIME_Relative timeout, + const char *message_buf, + size_t message_buf_size, int is_internal, struct NeighbourList *neighbour) { struct MessageQueue *mq; - struct MessageQueue *mqe; - struct GNUNET_MessageHeader *m; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - _("Sending message of type %u to peer `%4s'\n"), - ntohs (msg->type), GNUNET_i2s (&neighbour->id)); -#endif +#if EXTRA_CHECKS if (client != NULL) { /* check for duplicate submission */ - mq = neighbour->messages; + mq = neighbour->messages_head; while (NULL != mq) { if (mq->client == client) @@ -1029,31 +1309,27 @@ transmit_to_peer (struct TransportClient *client, mq = mq->next; } } - mq = GNUNET_malloc (sizeof (struct MessageQueue)); +#endif + mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size); + mq->specific_address = peer_address; mq->client = client; - m = GNUNET_malloc (ntohs (msg->size)); - memcpy (m, msg, ntohs (msg->size)); - mq->message = m; - mq->neighbour = neighbour; + memcpy (&mq[1], message_buf, message_buf_size); + mq->message_buf = (const char*) &mq[1]; + mq->message_buf_size = message_buf_size; + memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity)); mq->internal_msg = is_internal; mq->priority = priority; - - /* find tail */ - mqe = neighbour->messages; - if (mqe != NULL) - while (mqe->next != NULL) - mqe = mqe->next; - if (mqe == NULL) - { - /* new head */ - neighbour->messages = mq; - try_transmission_to_peer (neighbour); - } + mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); + if (is_internal) + GNUNET_CONTAINER_DLL_insert (neighbour->messages_head, + neighbour->messages_tail, + mq); else - { - /* append */ - mqe->next = mq; - } + GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head, + neighbour->messages_tail, + neighbour->messages_tail, + mq); + try_transmission_to_peer (neighbour); } @@ -1063,7 +1339,7 @@ transmit_to_peer (struct TransportClient *client, struct GeneratorContext { struct TransportPlugin *plug_pos; - struct AddressList *addr_pos; + struct OwnAddressList *addr_pos; struct GNUNET_TIME_Absolute expiration; }; @@ -1083,7 +1359,10 @@ address_generator (void *cls, size_t max, void *buf) gc->addr_pos = (gc->plug_pos != NULL) ? gc->plug_pos->addresses : NULL; } if (NULL == gc->plug_pos) - return 0; + { + + return 0; + } ret = GNUNET_HELLO_add_address (gc->plug_pos->short_name, gc->expiration, gc->addr_pos->addr, @@ -1105,15 +1384,14 @@ refresh_hello () struct NeighbourList *npos; struct GeneratorContext gc; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Refreshing my `%s'\n", - "HELLO"); -#endif gc.plug_pos = plugins; gc.addr_pos = plugins != NULL ? plugins->addresses : NULL; gc.expiration = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); hello = GNUNET_HELLO_create (&my_public_key, &address_generator, &gc); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Refreshed my `%s', new size is %d\n", "HELLO", GNUNET_HELLO_size(hello)); +#endif cpos = clients; while (cpos != NULL) { @@ -1132,13 +1410,14 @@ refresh_hello () { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Transmitting updated `%s' to neighbour `%4s'\n", - "HELLO", - GNUNET_i2s(&npos->id)); + "Transmitting updated `%s' to neighbour `%4s'\n", + "HELLO", GNUNET_i2s (&npos->id)); #endif - transmit_to_peer (NULL, 0, - (const struct GNUNET_MessageHeader *) our_hello, - GNUNET_YES, npos); + transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, + (const char *) our_hello, + GNUNET_HELLO_size(our_hello), + GNUNET_NO, npos); npos = npos->next; } } @@ -1170,9 +1449,9 @@ update_addresses (struct TransportPlugin *plugin, int fresh) struct GNUNET_TIME_Relative min_remaining; struct GNUNET_TIME_Relative remaining; struct GNUNET_TIME_Absolute now; - struct AddressList *pos; - struct AddressList *prev; - struct AddressList *next; + struct OwnAddressList *pos; + struct OwnAddressList *prev; + struct OwnAddressList *next; int expired; if (plugin->address_update_task != GNUNET_SCHEDULER_NO_TASK) @@ -1251,7 +1530,7 @@ plugin_env_notify_address (void *cls, struct GNUNET_TIME_Relative expires) { struct TransportPlugin *p = cls; - struct AddressList *al; + struct OwnAddressList *al; struct GNUNET_TIME_Absolute abex; abex = GNUNET_TIME_relative_to_absolute (expires); @@ -1268,12 +1547,8 @@ plugin_env_notify_address (void *cls, } al = al->next; } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Plugin `%s' informs us about a new address `%s'\n", name, - GNUNET_a2s(addr, addrlen)); -#endif - al = GNUNET_malloc (sizeof (struct AddressList) + addrlen); + + al = GNUNET_malloc (sizeof (struct OwnAddressList) + addrlen); al->addr = &al[1]; al->next = p->addresses; p->addresses = al; @@ -1284,101 +1559,25 @@ plugin_env_notify_address (void *cls, } -/** - * FIXME: document. - */ -struct LookupHelloContext -{ - GNUNET_TRANSPORT_AddressCallback iterator; - - void *iterator_cls; -}; - - -/** - * FIXME: document. - */ -static int -lookup_address_callback (void *cls, - const char *tname, - struct GNUNET_TIME_Absolute expiration, - const void *addr, size_t addrlen) -{ - struct LookupHelloContext *lhc = cls; - lhc->iterator (lhc->iterator_cls, tname, addr, addrlen); - return GNUNET_OK; -} - - -/** - * FIXME: document. - */ -static void -lookup_hello_callback (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Message *h, uint32_t trust) -{ - struct LookupHelloContext *lhc = cls; - - if (peer == NULL) - { - lhc->iterator (lhc->iterator_cls, NULL, NULL, 0); - GNUNET_free (lhc); - return; - } - if (h == NULL) - return; - GNUNET_HELLO_iterate_addresses (h, - GNUNET_NO, &lookup_address_callback, lhc); -} - - -/** - * Function that allows a transport to query the known - * network addresses for a given peer. - * - * @param cls closure - * @param timeout after how long should we time out? - * @param target which peer are we looking for? - * @param iter function to call for each known address - * @param iter_cls closure for iter - */ -static void -plugin_env_lookup_address (void *cls, - struct GNUNET_TIME_Relative timeout, - const struct GNUNET_PeerIdentity *target, - GNUNET_TRANSPORT_AddressCallback iter, - void *iter_cls) -{ - struct LookupHelloContext *lhc; - - lhc = GNUNET_malloc (sizeof (struct LookupHelloContext)); - lhc->iterator = iter; - lhc->iterator_cls = iter_cls; - GNUNET_PEERINFO_for_all (cfg, - sched, - target, 0, timeout, &lookup_hello_callback, &lhc); -} - - /** * Notify all of our clients about a peer connecting. */ static void notify_clients_connect (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_TIME_Relative latency) + struct GNUNET_TIME_Relative latency, + uint32_t distance) { struct ConnectInfoMessage cim; struct TransportClient *cpos; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Informing clients about peer `%4s' connecting to us\n", - GNUNET_i2s (peer)); + "Notifying clients about connection from `%s'\n", + GNUNET_i2s (peer)); #endif cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60*1000)); + cim.distance = htonl (distance); cim.latency = GNUNET_TIME_relative_hton (latency); memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity)); cpos = clients; @@ -1401,8 +1600,8 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Informing clients about peer `%4s' disconnecting\n", - GNUNET_i2s (peer)); + "Notifying clients about lost connection to `%s'\n", + GNUNET_i2s (peer)); #endif dim.header.size = htons (sizeof (struct DisconnectInfoMessage)); dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); @@ -1418,111 +1617,227 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) /** - * Copy any validated addresses to buf. + * Find a ForeignAddressList entry for the given neighbour + * that matches the given address and transport. + * + * @param neighbour which peer we care about + * @param tname name of the transport plugin + * @param addr binary address + * @param addrlen length of addr + * @return NULL if no such entry exists + */ +static struct ForeignAddressList * +find_peer_address(struct NeighbourList *neighbour, + const char *tname, + const char *addr, + size_t addrlen) +{ + struct ReadyList *head; + struct ForeignAddressList *address_head; + + head = neighbour->plugins; + while (head != NULL) + { + if (0 == strcmp (tname, head->plugin->short_name)) + break; + head = head->next; + } + if (head == NULL) + return NULL; + + address_head = head->addresses; + while ( (address_head != NULL) && + ( (address_head->addrlen != addrlen) || + (memcmp(address_head->addr, addr, addrlen) != 0) ) ) + address_head = address_head->next; + return address_head; +} + + +/** + * Get the peer address struct for the given neighbour and + * address. If it doesn't yet exist, create it. * - * @return 0 once all addresses have been - * returned + * @param neighbour which peer we care about + * @param tname name of the transport plugin + * @param addr binary address + * @param addrlen length of addr + * @return NULL if we do not have a transport plugin for 'tname' + */ +static struct ForeignAddressList * +add_peer_address(struct NeighbourList *neighbour, + const char *tname, + const char *addr, + size_t addrlen) +{ + struct ReadyList *head; + struct ForeignAddressList *ret; + + ret = find_peer_address (neighbour, tname, addr, addrlen); + if (ret != NULL) + return ret; + head = neighbour->plugins; + while (head != NULL) + { + if (0 == strcmp (tname, head->plugin->short_name)) + break; + head = head->next; + } + if (head == NULL) + return NULL; + ret = GNUNET_malloc(sizeof(struct ForeignAddressList) + addrlen); + ret->addr = (const char*) &ret[1]; + memcpy (&ret[1], addr, addrlen); + ret->addrlen = addrlen; + ret->expires = GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + ret->latency = GNUNET_TIME_relative_get_forever(); + ret->distance = -1; + ret->timeout = GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + ret->ready_list = head; + ret->next = head->addresses; + head->addresses = ret; + return ret; +} + + +/** + * Closure for 'add_validated_address'. + */ +struct AddValidatedAddressContext +{ + /** + * Entry that has been validated. + */ + const struct ValidationEntry *ve; + + /** + * Flag set after we have added the address so + * that we terminate the iteration next time. + */ + int done; +}; + + +/** + * Callback function used to fill a buffer of max bytes with a list of + * addresses in the format used by HELLOs. Should use + * "GNUNET_HELLO_add_address" as a helper function. + * + * @param cls the 'struct AddValidatedAddressContext' with the validated address + * @param max maximum number of bytes that can be written to buf + * @param buf where to write the address information + * @return number of bytes written, 0 to signal the + * end of the iteration. */ static size_t -list_validated_addresses (void *cls, size_t max, void *buf) +add_validated_address (void *cls, + size_t max, void *buf) { - struct ValidationAddress **va = cls; - size_t ret; + struct AddValidatedAddressContext *avac = cls; + const struct ValidationEntry *ve = avac->ve; - while ((NULL != *va) && ((*va)->ok != GNUNET_YES)) - *va = (*va)->next; - if (NULL == *va) + if (GNUNET_YES == avac->done) return 0; - ret = GNUNET_HELLO_add_address ((*va)->transport_name, - (*va)->expiration, - &(*va)[1], (*va)->addr_len, buf, max); - *va = (*va)->next; - return ret; + avac->done = GNUNET_YES; + return GNUNET_HELLO_add_address (ve->transport_name, + GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION), + ve->addr, + ve->addrlen, + buf, + max); } /** - * HELLO validation cleanup task. + * Iterator over hash map entries. Checks if the given + * validation entry is for the same challenge as what + * is given in the PONG. + * + * @param cls the 'struct TransportPongMessage*' + * @param key peer identity + * @param value value in the hash map ('struct ValidationEntry') + * @return GNUNET_YES if we should continue to + * iterate (mismatch), GNUNET_NO if not (entry matched) */ -static void -cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +static int +check_pending_validation (void *cls, + const GNUNET_HashCode * key, + void *value) { - struct ValidationAddress *va; - struct ValidationList *pos; - struct ValidationList *prev; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Absolute first; + const struct TransportPongMessage *pong = cls; + struct ValidationEntry *ve = value; + struct AddValidatedAddressContext avac; + unsigned int challenge = ntohl(pong->challenge); struct GNUNET_HELLO_Message *hello; - struct GNUNET_PeerIdentity pid; + struct GNUNET_PeerIdentity target; struct NeighbourList *n; + struct ForeignAddressList *fal; + if (ve->challenge != challenge) + return GNUNET_YES; + #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "HELLO validation cleanup background task running...\n"); -#endif - now = GNUNET_TIME_absolute_get (); - prev = NULL; - pos = pending_validations; - while (pos != NULL) - { - if (pos->timeout.value < now.value) - { - if (prev == NULL) - pending_validations = pos->next; - else - prev->next = pos->next; - va = pos->addresses; - hello = GNUNET_HELLO_create (&pos->publicKey, - &list_validated_addresses, &va); - GNUNET_CRYPTO_hash (&pos->publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &pid.hashPubKey); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating persistent `%s' message for peer `%4s' based on confirmed addresses.\n", - "HELLO", GNUNET_i2s (&pid)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Confirmed validity of address, peer `%4s' has address `%s' (%s).\n", + GNUNET_h2s (key), + GNUNET_a2s ((const struct sockaddr *) ve->addr, + ve->addrlen), + ve->transport_name); #endif - GNUNET_PEERINFO_add_peer (cfg, sched, &pid, hello); - n = find_neighbour (&pid); - if (NULL != n) - try_transmission_to_peer (n); - GNUNET_free (hello); - while (NULL != (va = pos->addresses)) - { - pos->addresses = va->next; - GNUNET_free (va->transport_name); - GNUNET_free (va); - } - GNUNET_free (pos); - if (prev == NULL) - pos = pending_validations; - else - pos = prev->next; - continue; + /* create the updated HELLO */ + GNUNET_CRYPTO_hash (&ve->publicKey, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &target.hashPubKey); + avac.done = GNUNET_NO; + avac.ve = ve; + hello = GNUNET_HELLO_create (&ve->publicKey, + &add_validated_address, + &avac); + GNUNET_PEERINFO_add_peer (cfg, sched, + &target, + hello); + GNUNET_free (hello); + n = find_neighbour (&target); + if (n != NULL) + { + fal = add_peer_address (n, ve->transport_name, + ve->addr, + ve->addrlen); + fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); + fal->validated = GNUNET_YES; + fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time); + if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value) + n->latency = fal->latency; + else + n->latency.value = (fal->latency.value + n->latency.value) / 2; + n->distance = fal->distance; + if (GNUNET_NO == n->received_pong) + { + notify_clients_connect (&target, n->latency, n->distance); + n->received_pong = GNUNET_YES; } - prev = pos; - pos = pos->next; - } - - /* finally, reschedule cleanup if needed; list is - ordered by timeout, so we need the last element... */ - if (NULL != pending_validations) - { - first = pending_validations->timeout; - pos = pending_validations; - while (pos != NULL) + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) { - first = GNUNET_TIME_absolute_min (first, pos->timeout); - pos = pos->next; + GNUNET_SCHEDULER_cancel (sched, + n->retry_task); + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); } - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_absolute_get_remaining (first), - &cleanup_validation, NULL); } -} - + /* clean up validation entry */ + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (validation_map, + key, + ve)); + GNUNET_SCHEDULER_cancel (sched, + ve->timeout_task); + GNUNET_free (ve->transport_name); + GNUNET_free (ve); + return GNUNET_NO; +} /** @@ -1534,131 +1849,219 @@ cleanup_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * (otherwise we may be seeing a MiM attack). * * @param cls closure - * @param name name of the transport that generated the address + * @param message the pong message * @param peer who responded to our challenge - * @param challenge the challenge number we presumably used - * @param sender_addr string describing our sender address (as observed - * by the other peer in human-readable format) + * @param sender_address string describing our sender address (as observed + * by the other peer in binary format) + * @param sender_address_len number of bytes in 'sender_address' */ static void -plugin_env_notify_validation (void *cls, - const char *name, - const struct GNUNET_PeerIdentity *peer, - uint32_t challenge, - const char *sender_addr) +handle_pong (void *cls, const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *sender_address, + size_t sender_address_len) { - unsigned int not_done; - int matched; - struct ValidationList *pos; - struct ValidationAddress *va; - struct GNUNET_PeerIdentity id; - - pos = pending_validations; - while (pos != NULL) - { - GNUNET_CRYPTO_hash (&pos->publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &id.hashPubKey); - if (0 == - memcmp (peer, &id, sizeof (struct GNUNET_PeerIdentity))) - break; - pos = pos->next; - } - if (pos == NULL) - { - /* TODO: call statistics (unmatched PONG) */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Received validation response but have no record of any validation request for `%4s'. Ignoring.\n"), - GNUNET_i2s(peer)); - return; - } - not_done = 0; - matched = GNUNET_NO; - va = pos->addresses; - while (va != NULL) - { - if (va->challenge == challenge) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Confirmed validity of address, peer `%4s' has address `%s'.\n", - GNUNET_i2s (peer), - GNUNET_a2s ((const struct sockaddr*) &va[1], - va->addr_len)); +#if DEBUG_TRANSPORT > 1 + /* we get tons of these that just get discarded, only log + if we are quite verbose */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Receiving `%s' message from `%4s'.\n", "PONG", + GNUNET_i2s (peer)); #endif - GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, - _("Another peer saw us using the address `%s' via `%s'. If this is not plausible, this address should be listed in the configuration as implausible to avoid MiM attacks.\n"), - sender_addr, - name); - va->ok = GNUNET_YES; - va->expiration = - GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION); - matched = GNUNET_YES; - } - if (va->ok != GNUNET_YES) - not_done++; - va = va->next; - } - if (GNUNET_NO == matched) - { - /* TODO: call statistics (unmatched PONG) */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Received `%s' message but have no record of a matching `%s' message. Ignoring.\n"), - "PONG", "PING"); - } - if (0 == not_done) - { -#if DEBUG_TRANSPORT + if (GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap_get_multiple (validation_map, + &peer->hashPubKey, + &check_pending_validation, + (void*) message)) + { + /* This is *expected* to happen a lot since we send + PONGs to *all* known addresses of the sender of + the PING, so most likely we get multiple PONGs + per PING, and all but the first PONG will end up + here. So really we should not print anything here + unless we want to be very, very verbose... */ +#if DEBUG_TRANSPORT > 2 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "All addresses validated, will now construct `%s' for `%4s'.\n", - "HELLO", - GNUNET_i2s (peer)); + "Received `%s' message from `%4s' but have no record of a matching `%s' message. Ignoring.\n", + "PONG", + GNUNET_i2s (peer), + "PING"); #endif - pos->timeout.value = 0; - GNUNET_SCHEDULER_add_with_priority (sched, - GNUNET_SCHEDULER_PRIORITY_IDLE, - &cleanup_validation, NULL); + return; } - else - { + +#if 0 + /* FIXME: add given address to potential pool of our addresses + (for voting) */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK, + _("Another peer saw us using the address `%s' via `%s'.\n"), + GNUNET_a2s ((const struct sockaddr *) &pong[1], + ntohs(pong->addrlen)), + va->transport_name); +#endif +} + + +static void +neighbour_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourList *n = cls; + #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Still waiting for %u additional `%s' messages before constructing `%s' for `%4s'.\n", - not_done, - "PONG", - "HELLO", - GNUNET_i2s (peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, + "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id)); #endif + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + disconnect_neighbour (n, GNUNET_NO); +} + + +/** + * Create a fresh entry in our neighbour list for the given peer. + * Will try to transmit our current HELLO to the new neighbour. Also + * notifies our clients about the new "connection". + * + * @param peer the peer for which we create the entry + * @return the new neighbour list entry + */ +static struct NeighbourList * +setup_new_neighbour (const struct GNUNET_PeerIdentity *peer) +{ + struct NeighbourList *n; + struct TransportPlugin *tp; + struct ReadyList *rl; + + GNUNET_assert (our_hello != NULL); + n = GNUNET_malloc (sizeof (struct NeighbourList)); + n->next = neighbours; + neighbours = n; + n->id = *peer; + n->last_quota_update = GNUNET_TIME_absolute_get (); + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); + tp = plugins; + while (tp != NULL) + { + if (tp->api->send != NULL) + { + rl = GNUNET_malloc (sizeof (struct ReadyList)); + rl->next = n->plugins; + n->plugins = rl; + rl->plugin = tp; + rl->addresses = NULL; + } + tp = tp->next; } + n->latency = GNUNET_TIME_UNIT_FOREVER_REL; + n->distance = -1; + n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, + (const char *) our_hello, GNUNET_HELLO_size(our_hello), + GNUNET_NO, n); + return n; } -struct CheckHelloValidatedContext +/** + * Closure for 'check_address_exists'. + */ +struct CheckAddressExistsClosure { /** - * Plugin for which we are validating. + * Address to check for. */ - struct TransportPlugin *plugin; + const void *addr; /** - * Hello that we are validating. + * Name of the transport. */ - struct GNUNET_HELLO_Message *hello; + const char *tname; /** - * Validation list being build. + * Length of addr. */ - struct ValidationList *e; + size_t addrlen; + /** + * Set to GNUNET_YES if the address exists. + */ + int exists; }; /** - * Append the given address to the list of entries - * that need to be validated. + * Iterator over hash map entries. Checks if the given + * validation entry is for the same address as what is given + * in the closure. + * + * @param cls the 'struct CheckAddressExistsClosure*' + * @param key current key code (ignored) + * @param value value in the hash map ('struct ValidationEntry') + * @return GNUNET_YES if we should continue to + * iterate (mismatch), GNUNET_NO if not (entry matched) + */ +static int +check_address_exists (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct CheckAddressExistsClosure *caec = cls; + struct ValidationEntry *ve = value; + if ( (0 == strcmp (caec->tname, + ve->transport_name)) && + (caec->addrlen == ve->addrlen) && + (0 == memcmp (caec->addr, + ve->addr, + caec->addrlen)) ) + { + caec->exists = GNUNET_YES; + return GNUNET_NO; + } + return GNUNET_YES; +} + + +/** + * HELLO validation cleanup task (validation failed). + * + * @param cls the 'struct ValidationEntry' that failed + * @param tc scheduler context (unused) + */ +static void +timeout_hello_validation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct ValidationEntry *va = cls; + struct GNUNET_PeerIdentity pid; + + GNUNET_CRYPTO_hash (&va->publicKey, + sizeof (struct + GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &pid.hashPubKey); + GNUNET_CONTAINER_multihashmap_remove (validation_map, + &pid.hashPubKey, + va); + GNUNET_free (va->transport_name); + GNUNET_free (va); +} + + +/** + * Check if the given address is already being validated; if not, + * append the given address to the list of entries that are being be + * validated and initiate validation. + * + * @param cls closure ('struct CheckHelloValidatedContext *') + * @param tname name of the transport + * @param expiration expiration time + * @param addr the address + * @param addrlen length of the address + * @return GNUNET_OK (always) */ static int run_validation (void *cls, @@ -1666,47 +2069,159 @@ run_validation (void *cls, struct GNUNET_TIME_Absolute expiration, const void *addr, size_t addrlen) { - struct ValidationList *e = cls; - struct TransportPlugin *tp; - struct ValidationAddress *va; + struct CheckHelloValidatedContext *chvc = cls; struct GNUNET_PeerIdentity id; + struct TransportPlugin *tp; + struct ValidationEntry *va; + struct NeighbourList *neighbour; + struct ForeignAddressList *peer_address; + struct TransportPingMessage ping; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk; + struct CheckAddressExistsClosure caec; + char * message_buf; + uint16_t hello_size; + size_t tsize; tp = find_transport (tname); if (tp == NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO | - GNUNET_ERROR_TYPE_BULK, - _ - ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"), - tname); - return GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_INFO | + GNUNET_ERROR_TYPE_BULK, + _ + ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"), + tname); + return GNUNET_OK; + } + GNUNET_HELLO_get_key (chvc->hello, &pk); + GNUNET_CRYPTO_hash (&pk, + sizeof (struct + GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &id.hashPubKey); + caec.addr = addr; + caec.addrlen = addrlen; + caec.tname = tname; + caec.exists = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_iterate (validation_map, + &check_address_exists, + &caec); + if (caec.exists == GNUNET_YES) + { + /* During validation attempts we will likely trigger the other + peer trying to validate our address which in turn will cause + it to send us its HELLO, so we expect to hit this case rather + frequently. Only print something if we are very verbose. */ +#if DEBUG_TRANSPORT > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Validation of address `%s' via `%s' for peer `%4s' already in progress.\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&id)); +#endif + return GNUNET_OK; + } + va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen); + va->transport_name = GNUNET_strdup (tname); + va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + (unsigned int) -1); + va->send_time = GNUNET_TIME_absolute_get(); + va->addr = (const void*) &va[1]; + memcpy (&va[1], addr, addrlen); + va->addrlen = addrlen; + GNUNET_HELLO_get_key (chvc->hello, + &va->publicKey); + va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, + HELLO_VERIFICATION_TIMEOUT, + &timeout_hello_validation, + va); + GNUNET_CONTAINER_multihashmap_put (validation_map, + &id.hashPubKey, + va, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + neighbour = find_neighbour(&id); + if (neighbour == NULL) + neighbour = setup_new_neighbour(&id); + peer_address = add_peer_address(neighbour, tname, addr, addrlen); + GNUNET_assert(peer_address != NULL); + hello_size = GNUNET_HELLO_size(our_hello); + tsize = sizeof(struct TransportPingMessage) + hello_size; + message_buf = GNUNET_malloc(tsize); + ping.challenge = htonl(va->challenge); + ping.header.size = htons(sizeof(struct TransportPingMessage)); + ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING); + memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity)); + memcpy(message_buf, our_hello, hello_size); + memcpy(&message_buf[hello_size], + &ping, + sizeof(struct TransportPingMessage)); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Performing validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&id), + "HELLO", hello_size, + "PING", sizeof (struct TransportPingMessage)); +#endif + transmit_to_peer (NULL, peer_address, + GNUNET_SCHEDULER_PRIORITY_DEFAULT, + HELLO_VERIFICATION_TIMEOUT, + message_buf, tsize, + GNUNET_YES, neighbour); + GNUNET_free(message_buf); + return GNUNET_OK; +} + + +/** + * Add the given address to the list of foreign addresses + * available for the given peer (check for duplicates). + * + * @param cls the respective 'struct NeighbourList' to update + * @param tname name of the transport + * @param expiration expiration time + * @param addr the address + * @param addrlen length of the address + * @return GNUNET_OK (always) + */ +static int +add_to_foreign_address_list (void *cls, + const char *tname, + struct GNUNET_TIME_Absolute expiration, + const void *addr, size_t addrlen) +{ + struct NeighbourList *n = cls; + struct ForeignAddressList *fal; + + fal = find_peer_address (n, tname, addr, addrlen); + if (fal == NULL) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n", + GNUNET_a2s (addr, addrlen), + tname, + GNUNET_i2s (&n->id), + expiration.value); +#endif + fal = add_peer_address (n, tname, addr, addrlen); } - GNUNET_CRYPTO_hash (&e->publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &id.hashPubKey); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling validation of address `%s' via `%s' for `%4s'\n", - GNUNET_a2s(addr, addrlen), - tname, - GNUNET_i2s(&id)); - - va = GNUNET_malloc (sizeof (struct ValidationAddress) + addrlen); - va->next = e->addresses; - e->addresses = va; - va->transport_name = GNUNET_strdup (tname); - va->addr_len = addrlen; - va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - (unsigned int) -1); - memcpy (&va[1], addr, addrlen); + if (fal == NULL) + return GNUNET_OK; + fal->expires = GNUNET_TIME_absolute_max (expiration, + fal->expires); + fal->validated = GNUNET_YES; return GNUNET_OK; } /** * Check if addresses in validated hello "h" overlap with - * those in "chvc->hello" and update "chvc->hello" accordingly, - * removing those addresses that have already been validated. + * those in "chvc->hello" and validate the rest. + * + * @param cls closure + * @param peer id of the peer, NULL for last call + * @param h hello message for the peer (can be NULL) + * @param trust amount of trust we have in the peer (not used) */ static void check_hello_validated (void *cls, @@ -1715,79 +2230,63 @@ check_hello_validated (void *cls, uint32_t trust) { struct CheckHelloValidatedContext *chvc = cls; - struct ValidationAddress *va; - struct TransportPlugin *tp; - int first_call; - struct GNUNET_PeerIdentity apeer; - - first_call = GNUNET_NO; - if (chvc->e == NULL) - { - first_call = GNUNET_YES; - chvc->e = GNUNET_malloc (sizeof (struct ValidationList)); - GNUNET_assert (GNUNET_OK == - GNUNET_HELLO_get_key (h != NULL ? h : chvc->hello, - &chvc->e->publicKey)); - chvc->e->timeout = - GNUNET_TIME_relative_to_absolute (HELLO_VERIFICATION_TIMEOUT); - chvc->e->next = pending_validations; - pending_validations = chvc->e; - } - if (h != NULL) - { - GNUNET_HELLO_iterate_new_addresses (chvc->hello, - h, - GNUNET_TIME_absolute_get (), - &run_validation, chvc->e); - } - else if (GNUNET_YES == first_call) - { - /* no existing HELLO, all addresses are new */ - GNUNET_HELLO_iterate_addresses (chvc->hello, - GNUNET_NO, &run_validation, chvc->e); - } - if (h != NULL) - return; /* wait for next call */ - /* finally, transmit validation attempts */ - GNUNET_assert (GNUNET_OK == - GNUNET_HELLO_get_id (chvc->hello, - &apeer)); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ready to validate addresses from `%s' message for peer `%4s'\n", - "HELLO", GNUNET_i2s (&apeer)); -#endif - va = chvc->e->addresses; - while (va != NULL) + struct GNUNET_HELLO_Message *plain_hello; + struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk; + struct GNUNET_PeerIdentity target; + struct NeighbourList *n; + + if (peer == NULL) { + chvc->piter = NULL; + GNUNET_CONTAINER_DLL_remove (chvc_head, + chvc_tail, + chvc); + if (GNUNET_NO == chvc->hello_known) + { + /* notify PEERINFO about the peer now, so that we at least + have the public key if some other component needs it */ + GNUNET_HELLO_get_key (chvc->hello, &pk); + GNUNET_CRYPTO_hash (&pk, + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), + &target.hashPubKey); + plain_hello = GNUNET_HELLO_create (&pk, + NULL, + NULL); + GNUNET_PEERINFO_add_peer (cfg, sched, &target, plain_hello); + GNUNET_free (plain_hello); #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Establishing `%s' connection to validate `%s' address `%s' of `%4s'\n", - va->transport_name, - "HELLO", - GNUNET_a2s ((const struct sockaddr*) &va[1], - va->addr_len), - GNUNET_i2s (&apeer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peerinfo had no `%s' message for peer `%4s', full validation needed.\n", + "HELLO", + GNUNET_i2s (&target)); #endif - tp = find_transport (va->transport_name); - GNUNET_assert (tp != NULL); - if (GNUNET_OK != - tp->api->validate (tp->api->cls, - &apeer, - va->challenge, - HELLO_VERIFICATION_TIMEOUT, - &va[1], - va->addr_len)) - va->ok = GNUNET_SYSERR; - va = va->next; + GNUNET_HELLO_iterate_addresses (chvc->hello, + GNUNET_NO, + &run_validation, + chvc); + } + GNUNET_free (chvc); + return; } - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_TIME_absolute_get_remaining (chvc->e->timeout), - &cleanup_validation, NULL); - GNUNET_free (chvc); + if (h == NULL) + return; + chvc->hello_known = GNUNET_YES; + n = find_neighbour (peer); + if (n != NULL) + { + GNUNET_HELLO_iterate_addresses (h, + GNUNET_NO, + &add_to_foreign_address_list, + n); + try_transmission_to_peer (n); + } + GNUNET_HELLO_iterate_new_addresses (chvc->hello, + h, + GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME), + &run_validation, + chvc); } - /** * Process HELLO-message. * @@ -1799,7 +2298,6 @@ static int process_hello (struct TransportPlugin *plugin, const struct GNUNET_MessageHeader *message) { - struct ValidationList *e; uint16_t hsize; struct GNUNET_PeerIdentity target; const struct GNUNET_HELLO_Message *hello; @@ -1814,7 +2312,8 @@ process_hello (struct TransportPlugin *plugin, return GNUNET_SYSERR; } /* first, check if load is too high */ - if (GNUNET_OS_load_cpu_get (cfg) > 100) + if (GNUNET_SCHEDULER_get_load (sched, + GNUNET_SCHEDULER_PRIORITY_BACKGROUND) > MAX_HELLO_LOAD) { /* TODO: call to stats? */ return GNUNET_OK; @@ -1830,40 +2329,26 @@ process_hello (struct TransportPlugin *plugin, &target.hashPubKey); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processing `%s' message for `%4s'\n", - "HELLO", GNUNET_i2s (&target)); + "Processing `%s' message for `%4s' of size %u\n", + "HELLO", + GNUNET_i2s (&target), + GNUNET_HELLO_size(hello)); #endif - /* check if a HELLO for this peer is already on the validation list */ - e = pending_validations; - while (e != NULL) - { - if (0 == memcmp (&e->publicKey, - &publicKey, - sizeof (struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded))) - { - /* TODO: call to stats? */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s' message for peer `%4s' is already pending; ignoring new message\n", - "HELLO", GNUNET_i2s (&target)); -#endif - return GNUNET_OK; - } - e = e->next; - } + chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize); - chvc->plugin = plugin; - chvc->hello = (struct GNUNET_HELLO_Message *) &chvc[1]; - memcpy (chvc->hello, hello, hsize); + chvc->hello = (const struct GNUNET_HELLO_Message *) &chvc[1]; + memcpy (&chvc[1], hello, hsize); + GNUNET_CONTAINER_DLL_insert (chvc_head, + chvc_tail, + chvc); /* finally, check if HELLO was previously validated (continuation will then schedule actual validation) */ - GNUNET_PEERINFO_for_all (cfg, - sched, - &target, - 0, - HELLO_VERIFICATION_TIMEOUT, - &check_hello_validated, chvc); + chvc->piter = GNUNET_PEERINFO_iterate (cfg, + sched, + &target, + 0, + HELLO_VERIFICATION_TIMEOUT, + &check_hello_validated, chvc); return GNUNET_OK; } @@ -1872,7 +2357,7 @@ process_hello (struct TransportPlugin *plugin, * The peer specified by the given neighbour has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins * still up), or trigger a full disconnect and clean up. This - * function updates our state and do the necessary notifications. + * function updates our state and does the necessary notifications. * Also notifies our clients that the neighbour is now officially * gone. * @@ -1882,29 +2367,34 @@ process_hello (struct TransportPlugin *plugin, * disconnect? */ static void -disconnect_neighbour (struct NeighbourList *n, - int check) +disconnect_neighbour (struct NeighbourList *n, int check) { struct ReadyList *rpos; struct NeighbourList *npos; struct NeighbourList *nprev; struct MessageQueue *mq; - + struct ForeignAddressList *peer_addresses; + struct ForeignAddressList *peer_pos; + if (GNUNET_YES == check) { rpos = n->plugins; while (NULL != rpos) - { - if (GNUNET_YES == rpos->connected) - return; /* still connected */ - rpos = rpos->next; - } + { + peer_addresses = rpos->addresses; + while (peer_addresses != NULL) + { + if (GNUNET_YES == peer_addresses->connected) + return; /* still connected */ + peer_addresses = peer_addresses->next; + } + rpos = rpos->next; + } } - #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Disconnecting from `%4s'\n", - GNUNET_i2s(&n->id)); + GNUNET_i2s (&n->id)); #endif /* remove n from neighbours list */ nprev = NULL; @@ -1921,119 +2411,178 @@ disconnect_neighbour (struct NeighbourList *n, nprev->next = n->next; /* notify all clients about disconnect */ - notify_clients_disconnect (&n->id); + if (GNUNET_YES == n->received_pong) + notify_clients_disconnect (&n->id); /* clean up all plugins, cancel connections and pending transmissions */ while (NULL != (rpos = n->plugins)) { n->plugins = rpos->next; - GNUNET_assert (rpos->neighbour == n); - if (GNUNET_YES == rpos->connected) - rpos->plugin->api->cancel (rpos->plugin->api->cls, - rpos, - &n->id); + rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id); + + while (rpos->addresses != NULL) + { + peer_pos = rpos->addresses; + rpos->addresses = peer_pos->next; + GNUNET_free(peer_pos); + } GNUNET_free (rpos); } /* free all messages on the queue */ - while (NULL != (mq = n->messages)) - { - n->messages = mq->next; - GNUNET_assert (mq->neighbour == n); + while (NULL != (mq = n->messages_head)) + { + GNUNET_CONTAINER_DLL_remove (n->messages_head, + n->messages_tail, + mq); + GNUNET_assert (0 == memcmp(&mq->neighbour_id, + &n->id, + sizeof(struct GNUNET_PeerIdentity))); GNUNET_free (mq); } if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (sched, - n->timeout_task); + { + GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, n->retry_task); + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + } /* finally, free n itself */ GNUNET_free (n); } /** - * Add an entry for each of our transport plugins - * (that are able to send) to the list of plugins - * for this neighbour. + * We have received a PING message from someone. Need to send a PONG message + * in response to the peer by any means necessary. * - * @param neighbour to initialize - */ -static void -add_plugins (struct NeighbourList *neighbour) + * FIXME: With something like TCP where a connection exists, we may + * want to send it that way. But the current API does not seem to + * allow us to do so (can't tell this to the transport!) + */ +static int +handle_ping(void *cls, const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const char *sender_address, + size_t sender_address_len) { - struct TransportPlugin *tp; + struct TransportPlugin *plugin = cls; + struct TransportPingMessage *ping; + struct TransportPongMessage *pong; + uint16_t msize; + struct NeighbourList *n; struct ReadyList *rl; + struct ForeignAddressList *fal; - neighbour->retry_plugins_time - = GNUNET_TIME_relative_to_absolute (PLUGIN_RETRY_FREQUENCY); - tp = plugins; - while (tp != NULL) + msize = ntohs (message->size); + if (msize < sizeof (struct TransportPingMessage)) { - if (tp->api->send != NULL) - { - rl = GNUNET_malloc (sizeof (struct ReadyList)); - rl->next = neighbour->plugins; - neighbour->plugins = rl; - rl->plugin = tp; - rl->neighbour = neighbour; - rl->transmit_ready = GNUNET_YES; - } - tp = tp->next; + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + ping = (struct TransportPingMessage *) message; + if (0 != memcmp (&ping->target, + plugin->env.my_identity, + sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Received `%s' message not destined for me!\n"), + "PING"); + return GNUNET_SYSERR; } -} - - -static void -neighbour_timeout_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NeighbourList *n = cls; - #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Neighbour `%4s' has timed out!\n", - GNUNET_i2s(&n->id)); + "Processing `%s' from `%s'\n", + "PING", + GNUNET_a2s ((const struct sockaddr *)sender_address, + sender_address_len)); #endif - n->timeout_task = GNUNET_SCHEDULER_NO_TASK; - disconnect_neighbour (n, GNUNET_NO); + msize -= sizeof (struct TransportPingMessage); + pong = GNUNET_malloc (sizeof (struct TransportPongMessage) + sender_address_len); + pong->header.size = htons (sizeof (struct TransportPongMessage) + sender_address_len); + pong->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_PONG); + pong->purpose.size = + htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + + sizeof (uint32_t) + + sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + sender_address_len); + pong->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_TCP_PING); + pong->challenge = ping->challenge; + pong->addrlen = htons(sender_address_len); + memcpy(&pong->signer, + &my_public_key, + sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded)); + memcpy (&pong[1], sender_address, sender_address_len); + GNUNET_assert (GNUNET_OK == + GNUNET_CRYPTO_rsa_sign (my_private_key, + &pong->purpose, &pong->signature)); + + n = find_neighbour(peer); + if (n == NULL) + n = setup_new_neighbour(peer); + /* broadcast 'PONG' to all available addresses */ + rl = n->plugins; + while (rl != NULL) + { + fal = rl->addresses; + while (fal != NULL) + { + transmit_to_peer(NULL, fal, + TRANSPORT_PONG_PRIORITY, + HELLO_VERIFICATION_TIMEOUT, + (const char *)pong, + ntohs(pong->header.size), + GNUNET_YES, + n); + fal = fal->next; + } + rl = rl->next; + } + GNUNET_free(pong); + return GNUNET_OK; } /** - * Create a fresh entry in our neighbour list for the given peer. - * Will try to transmit our current HELLO to the new neighbour. Also - * notifies our clients about the new "connection". + * Calculate how long we should delay reading from the TCP socket to + * ensure that we stay within our bandwidth limits (push back). * - * @param peer the peer for which we create the entry - * @return the new neighbour list entry + * @param n for which neighbour should this be calculated + * @return how long to delay receiving more data */ -static struct NeighbourList * -setup_new_neighbour (const struct GNUNET_PeerIdentity *peer) +static struct GNUNET_TIME_Relative +calculate_throttle_delay (struct NeighbourList *n) { - struct NeighbourList *n; + struct GNUNET_TIME_Relative ret; + struct GNUNET_TIME_Absolute now; + uint64_t del; + uint64_t avail; + uint64_t excess; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Setting up new neighbour `%4s', sending our HELLO to introduce ourselves\n", - GNUNET_i2s (peer)); -#endif - GNUNET_assert (our_hello != NULL); - n = GNUNET_malloc (sizeof (struct NeighbourList)); - n->next = neighbours; - neighbours = n; - n->id = *peer; - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->peer_timeout = - GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); - add_plugins (n); - n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - transmit_to_peer (NULL, 0, - (const struct GNUNET_MessageHeader *) our_hello, - GNUNET_YES, n); - notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL); - return n; + now = GNUNET_TIME_absolute_get (); + del = now.value - n->last_quota_update.value; + if (del > MAX_BANDWIDTH_CARRY) + { + update_quota (n, GNUNET_YES); + del = now.value - n->last_quota_update.value; + GNUNET_assert (del <= MAX_BANDWIDTH_CARRY); + } + if (n->quota_in == 0) + n->quota_in = 1; /* avoid divison by zero */ + avail = del * n->quota_in; + if (avail > n->last_received) + return GNUNET_TIME_UNIT_ZERO; /* can receive right now */ + excess = n->last_received - avail; + ret.value = excess / n->quota_in; + if (ret.value > 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n", + (unsigned long long) excess, + (unsigned long long) n->quota_in, + (unsigned long long) ret.value); + return ret; } @@ -2044,152 +2593,112 @@ setup_new_neighbour (const struct GNUNET_PeerIdentity *peer) * and generally forward to our receive callback. * * @param cls the "struct TransportPlugin *" we gave to the plugin - * @param service_context value passed to the transport-service - * to identify the neighbour; will be NULL on the first - * call for a given peer - * @param latency estimated latency for communicating with the - * given peer * @param peer (claimed) identity of the other peer - * @param message the message, NULL if peer was disconnected - * @return the new service_context that the plugin should use - * for future receive calls for messages from this - * particular peer - */ -static struct ReadyList * -plugin_env_receive (void *cls, - struct ReadyList *service_context, - struct GNUNET_TIME_Relative latency, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message) + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again + * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param sender_address binary address of the sender (if observed) + * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) + */ +static struct GNUNET_TIME_Relative +plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + unsigned int distance, const char *sender_address, + size_t sender_address_len) { - const struct GNUNET_MessageHeader ack = { - htons (sizeof (struct GNUNET_MessageHeader)), - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK) - }; + struct ReadyList *service_context; struct TransportPlugin *plugin = cls; struct TransportClient *cpos; struct InboundMessage *im; + struct ForeignAddressList *peer_address; uint16_t msize; struct NeighbourList *n; - if (service_context != NULL) - { - n = service_context->neighbour; - GNUNET_assert (n != NULL); - } - else - { - n = find_neighbour (peer); - if (n == NULL) - { - if (message == NULL) - return NULL; /* disconnect of peer already marked down */ - n = setup_new_neighbour (peer); - } - service_context = n->plugins; - while ((service_context != NULL) && (plugin != service_context->plugin)) - service_context = service_context->next; - GNUNET_assert ((plugin->api->send == NULL) || - (service_context != NULL)); - } - if (message == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Receive failed from `%4s', triggering disconnect\n", - GNUNET_i2s(&n->id)); -#endif - /* TODO: call stats */ - if (service_context != NULL) - service_context->connected = GNUNET_NO; - disconnect_neighbour (n, GNUNET_YES); - return NULL; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Processing message of type `%u' received by plugin...\n", - ntohs (message->type)); -#endif - if (service_context != NULL) - { - if (service_context->connected == GNUNET_NO) - { - service_context->connected = GNUNET_YES; - service_context->transmit_ready = GNUNET_YES; - service_context->connect_attempts++; - } - service_context->timeout - = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - service_context->latency = latency; - } - /* update traffic received amount ... */ - msize = ntohs (message->size); - n->last_received += msize; - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); - n->peer_timeout = - GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - update_quota (n); - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - /* dropping message due to frequent inbound volume violations! */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING | - GNUNET_ERROR_TYPE_BULK, - _ - ("Dropping incoming message due to repeated bandwidth quota violations.\n")); - /* TODO: call stats */ - GNUNET_assert ( (service_context == NULL) || - (NULL != service_context->neighbour) ); - return service_context; - } - switch (ntohs (message->type)) - { - case GNUNET_MESSAGE_TYPE_HELLO: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving `%s' message from `%4s'.\n", "HELLO", - GNUNET_i2s(peer)); -#endif - process_hello (plugin, message); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending `%s' message to connecting peer `%4s'.\n", "ACK", - GNUNET_i2s(peer)); -#endif - transmit_to_peer (NULL, 0, &ack, GNUNET_YES, n); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK: - n->saw_ack = GNUNET_YES; - /* intentional fall-through! */ - default: + n = find_neighbour (peer); + if (n == NULL) + n = setup_new_neighbour (peer); + update_quota (n, GNUNET_NO); + service_context = n->plugins; + while ((service_context != NULL) && (plugin != service_context->plugin)) + service_context = service_context->next; + GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); + if (message != NULL) + { + peer_address = add_peer_address(n, + plugin->short_name, + sender_address, + sender_address_len); + if (peer_address != NULL) + { + peer_address->distance = distance; + if (peer_address->connected == GNUNET_NO) + { + peer_address->connected = GNUNET_YES; + peer_address->connect_attempts++; + } + peer_address->timeout + = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + } + /* update traffic received amount ... */ + msize = ntohs (message->size); + n->distance = distance; + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_SCHEDULER_cancel (sched, + n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + /* dropping message due to frequent inbound volume violations! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING | + GNUNET_ERROR_TYPE_BULK, + _ + ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), + n->quota_violation_count); + return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */ + } + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_HELLO: + process_hello (plugin, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: + handle_ping(plugin, message, peer, sender_address, sender_address_len); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: + handle_pong(plugin, message, peer, sender_address, sender_address_len); + break; + default: #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s', sending to all clients.\n", - ntohs (message->type), - GNUNET_i2s(peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s', sending to all clients.\n", + ntohs (message->type), GNUNET_i2s (peer)); #endif - /* transmit message to all clients */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); - im->header.size = htons (sizeof (struct InboundMessage) + msize); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (latency); - im->peer = *peer; - memcpy (&im[1], message, msize); - - cpos = clients; - while (cpos != NULL) - { - transmit_to_client (cpos, &im->header, GNUNET_YES); - cpos = cpos->next; - } - GNUNET_free (im); - } - GNUNET_assert ( (service_context == NULL) || - (NULL != service_context->neighbour) ); - return service_context; + /* transmit message to all clients */ + im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); + im->header.size = htons (sizeof (struct InboundMessage) + msize); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->latency = GNUNET_TIME_relative_hton (n->latency); + im->peer = *peer; + memcpy (&im[1], message, msize); + cpos = clients; + while (cpos != NULL) + { + transmit_to_client (cpos, &im->header, GNUNET_YES); + cpos = cpos->next; + } + GNUNET_free (im); + } + } + return calculate_throttle_delay (n); } @@ -2209,8 +2718,6 @@ handle_start (void *cls, struct TransportClient *c; struct ConnectInfoMessage cim; struct NeighbourList *n; - struct InboundMessage *im; - struct GNUNET_MessageHeader *ack; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2236,8 +2743,7 @@ handle_start (void *cls, { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending our own `%s' to new client\n", - "HELLO"); + "Sending our own `%s' to new client\n", "HELLO"); #endif transmit_to_client (c, (const struct GNUNET_MessageHeader *) our_hello, @@ -2245,28 +2751,18 @@ handle_start (void *cls, /* tell new client about all existing connections */ cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); - cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.size = htons (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ - ack = (struct GNUNET_MessageHeader *) &im[1]; - ack->size = htons (sizeof (struct GNUNET_MessageHeader)); - ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK); - for (n = neighbours; n != NULL; n = n->next) - { - cim.id = n->id; - transmit_to_client (c, &cim.header, GNUNET_NO); - if (n->saw_ack) - { - im->peer = n->id; - transmit_to_client (c, &im->header, GNUNET_NO); + n = neighbours; + while (n != NULL) + { + if (GNUNET_YES == n->received_pong) + { + cim.id = n->id; + cim.latency = GNUNET_TIME_relative_hton (n->latency); + cim.distance = htonl (n->distance); + transmit_to_client (c, &cim.header, GNUNET_NO); } + n = n->next; } - GNUNET_free (im); } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2286,10 +2782,6 @@ handle_hello (void *cls, { int ret; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client\n", "HELLO"); -#endif ret = process_hello (NULL, message); GNUNET_SERVER_receive_done (client, ret); } @@ -2349,7 +2841,10 @@ handle_send (void *cls, ntohs (obmm->size), ntohs (obmm->type), GNUNET_i2s (&obm->peer)); #endif - transmit_to_peer (tc, ntohl(obm->priority), obmm, GNUNET_NO, n); + transmit_to_peer (tc, NULL, ntohl (obm->priority), + GNUNET_TIME_relative_ntoh (obm->timeout), + (char *)obmm, + ntohs (obmm->size), GNUNET_NO, n); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2369,85 +2864,44 @@ handle_set_quota (void *cls, const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; struct NeighbourList *n; - struct TransportPlugin *p; - struct ReadyList *rl; + uint32_t qin; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client for peer `%4s'\n", - "SET_QUOTA", GNUNET_i2s (&qsm->peer)); -#endif n = find_neighbour (&qsm->peer); if (n == NULL) { GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } - update_quota (n); - if (n->quota_in < ntohl (qsm->quota_in)) + qin = ntohl (qsm->quota_in); +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n", + "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer)); +#endif + update_quota (n, GNUNET_YES); + if (n->quota_in < qin) n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_in = ntohl (qsm->quota_in); - rl = n->plugins; - while (rl != NULL) - { - p = rl->plugin; - p->api->set_receive_quota (p->api->cls, - &qsm->peer, ntohl (qsm->quota_in)); - rl = rl->next; - } + n->quota_in = qin; GNUNET_SERVER_receive_done (client, GNUNET_OK); } -/** - * Handle TRY_CONNECT-message. - * - * @param cls closure (always NULL) - * @param client identification of the client - * @param message the actual message - */ static void -handle_try_connect (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +transmit_address_to_client (void *cls, const char *address) { - const struct TryConnectMessage *tcm; + struct GNUNET_SERVER_TransmitContext *tc = cls; + size_t slen; - tcm = (const struct TryConnectMessage *) message; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client %p asking to connect to `%4s'\n", - "TRY_CONNECT", - client, - GNUNET_i2s (&tcm->peer)); -#endif - if (NULL == find_neighbour (&tcm->peer)) - setup_new_neighbour (&tcm->peer); -#if DEBUG_TRANSPORT + if (NULL == address) + slen = 0; else - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Client asked to connect to `%4s', but connection already exists\n", - "TRY_CONNECT", - GNUNET_i2s (&tcm->peer)); -#endif - GNUNET_SERVER_receive_done (client, GNUNET_OK); + slen = strlen (address) + 1; + GNUNET_SERVER_transmit_context_append_data (tc, address, slen, + GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY); + if (NULL == address) + GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); } -static void -transmit_address_to_client (void *cls, const char *address) -{ - struct GNUNET_SERVER_TransmitContext *tc = cls; - size_t slen; - - if (NULL == address) - slen = 0; - else - slen = strlen (address) + 1; - GNUNET_SERVER_transmit_context_append (tc, address, slen, - GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY); - if (NULL == address) - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); -} /** * Handle AddressLookup-message. @@ -2458,52 +2912,58 @@ transmit_address_to_client (void *cls, const char *address) */ static void handle_address_lookup (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { - const struct AddressLookupMessage *alum; - struct TransportPlugin *lsPlugin; - const char *nameTransport; - const char *address; - uint16_t size; - struct GNUNET_SERVER_TransmitContext *tc; - - size = ntohs (message->size); - if (size < sizeof(struct AddressLookupMessage)) - { - GNUNET_break_op (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - alum = (const struct AddressLookupMessage *) message; - uint32_t addressLen = ntohl(alum->addrlen); - if (size <= sizeof(struct AddressLookupMessage) + addressLen) - { - GNUNET_break_op (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - address = (const char *)&alum[1]; - nameTransport = (const char*)&address[addressLen]; - if (nameTransport [size - sizeof (struct AddressLookupMessage) - addressLen -1] != '\0') - { - GNUNET_break_op (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - struct GNUNET_TIME_Absolute timeout= GNUNET_TIME_absolute_ntoh(alum->timeout); - struct GNUNET_TIME_Relative rtimeout = GNUNET_TIME_absolute_get_remaining(timeout); - lsPlugin = find_transport(nameTransport); - if (NULL == lsPlugin) - { - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_SERVER_transmit_context_append (tc, NULL, 0, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY); - GNUNET_SERVER_transmit_context_run (tc, rtimeout); - return; - } - tc = GNUNET_SERVER_transmit_context_create (client); - lsPlugin->api->address_pretty_printer(cls, nameTransport, - address, addressLen, GNUNET_YES, rtimeout, &transmit_address_to_client, tc); + const struct AddressLookupMessage *alum; + struct TransportPlugin *lsPlugin; + const char *nameTransport; + const char *address; + uint16_t size; + struct GNUNET_SERVER_TransmitContext *tc; + + size = ntohs (message->size); + if (size < sizeof (struct AddressLookupMessage)) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + alum = (const struct AddressLookupMessage *) message; + uint32_t addressLen = ntohl (alum->addrlen); + if (size <= sizeof (struct AddressLookupMessage) + addressLen) + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + address = (const char *) &alum[1]; + nameTransport = (const char *) &address[addressLen]; + if (nameTransport + [size - sizeof (struct AddressLookupMessage) - addressLen - 1] != '\0') + { + GNUNET_break_op (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + struct GNUNET_TIME_Absolute timeout = + GNUNET_TIME_absolute_ntoh (alum->timeout); + struct GNUNET_TIME_Relative rtimeout = + GNUNET_TIME_absolute_get_remaining (timeout); + lsPlugin = find_transport (nameTransport); + if (NULL == lsPlugin) + { + tc = GNUNET_SERVER_transmit_context_create (client); + GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0, + GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY); + GNUNET_SERVER_transmit_context_run (tc, rtimeout); + return; + } + tc = GNUNET_SERVER_transmit_context_create (client); + lsPlugin->api->address_pretty_printer (cls, nameTransport, + address, addressLen, GNUNET_YES, + rtimeout, + &transmit_address_to_client, tc); } /** @@ -2519,12 +2979,9 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0}, {&handle_set_quota, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)}, - {&handle_try_connect, NULL, - GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT, - sizeof (struct TryConnectMessage)}, {&handle_address_lookup, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP, - 0 }, + 0}, {NULL, NULL, 0, 0} }; @@ -2537,15 +2994,10 @@ create_environment (struct TransportPlugin *plug) { plug->env.cfg = cfg; plug->env.sched = sched; - plug->env.my_public_key = &my_public_key; - plug->env.my_private_key = my_private_key; plug->env.my_identity = &my_identity; plug->env.cls = plug; plug->env.receive = &plugin_env_receive; - plug->env.lookup = &plugin_env_lookup_address; plug->env.notify_address = &plugin_env_notify_address; - plug->env.notify_validation = &plugin_env_notify_validation; - plug->env.default_quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); plug->env.max_connections = max_connect_per_transport; } @@ -2596,6 +3048,8 @@ client_disconnect_notification (void *cls, struct TransportClient *prev; struct ClientMessageQueueEntry *mqe; + if (client == NULL) + return; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Client disconnected, cleaning up.\n"); @@ -2611,10 +3065,12 @@ client_disconnect_notification (void *cls, return; while (NULL != (mqe = pos->message_queue_head)) { - pos->message_queue_head = mqe->next; + GNUNET_CONTAINER_DLL_remove (pos->message_queue_head, + pos->message_queue_tail, + mqe); + pos->message_count--; GNUNET_free (mqe); } - pos->message_queue_head = NULL; if (prev == NULL) clients = pos->next; else @@ -2624,23 +3080,54 @@ client_disconnect_notification (void *cls, pos->client = NULL; return; } + if (pos->th != NULL) + { + GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); + pos->th = NULL; + } + GNUNET_break (0 == pos->message_count); GNUNET_free (pos); } /** - * Function called when the service shuts down. Unloads our plugins. + * Iterator to free entries in the validation_map. + * + * @param cls closure (unused) + * @param key current key code + * @param value value in the hash map (validation to abort) + * @return GNUNET_YES (always) + */ +static int +abort_validation (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct ValidationEntry *va = value; + + GNUNET_SCHEDULER_cancel (sched, va->timeout_task); + GNUNET_free (va->transport_name); + GNUNET_free (va); + return GNUNET_YES; +} + + +/** + * Function called when the service shuts down. Unloads our plugins + * and cancels pending validations. * * @param cls closure, unused * @param tc task context (unused) */ static void -unload_plugins (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct TransportPlugin *plug; - struct AddressList *al; + struct OwnAddressList *al; + struct CheckHelloValidatedContext *chvc; + while (neighbours != NULL) + disconnect_neighbour (neighbours, GNUNET_NO); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport service is unloading plugins...\n"); @@ -2661,6 +3148,20 @@ unload_plugins (void *cls, if (my_private_key != NULL) GNUNET_CRYPTO_rsa_key_free (my_private_key); GNUNET_free_non_null (our_hello); + + /* free 'chvc' data structure */ + while (NULL != (chvc = chvc_head)) + { + chvc_head = chvc->next; + GNUNET_PEERINFO_iterate_cancel (chvc->piter); + GNUNET_free (chvc); + } + chvc_tail = NULL; + + GNUNET_CONTAINER_multihashmap_iterate (validation_map, + &abort_validation, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (validation_map); } @@ -2675,7 +3176,7 @@ unload_plugins (void *cls, static void run (void *cls, struct GNUNET_SCHEDULER_Handle *s, - struct GNUNET_SERVER_Handle *serv, + struct GNUNET_SERVER_Handle *serv, const struct GNUNET_CONFIGURATION_Handle *c) { char *plugs; @@ -2686,6 +3187,7 @@ run (void *cls, sched = s; cfg = c; + validation_map = GNUNET_CONTAINER_multihashmap_create (64); /* parse configuration */ if ((GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, @@ -2740,12 +3242,12 @@ run (void *cls, } GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_UNIT_FOREVER_REL, - &unload_plugins, NULL); + &shutdown_task, NULL); if (no_transports) refresh_hello (); + #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Transport service ready.\n")); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Transport service ready.\n")); #endif /* process client requests */ GNUNET_SERVER_add_handlers (server, handlers); @@ -2763,11 +3265,11 @@ int main (int argc, char *const *argv) { return (GNUNET_OK == - GNUNET_SERVICE_run (argc, - argv, - "transport", - GNUNET_SERVICE_OPTION_NONE, - &run, NULL)) ? 0 : 1; + GNUNET_SERVICE_run (argc, + argv, + "transport", + GNUNET_SERVICE_OPTION_NONE, + &run, NULL)) ? 0 : 1; } /* end of gnunet-service-transport.c */