X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fgnunet-service-transport.c;h=d654fcc8cfc89c55cd5fe4c60f31f11d682b3d7b;hb=9d2cd5be986a0732b9e5a8fcdf5acd1c7225ef2f;hp=4c94e293965fa0fb30c006435766bdb4721cc1e0;hpb=71bf2d9735a07c7d827b275d0e1dd253210139d9;p=oweals%2Fgnunet.git diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index 4c94e2939..d654fcc8c 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -197,6 +197,12 @@ struct ForeignAddressList */ unsigned int connect_attempts; + /** + * DV distance to this peer (1 if no DV is used). + * FIXME: need to set this from transport plugins! + */ + uint32_t distance; + }; @@ -284,11 +290,11 @@ struct TransportPlugin }; -struct NeighborList; +struct NeighbourList; /** - * For each neighbor we keep a list of messages - * that we still want to transmit to the neighbor. + * For each neighbour we keep a list of messages + * that we still want to transmit to the neighbour. */ struct MessageQueue { @@ -327,9 +333,9 @@ struct MessageQueue struct ForeignAddressList *specific_address; /** - * Peer ID of the Neighbor this entry belongs to. + * Peer ID of the Neighbour this entry belongs to. */ - struct GNUNET_PeerIdentity neighbor_id; + struct GNUNET_PeerIdentity neighbour_id; /** * Plugin that we used for the transmission. @@ -337,6 +343,11 @@ struct MessageQueue */ struct TransportPlugin *plugin; + /** + * At what time should we fail? + */ + struct GNUNET_TIME_Absolute timeout; + /** * Internal message of the transport system that should not be * included in the usual SEND-SEND_OK transmission confirmation @@ -355,7 +366,7 @@ struct MessageQueue /** - * For a given Neighbor, which plugins are available + * For a given Neighbour, which plugins are available * to talk to this peer and what are their costs? */ struct ReadyList @@ -381,15 +392,15 @@ struct ReadyList /** - * Entry in linked list of all of our current neighbors. + * Entry in linked list of all of our current neighbours. */ -struct NeighborList +struct NeighbourList { /** * This is a linked list. */ - struct NeighborList *next; + struct NeighbourList *next; /** * Which of our transports is connected to this peer @@ -410,7 +421,7 @@ struct NeighborList struct MessageQueue *messages_tail; /** - * Identity of this neighbor. + * Identity of this neighbour. */ struct GNUNET_PeerIdentity id; @@ -420,6 +431,12 @@ struct NeighborList */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; + /** + * ID of task scheduled to run when we should retry transmitting + * the head of the message queue. + */ + GNUNET_SCHEDULER_TaskIdentifier retry_task; + /** * How long until we should consider this peer dead * (if we don't receive another message in the @@ -437,7 +454,7 @@ struct NeighborList * this particular peer. This latency may have been calculated * over multiple transports. This value reflects how long it took * us to receive a response when SENDING via this particular - * transport/neighbor/address combination! + * transport/neighbour/address combination! * * FIXME: we need to periodically send PINGs to update this * latency (at least more often than the current "huge" (11h?) @@ -445,6 +462,11 @@ struct NeighborList */ struct GNUNET_TIME_Relative latency; + /** + * DV distance to this peer (1 if no DV is used). + */ + uint32_t distance; + /** * How many bytes have we received since the "last_quota_update" * timestamp? @@ -452,7 +474,7 @@ struct NeighborList uint64_t last_received; /** - * Global quota for inbound traffic for the neighbor in bytes/ms. + * Global quota for inbound traffic for the neighbour in bytes/ms. */ uint32_t quota_in; @@ -465,9 +487,8 @@ struct NeighborList unsigned int quota_violation_count; /** - * Have we seen an ACK from this neighbor in the past? - * (used to make up a fake ACK for clients connecting after - * the neighbor connected to us). + * Have we seen an PONG from this neighbour in the past (and + * not had a disconnect since)? */ int received_pong; @@ -552,17 +573,22 @@ struct TransportPongMessage }; + /** - * Linked list of messages to be transmitted to - * the client. Each entry is followed by the - * actual message. + * Linked list of messages to be transmitted to the client. Each + * entry is followed by the actual message. */ struct ClientMessageQueueEntry { /** - * This is a linked list. + * This is a doubly-linked list. */ struct ClientMessageQueueEntry *next; + + /** + * This is a doubly-linked list. + */ + struct ClientMessageQueueEntry *prev; }; @@ -594,6 +620,11 @@ struct TransportClient */ struct ClientMessageQueueEntry *message_queue_tail; + /** + * Current transmit request handle. + */ + struct GNUNET_CONNECTION_TransmitHandle *th; + /** * Is a call to "transmit_send_continuation" pending? If so, we * must not free this struct (even if the corresponding client @@ -698,7 +729,7 @@ struct CheckHelloValidatedContext static struct GNUNET_HELLO_Message *our_hello; /** - * "version" of "our_hello". Used to see if a given neighbor has + * "version" of "our_hello". Used to see if a given neighbour has * already been sent the latest version of our HELLO message. */ static unsigned int our_hello_version; @@ -744,12 +775,12 @@ static struct TransportPlugin *plugins; static struct GNUNET_SERVER_Handle *server; /** - * All known neighbors and their HELLOs. + * All known neighbours and their HELLOs. */ -static struct NeighborList *neighbors; +static struct NeighbourList *neighbours; /** - * Number of neighbors we'd like to have. + * Number of neighbours we'd like to have. */ static uint32_t max_connect_per_transport; @@ -771,43 +802,42 @@ static struct CheckHelloValidatedContext *chvc_tail; static struct GNUNET_CONTAINER_MultiHashMap *validation_map; - /** - * The peer specified by the given neighbor has timed-out or a plugin + * The peer specified by the given neighbour has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins * still up), or trigger a full disconnect and clean up. This * function updates our state and do the necessary notifications. - * Also notifies our clients that the neighbor is now officially + * Also notifies our clients that the neighbour is now officially * gone. * - * @param n the neighbor list entry for the peer + * @param n the neighbour list entry for the peer * @param check should we just check if all plugins * disconnected or must we ask all plugins to * disconnect? */ -static void disconnect_neighbor (struct NeighborList *n, int check); +static void disconnect_neighbour (struct NeighbourList *n, int check); /** - * Check the ready list for the given neighbor and if a plugin is + * Check the ready list for the given neighbour and if a plugin is * ready for transmission (and if we have a message), do so! * - * @param neighbor target peer for which to transmit + * @param neighbour target peer for which to transmit */ -static void try_transmission_to_peer (struct NeighborList *neighbor); +static void try_transmission_to_peer (struct NeighbourList *neighbour); /** - * Find an entry in the neighbor list for a particular peer. + * Find an entry in the neighbour list for a particular peer. * if sender_address is not specified (NULL) then return the * first matching entry. If sender_address is specified, then * make sure that the address and address_len also matches. * * @return NULL if not found. */ -static struct NeighborList * -find_neighbor (const struct GNUNET_PeerIdentity *key) +static struct NeighbourList * +find_neighbour (const struct GNUNET_PeerIdentity *key) { - struct NeighborList *head = neighbors; + struct NeighbourList *head = neighbours; while ((head != NULL) && (0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity)))) @@ -832,20 +862,27 @@ find_transport (const char *short_name) /** - * Update the quota values for the given neighbor now. + * Update the quota values for the given neighbour now. + * + * @param n neighbour to update + * @param force GNUNET_YES to force recalculation now */ static void -update_quota (struct NeighborList *n) +update_quota (struct NeighbourList *n, + int force) { - struct GNUNET_TIME_Relative delta; + struct GNUNET_TIME_Absolute now; + unsigned long long delta; uint64_t allowed; uint64_t remaining; - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (delta.value < MIN_QUOTA_REFRESH_TIME) - return; /* not enough time passed for doing quota update */ - allowed = delta.value * n->quota_in; - + now = GNUNET_TIME_absolute_get (); + delta = now.value - n->last_quota_update.value; + allowed = n->quota_in * delta; + if ( (delta < MIN_QUOTA_REFRESH_TIME) && + (!force) && + (allowed < 32 * 1024) ) + return; /* too early, not enough data */ if (n->last_received < allowed) { remaining = allowed - n->last_received; @@ -856,7 +893,7 @@ update_quota (struct NeighborList *n) if (remaining > MAX_BANDWIDTH_CARRY) remaining = MAX_BANDWIDTH_CARRY; n->last_received = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; n->last_quota_update.value -= remaining; if (n->quota_violation_count > 0) n->quota_violation_count--; @@ -864,10 +901,10 @@ update_quota (struct NeighborList *n) else { n->last_received -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; if (n->last_received > allowed) { - /* more than twice the allowed rate! */ + /* much more than the allowed rate! */ n->quota_violation_count += 10; } } @@ -892,9 +929,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) uint16_t msize; size_t tsize; const struct GNUNET_MessageHeader *msg; - struct GNUNET_CONNECTION_TransmitHandle *th; char *cbuf; + client->th = NULL; if (buf == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -902,10 +939,11 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) /* fatal error with client, free message queue! */ while (NULL != (q = client->message_queue_head)) { - client->message_queue_head = q->next; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); GNUNET_free (q); } - client->message_queue_tail = NULL; client->message_count = 0; return 0; } @@ -922,9 +960,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) "Transmitting message of type %u to client.\n", ntohs (msg->type)); #endif - client->message_queue_head = q->next; - if (q->next == NULL) - client->message_queue_tail = NULL; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); memcpy (&cbuf[tsize], msg, msize); tsize += msize; GNUNET_free (q); @@ -933,12 +971,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) if (NULL != q) { GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } return tsize; } @@ -960,7 +998,6 @@ transmit_to_client (struct TransportClient *client, { struct ClientMessageQueueEntry *q; uint16_t msize; - struct GNUNET_CONNECTION_TransmitHandle *th; if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop)) { @@ -971,34 +1008,51 @@ transmit_to_client (struct TransportClient *client, /* TODO: call to statistics... */ return; } - client->message_count++; msize = ntohs (msg->size); GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize); memcpy (&q[1], msg, msize); - /* append to message queue */ - if (client->message_queue_tail == NULL) - { - client->message_queue_tail = q; - } - else + GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head, + client->message_queue_tail, + client->message_queue_tail, + q); + client->message_count++; + if (client->th == NULL) { - client->message_queue_tail->next = q; - client->message_queue_tail = q; - } - if (client->message_queue_head == NULL) - { - client->message_queue_head = q; - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } } +/** + * Transmit a 'SEND_OK' notification to the given client for the + * given neighbour. + * + * @param client who to notify + * @param n neighbour to notify about + * @param result status code for the transmission request + */ +static void +transmit_send_ok (struct TransportClient *client, + struct NeighbourList *n, + int result) +{ + struct SendOkMessage send_ok_msg; + + send_ok_msg.header.size = htons (sizeof (send_ok_msg)); + send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + send_ok_msg.success = htonl (result); + send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency); + send_ok_msg.peer = n->id; + transmit_to_client (client, &send_ok_msg.header, GNUNET_NO); +} + + /** * Function called by the GNUNET_TRANSPORT_TransmitFunction * upon "completion" of a send request. This tells the API @@ -1019,76 +1073,48 @@ transmit_send_continuation (void *cls, int result) { struct MessageQueue *mq = cls; - /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */ - struct SendOkMessage send_ok_msg; - struct NeighborList *n; + struct NeighbourList *n; - GNUNET_assert (mq != NULL); - n = find_neighbor(&mq->neighbor_id); - if (n == NULL) /* Neighbor must have been removed asynchronously! */ - return; - - /* Otherwise, let's make sure we've got the right peer */ - GNUNET_assert (0 == - memcmp (&n->id, target, - sizeof (struct GNUNET_PeerIdentity))); - - if (result == GNUNET_OK) + n = find_neighbour(&mq->neighbour_id); + GNUNET_assert (n != NULL); + if (mq->specific_address != NULL) { - if (mq->specific_address != NULL) + if (result == GNUNET_OK) { mq->specific_address->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); mq->specific_address->connected = GNUNET_YES; - } - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission to peer `%s' failed, marking connection as down.\n", - GNUNET_i2s (target)); - if (mq->specific_address != NULL) - mq->specific_address->connected = GNUNET_NO; + } + else + { + mq->specific_address->connected = GNUNET_NO; + } + if (! mq->internal_msg) + mq->specific_address->in_transmit = GNUNET_NO; } - if ( (! mq->internal_msg) && - (mq->specific_address != NULL) ) - mq->specific_address->in_transmit = GNUNET_NO; - if (mq->client != NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Notifying client %p about transmission to peer `%4s'.\n", - mq->client, GNUNET_i2s (target)); - send_ok_msg.header.size = htons (sizeof (send_ok_msg)); - send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - send_ok_msg.success = htonl (result); - send_ok_msg.peer = n->id; - transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO); - } + transmit_send_ok (mq->client, n, result); GNUNET_free (mq); - /* one plugin just became ready again, try transmitting - another message (if available) */ - if (result == GNUNET_OK) - try_transmission_to_peer (n); - else - disconnect_neighbor (n, GNUNET_YES); + try_transmission_to_peer (n); + if (result != GNUNET_OK) + disconnect_neighbour (n, GNUNET_YES); } /** * Find an address in any of the available transports for - * the given neighbor that would be good for message + * the given neighbour that would be good for message * transmission. This is essentially the transport selection * routine. * - * @param neighbor for whom to select an address + * @param neighbour for whom to select an address * @return selected address, NULL if we have none */ struct ForeignAddressList * -find_ready_address(struct NeighborList *neighbor) +find_ready_address(struct NeighbourList *neighbour) { - struct ReadyList *head = neighbor->plugins; + struct ReadyList *head = neighbour->plugins; struct ForeignAddressList *addresses; struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); struct ForeignAddressList *best_address; @@ -1105,7 +1131,7 @@ find_ready_address(struct NeighborList *neighbor) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Marking long-time inactive connection to `%4s' as down.\n", - GNUNET_i2s (&neighbor->id)); + GNUNET_i2s (&neighbour->id)); #endif addresses->connected = GNUNET_NO; } @@ -1142,41 +1168,81 @@ find_ready_address(struct NeighborList *neighbor) /** - * Check the ready list for the given neighbor and if a plugin is + * We should re-try transmitting to the given peer, + * hopefully we've learned something in the meantime. + */ +static void +retry_transmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NeighbourList *n = cls; + + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); +} + + +/** + * Check the ready list for the given neighbour and if a plugin is * ready for transmission (and if we have a message), do so! * - * @param neighbor target peer for which to transmit + * @param neighbour target peer for which to transmit */ static void -try_transmission_to_peer (struct NeighborList *neighbor) +try_transmission_to_peer (struct NeighbourList *neighbour) { - struct GNUNET_TIME_Relative min_latency; struct ReadyList *rl; struct MessageQueue *mq; + struct GNUNET_TIME_Relative timeout; - if (neighbor->messages_head == NULL) + if (neighbour->messages_head == NULL) return; /* nothing to do */ - min_latency = GNUNET_TIME_UNIT_FOREVER_REL; rl = NULL; - mq = neighbor->messages_head; + mq = neighbour->messages_head; + /* FIXME: support bi-directional use of TCP */ if (mq->specific_address == NULL) - mq->specific_address = find_ready_address(neighbor); + mq->specific_address = find_ready_address(neighbour); if (mq->specific_address == NULL) { + timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); + if (timeout.value == 0) + { +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No destination address available to transmit message of size %u to peer `%4s'\n", + mq->message_buf_size, + GNUNET_i2s (&mq->neighbour_id)); +#endif + if (mq->client != NULL) + transmit_send_ok (mq->client, neighbour, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, + neighbour->messages_tail, + mq); + GNUNET_free (mq); + return; /* nobody ready */ + } + if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, + neighbour->retry_task); + neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched, + timeout, + &retry_transmission_task, + neighbour); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No destination address available to transmit message of size %u to peer `%4s'\n", + "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n", mq->message_buf_size, - GNUNET_i2s (&mq->neighbor_id)); + GNUNET_i2s (&mq->neighbour_id), + timeout.value); #endif - return; /* nobody ready */ + return; } + GNUNET_CONTAINER_DLL_remove (neighbour->messages_head, + neighbour->messages_tail, + mq); if (mq->specific_address->connected == GNUNET_NO) mq->specific_address->connect_attempts++; rl = mq->specific_address->ready_list; - GNUNET_CONTAINER_DLL_remove (neighbor->messages_head, - neighbor->messages_tail, - mq); mq->plugin = rl->plugin; if (!mq->internal_msg) mq->specific_address->in_transmit = GNUNET_YES; @@ -1184,13 +1250,13 @@ try_transmission_to_peer (struct NeighborList *neighbor) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of size %u for `%4s' to `%s' via plugin `%s'\n", mq->message_buf_size, - GNUNET_i2s (&neighbor->id), + GNUNET_i2s (&neighbour->id), GNUNET_a2s (mq->specific_address->addr, mq->specific_address->addrlen), rl->plugin->short_name); #endif rl->plugin->api->send (rl->plugin->api->cls, - &mq->neighbor_id, + &mq->neighbour_id, mq->message_buf, mq->message_buf_size, mq->priority, @@ -1208,19 +1274,21 @@ try_transmission_to_peer (struct NeighborList *neighbor) * @param client source of the transmission request (can be NULL) * @param peer_address ForeignAddressList where we should send this message * @param priority how important is the message + * @param timeout how long do we have to transmit? * @param message_buf message(s) to send GNUNET_MessageHeader(s) * @param message_buf_size total size of all messages in message_buf * @param is_internal is this an internal message; these are pre-pended and * also do not count for plugins being "ready" to transmit - * @param neighbor handle to the neighbor for transmission + * @param neighbour handle to the neighbour for transmission */ static void transmit_to_peer (struct TransportClient *client, struct ForeignAddressList *peer_address, unsigned int priority, + struct GNUNET_TIME_Relative timeout, const char *message_buf, size_t message_buf_size, - int is_internal, struct NeighborList *neighbor) + int is_internal, struct NeighbourList *neighbour) { struct MessageQueue *mq; @@ -1228,7 +1296,7 @@ transmit_to_peer (struct TransportClient *client, if (client != NULL) { /* check for duplicate submission */ - mq = neighbor->messages_head; + mq = neighbour->messages_head; while (NULL != mq) { if (mq->client == client) @@ -1248,20 +1316,20 @@ transmit_to_peer (struct TransportClient *client, memcpy (&mq[1], message_buf, message_buf_size); mq->message_buf = (const char*) &mq[1]; mq->message_buf_size = message_buf_size; - memcpy(&mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity)); + memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity)); mq->internal_msg = is_internal; mq->priority = priority; - + mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); if (is_internal) - GNUNET_CONTAINER_DLL_insert (neighbor->messages_head, - neighbor->messages_tail, + GNUNET_CONTAINER_DLL_insert (neighbour->messages_head, + neighbour->messages_tail, mq); else - GNUNET_CONTAINER_DLL_insert_after (neighbor->messages_head, - neighbor->messages_tail, - neighbor->messages_tail, + GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head, + neighbour->messages_tail, + neighbour->messages_tail, mq); - try_transmission_to_peer (neighbor); + try_transmission_to_peer (neighbour); } @@ -1313,7 +1381,7 @@ refresh_hello () { struct GNUNET_HELLO_Message *hello; struct TransportClient *cpos; - struct NeighborList *npos; + struct NeighbourList *npos; struct GeneratorContext gc; gc.plug_pos = plugins; @@ -1337,15 +1405,16 @@ refresh_hello () our_hello = hello; our_hello_version++; GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello); - npos = neighbors; + npos = neighbours; while (npos != NULL) { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Transmitting updated `%s' to neighbor `%4s'\n", + "Transmitting updated `%s' to neighbour `%4s'\n", "HELLO", GNUNET_i2s (&npos->id)); #endif transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, (const char *) our_hello, GNUNET_HELLO_size(our_hello), GNUNET_NO, npos); @@ -1495,14 +1564,20 @@ plugin_env_notify_address (void *cls, */ static void notify_clients_connect (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_TIME_Relative latency) + struct GNUNET_TIME_Relative latency, + uint32_t distance) { struct ConnectInfoMessage cim; struct TransportClient *cpos; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Notifying clients about connection from `%s'\n", + GNUNET_i2s (peer)); +#endif cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); + cim.distance = htonl (distance); cim.latency = GNUNET_TIME_relative_hton (latency); memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity)); cpos = clients; @@ -1523,6 +1598,11 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) struct DisconnectInfoMessage dim; struct TransportClient *cpos; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Notifying clients about lost connection to `%s'\n", + GNUNET_i2s (peer)); +#endif dim.header.size = htons (sizeof (struct DisconnectInfoMessage)); dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); dim.reserved = htonl (0); @@ -1540,14 +1620,14 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) * Find a ForeignAddressList entry for the given neighbour * that matches the given address and transport. * - * @param neighbor which peer we care about + * @param neighbour which peer we care about * @param tname name of the transport plugin * @param addr binary address * @param addrlen length of addr * @return NULL if no such entry exists */ static struct ForeignAddressList * -find_peer_address(struct NeighborList *neighbor, +find_peer_address(struct NeighbourList *neighbour, const char *tname, const char *addr, size_t addrlen) @@ -1555,7 +1635,7 @@ find_peer_address(struct NeighborList *neighbor, struct ReadyList *head; struct ForeignAddressList *address_head; - head = neighbor->plugins; + head = neighbour->plugins; while (head != NULL) { if (0 == strcmp (tname, head->plugin->short_name)) @@ -1575,17 +1655,17 @@ find_peer_address(struct NeighborList *neighbor, /** - * Get the peer address struct for the given neighbor and + * Get the peer address struct for the given neighbour and * address. If it doesn't yet exist, create it. * - * @param neighbor which peer we care about + * @param neighbour which peer we care about * @param tname name of the transport plugin * @param addr binary address * @param addrlen length of addr * @return NULL if we do not have a transport plugin for 'tname' */ static struct ForeignAddressList * -add_peer_address(struct NeighborList *neighbor, +add_peer_address(struct NeighbourList *neighbour, const char *tname, const char *addr, size_t addrlen) @@ -1593,10 +1673,10 @@ add_peer_address(struct NeighborList *neighbor, struct ReadyList *head; struct ForeignAddressList *ret; - ret = find_peer_address (neighbor, tname, addr, addrlen); + ret = find_peer_address (neighbour, tname, addr, addrlen); if (ret != NULL) return ret; - head = neighbor->plugins; + head = neighbour->plugins; while (head != NULL) { if (0 == strcmp (tname, head->plugin->short_name)) @@ -1612,6 +1692,7 @@ add_peer_address(struct NeighborList *neighbor, ret->expires = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); ret->latency = GNUNET_TIME_relative_get_forever(); + ret->distance = -1; ret->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); ret->ready_list = head; @@ -1691,7 +1772,7 @@ check_pending_validation (void *cls, unsigned int challenge = ntohl(pong->challenge); struct GNUNET_HELLO_Message *hello; struct GNUNET_PeerIdentity target; - struct NeighborList *n; + struct NeighbourList *n; struct ForeignAddressList *fal; if (ve->challenge != challenge) @@ -1718,7 +1799,7 @@ check_pending_validation (void *cls, &target, hello); GNUNET_free (hello); - n = find_neighbor (&target); + n = find_neighbour (&target); if (n != NULL) { fal = add_peer_address (n, ve->transport_name, @@ -1731,6 +1812,19 @@ check_pending_validation (void *cls, n->latency = fal->latency; else n->latency.value = (fal->latency.value + n->latency.value) / 2; + n->distance = fal->distance; + if (GNUNET_NO == n->received_pong) + { + notify_clients_connect (&target, n->latency, n->distance); + n->received_pong = GNUNET_YES; + } + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, + n->retry_task); + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission_to_peer (n); + } } /* clean up validation entry */ @@ -1755,11 +1849,11 @@ check_pending_validation (void *cls, * (otherwise we may be seeing a MiM attack). * * @param cls closure - * @param name name of the transport that generated the address + * @param message the pong message * @param peer who responded to our challenge - * @param challenge the challenge number we presumably used - * @param sender_addr string describing our sender address (as observed - * by the other peer in human-readable format) + * @param sender_address string describing our sender address (as observed + * by the other peer in binary format) + * @param sender_address_len number of bytes in 'sender_address' */ static void handle_pong (void *cls, const struct GNUNET_MessageHeader *message, @@ -1795,6 +1889,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message, #endif return; } + #if 0 /* FIXME: add given address to potential pool of our addresses (for voting) */ @@ -1808,39 +1903,39 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message, static void -neighbor_timeout_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +neighbour_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct NeighborList *n = cls; + struct NeighbourList *n = cls; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Neighbor `%4s' has timed out!\n", GNUNET_i2s (&n->id)); + "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id)); #endif n->timeout_task = GNUNET_SCHEDULER_NO_TASK; - disconnect_neighbor (n, GNUNET_NO); + disconnect_neighbour (n, GNUNET_NO); } /** - * Create a fresh entry in our neighbor list for the given peer. - * Will try to transmit our current HELLO to the new neighbor. Also + * Create a fresh entry in our neighbour list for the given peer. + * Will try to transmit our current HELLO to the new neighbour. Also * notifies our clients about the new "connection". * * @param peer the peer for which we create the entry - * @return the new neighbor list entry + * @return the new neighbour list entry */ -static struct NeighborList * -setup_new_neighbor (const struct GNUNET_PeerIdentity *peer) +static struct NeighbourList * +setup_new_neighbour (const struct GNUNET_PeerIdentity *peer) { - struct NeighborList *n; + struct NeighbourList *n; struct TransportPlugin *tp; struct ReadyList *rl; GNUNET_assert (our_hello != NULL); - n = GNUNET_malloc (sizeof (struct NeighborList)); - n->next = neighbors; - neighbors = n; + n = GNUNET_malloc (sizeof (struct NeighbourList)); + n->next = neighbours; + neighbours = n; n->id = *peer; n->last_quota_update = GNUNET_TIME_absolute_get (); n->peer_timeout = @@ -1861,13 +1956,14 @@ setup_new_neighbor (const struct GNUNET_PeerIdentity *peer) tp = tp->next; } n->latency = GNUNET_TIME_UNIT_FOREVER_REL; + n->distance = -1; n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbor_timeout_task, n); + &neighbour_timeout_task, n); transmit_to_peer (NULL, NULL, 0, + HELLO_ADDRESS_EXPIRATION, (const char *) our_hello, GNUNET_HELLO_size(our_hello), GNUNET_NO, n); - notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL); return n; } @@ -1977,7 +2073,7 @@ run_validation (void *cls, struct GNUNET_PeerIdentity id; struct TransportPlugin *tp; struct ValidationEntry *va; - struct NeighborList *neighbor; + struct NeighbourList *neighbour; struct ForeignAddressList *peer_address; struct TransportPingMessage ping; struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk; @@ -2041,10 +2137,10 @@ run_validation (void *cls, &id.hashPubKey, va, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - neighbor = find_neighbor(&id); - if (neighbor == NULL) - neighbor = setup_new_neighbor(&id); - peer_address = add_peer_address(neighbor, tname, addr, addrlen); + neighbour = find_neighbour(&id); + if (neighbour == NULL) + neighbour = setup_new_neighbour(&id); + peer_address = add_peer_address(neighbour, tname, addr, addrlen); GNUNET_assert(peer_address != NULL); hello_size = GNUNET_HELLO_size(our_hello); tsize = sizeof(struct TransportPingMessage) + hello_size; @@ -2068,8 +2164,9 @@ run_validation (void *cls, #endif transmit_to_peer (NULL, peer_address, GNUNET_SCHEDULER_PRIORITY_DEFAULT, + HELLO_VERIFICATION_TIMEOUT, message_buf, tsize, - GNUNET_YES, neighbor); + GNUNET_YES, neighbour); GNUNET_free(message_buf); return GNUNET_OK; } @@ -2079,7 +2176,7 @@ run_validation (void *cls, * Add the given address to the list of foreign addresses * available for the given peer (check for duplicates). * - * @param cls the respective 'struct NeighborList' to update + * @param cls the respective 'struct NeighbourList' to update * @param tname name of the transport * @param expiration expiration time * @param addr the address @@ -2092,7 +2189,7 @@ add_to_foreign_address_list (void *cls, struct GNUNET_TIME_Absolute expiration, const void *addr, size_t addrlen) { - struct NeighborList *n = cls; + struct NeighbourList *n = cls; struct ForeignAddressList *fal; fal = find_peer_address (n, tname, addr, addrlen); @@ -2100,10 +2197,11 @@ add_to_foreign_address_list (void *cls, { #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding address `%s' (%s) for peer `%4s' due to peerinfo data.\n", + "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n", GNUNET_a2s (addr, addrlen), tname, - GNUNET_i2s (&n->id)); + GNUNET_i2s (&n->id), + expiration.value); #endif fal = add_peer_address (n, tname, addr, addrlen); } @@ -2122,7 +2220,7 @@ add_to_foreign_address_list (void *cls, * * @param cls closure * @param peer id of the peer, NULL for last call - * @param hello hello message for the peer (can be NULL) + * @param h hello message for the peer (can be NULL) * @param trust amount of trust we have in the peer (not used) */ static void @@ -2135,7 +2233,7 @@ check_hello_validated (void *cls, struct GNUNET_HELLO_Message *plain_hello; struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk; struct GNUNET_PeerIdentity target; - struct NeighborList *n; + struct NeighbourList *n; if (peer == NULL) { @@ -2173,12 +2271,15 @@ check_hello_validated (void *cls, if (h == NULL) return; chvc->hello_known = GNUNET_YES; - n = find_neighbor (peer); + n = find_neighbour (peer); if (n != NULL) - GNUNET_HELLO_iterate_addresses (h, - GNUNET_NO, - &add_to_foreign_address_list, - n); + { + GNUNET_HELLO_iterate_addresses (h, + GNUNET_NO, + &add_to_foreign_address_list, + n); + try_transmission_to_peer (n); + } GNUNET_HELLO_iterate_new_addresses (chvc->hello, h, GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME), @@ -2253,41 +2354,28 @@ process_hello (struct TransportPlugin *plugin, /** - * The peer specified by the given neighbor has timed-out or a plugin + * The peer specified by the given neighbour has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins * still up), or trigger a full disconnect and clean up. This * function updates our state and does the necessary notifications. - * Also notifies our clients that the neighbor is now officially + * Also notifies our clients that the neighbour is now officially * gone. * - * @param n the neighbor list entry for the peer + * @param n the neighbour list entry for the peer * @param check should we just check if all plugins * disconnected or must we ask all plugins to * disconnect? */ static void -disconnect_neighbor (struct NeighborList *current_handle, int check) +disconnect_neighbour (struct NeighbourList *n, int check) { struct ReadyList *rpos; - struct NeighborList *npos; - struct NeighborList *nprev; - struct NeighborList *n; + struct NeighbourList *npos; + struct NeighbourList *nprev; struct MessageQueue *mq; struct ForeignAddressList *peer_addresses; struct ForeignAddressList *peer_pos; - if (neighbors == NULL) - return; /* We don't have any neighbors, so client has an already removed handle! */ - - npos = neighbors; - while ((npos != NULL) && (current_handle != npos)) - npos = npos->next; - - if (npos == NULL) - return; /* Couldn't find neighbor in existing list, must have been already removed! */ - else - n = npos; - if (GNUNET_YES == check) { rpos = n->plugins; @@ -2303,14 +2391,14 @@ disconnect_neighbor (struct NeighborList *current_handle, int check) rpos = rpos->next; } } - #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id)); + "Disconnecting from `%4s'\n", + GNUNET_i2s (&n->id)); #endif - /* remove n from neighbors list */ + /* remove n from neighbours list */ nprev = NULL; - npos = neighbors; + npos = neighbours; while ((npos != NULL) && (npos != n)) { nprev = npos; @@ -2318,12 +2406,13 @@ disconnect_neighbor (struct NeighborList *current_handle, int check) } GNUNET_assert (npos != NULL); if (nprev == NULL) - neighbors = n->next; + neighbours = n->next; else nprev->next = n->next; /* notify all clients about disconnect */ - notify_clients_disconnect (&n->id); + if (GNUNET_YES == n->received_pong) + notify_clients_disconnect (&n->id); /* clean up all plugins, cancel connections and pending transmissions */ while (NULL != (rpos = n->plugins)) @@ -2346,13 +2435,21 @@ disconnect_neighbor (struct NeighborList *current_handle, int check) GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); - GNUNET_assert (0 == memcmp(&mq->neighbor_id, + GNUNET_assert (0 == memcmp(&mq->neighbour_id, &n->id, sizeof(struct GNUNET_PeerIdentity))); GNUNET_free (mq); } if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + { + GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + if (n->retry_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, n->retry_task); + n->retry_task = GNUNET_SCHEDULER_NO_TASK; + } /* finally, free n itself */ GNUNET_free (n); } @@ -2376,7 +2473,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, struct TransportPingMessage *ping; struct TransportPongMessage *pong; uint16_t msize; - struct NeighborList *n; + struct NeighbourList *n; struct ReadyList *rl; struct ForeignAddressList *fal; @@ -2422,9 +2519,9 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, GNUNET_CRYPTO_rsa_sign (my_private_key, &pong->purpose, &pong->signature)); - n = find_neighbor(peer); + n = find_neighbour(peer); if (n == NULL) - n = setup_new_neighbor(peer); + n = setup_new_neighbour(peer); /* broadcast 'PONG' to all available addresses */ rl = n->plugins; while (rl != NULL) @@ -2434,6 +2531,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, { transmit_to_peer(NULL, fal, TRANSPORT_PONG_PRIORITY, + HELLO_VERIFICATION_TIMEOUT, (const char *)pong, ntohs(pong->header.size), GNUNET_YES, @@ -2447,6 +2545,47 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, } +/** + * Calculate how long we should delay reading from the TCP socket to + * ensure that we stay within our bandwidth limits (push back). + * + * @param n for which neighbour should this be calculated + * @return how long to delay receiving more data + */ +static struct GNUNET_TIME_Relative +calculate_throttle_delay (struct NeighbourList *n) +{ + struct GNUNET_TIME_Relative ret; + struct GNUNET_TIME_Absolute now; + uint64_t del; + uint64_t avail; + uint64_t excess; + + now = GNUNET_TIME_absolute_get (); + del = now.value - n->last_quota_update.value; + if (del > MAX_BANDWIDTH_CARRY) + { + update_quota (n, GNUNET_YES); + del = now.value - n->last_quota_update.value; + GNUNET_assert (del <= MAX_BANDWIDTH_CARRY); + } + if (n->quota_in == 0) + n->quota_in = 1; /* avoid divison by zero */ + avail = del * n->quota_in; + if (avail > n->last_received) + return GNUNET_TIME_UNIT_ZERO; /* can receive right now */ + excess = n->last_received - avail; + ret.value = excess / n->quota_in; + if (ret.value > 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n", + (unsigned long long) excess, + (unsigned long long) n->quota_in, + (unsigned long long) ret.value); + return ret; +} + + /** * Function called by the plugin for each received message. * Update data volumes, possibly notify plugins about @@ -2454,18 +2593,16 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, * and generally forward to our receive callback. * * @param cls the "struct TransportPlugin *" we gave to the plugin - * @param message the message, NULL if peer was disconnected - * @param distance the transport cost to this peer (not latency!) - * @param sender_address the address that the sender reported - * (opaque to transport service) - * @param sender_address_len the length of the sender address * @param peer (claimed) identity of the other peer - * @return the new service_context that the plugin should use - * for future receive calls for messages from this - * particular peer - * - */ -static void + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again + * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param sender_address binary address of the sender (if observed) + * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) + */ +static struct GNUNET_TIME_Relative plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, unsigned int distance, const char *sender_address, @@ -2477,101 +2614,91 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, struct InboundMessage *im; struct ForeignAddressList *peer_address; uint16_t msize; - struct NeighborList *n; + struct NeighbourList *n; - n = find_neighbor (peer); + n = find_neighbour (peer); if (n == NULL) - { - if (message == NULL) - return; /* disconnect of peer already marked down */ - n = setup_new_neighbor (peer); - } + n = setup_new_neighbour (peer); + update_quota (n, GNUNET_NO); service_context = n->plugins; while ((service_context != NULL) && (plugin != service_context->plugin)) service_context = service_context->next; GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); - if (message == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Receive failed from `%4s', triggering disconnect\n", - GNUNET_i2s (&n->id)); -#endif - /* TODO: call stats */ - disconnect_neighbor (n, GNUNET_YES); - return; - } - peer_address = add_peer_address(n, - plugin->short_name, - sender_address, - sender_address_len); - if (peer_address != NULL) + if (message != NULL) { - if (peer_address->connected == GNUNET_NO) - { - peer_address->connected = GNUNET_YES; - peer_address->connect_attempts++; - } - peer_address->timeout - = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - } - /* update traffic received amount ... */ - msize = ntohs (message->size); - n->last_received += msize; - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); - n->peer_timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbor_timeout_task, n); - update_quota (n); - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - /* dropping message due to frequent inbound volume violations! */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING | - GNUNET_ERROR_TYPE_BULK, - _ - ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count); - /* TODO: call stats */ - return; - } - switch (ntohs (message->type)) - { - case GNUNET_MESSAGE_TYPE_HELLO: - process_hello (plugin, message); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: - handle_ping(plugin, message, peer, sender_address, sender_address_len); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: - handle_pong(plugin, message, peer, sender_address, sender_address_len); - break; - default: + peer_address = add_peer_address(n, + plugin->short_name, + sender_address, + sender_address_len); + if (peer_address != NULL) + { + peer_address->distance = distance; + if (peer_address->connected == GNUNET_NO) + { + peer_address->connected = GNUNET_YES; + peer_address->connect_attempts++; + } + peer_address->timeout + = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + } + /* update traffic received amount ... */ + msize = ntohs (message->size); + n->distance = distance; + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_SCHEDULER_cancel (sched, + n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + /* dropping message due to frequent inbound volume violations! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING | + GNUNET_ERROR_TYPE_BULK, + _ + ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), + n->quota_violation_count); + return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */ + } + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_HELLO: + process_hello (plugin, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: + handle_ping(plugin, message, peer, sender_address, sender_address_len); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: + handle_pong(plugin, message, peer, sender_address, sender_address_len); + break; + default: #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s', sending to all clients.\n", - ntohs (message->type), GNUNET_i2s (peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s', sending to all clients.\n", + ntohs (message->type), GNUNET_i2s (peer)); #endif - /* transmit message to all clients */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); - im->header.size = htons (sizeof (struct InboundMessage) + msize); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (n->latency); - im->peer = *peer; - memcpy (&im[1], message, msize); - - cpos = clients; - while (cpos != NULL) - { - transmit_to_client (cpos, &im->header, GNUNET_YES); - cpos = cpos->next; - } - GNUNET_free (im); - } + /* transmit message to all clients */ + im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); + im->header.size = htons (sizeof (struct InboundMessage) + msize); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->latency = GNUNET_TIME_relative_hton (n->latency); + im->peer = *peer; + memcpy (&im[1], message, msize); + cpos = clients; + while (cpos != NULL) + { + transmit_to_client (cpos, &im->header, GNUNET_YES); + cpos = cpos->next; + } + GNUNET_free (im); + } + } + return calculate_throttle_delay (n); } @@ -2590,9 +2717,7 @@ handle_start (void *cls, { struct TransportClient *c; struct ConnectInfoMessage cim; - struct NeighborList *n; - struct InboundMessage *im; - struct GNUNET_MessageHeader *ack; + struct NeighbourList *n; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2626,33 +2751,18 @@ handle_start (void *cls, /* tell new client about all existing connections */ cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = - htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); - cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.size = htons (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */ - ack = (struct GNUNET_MessageHeader *) &im[1]; - ack->size = htons (sizeof (struct GNUNET_MessageHeader)); - ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK); - for (n = neighbors; n != NULL; n = n->next) - { - cim.id = n->id; - transmit_to_client (c, &cim.header, GNUNET_NO); - if (n->received_pong) - { - im->peer = n->id; - transmit_to_client (c, &im->header, GNUNET_NO); + n = neighbours; + while (n != NULL) + { + if (GNUNET_YES == n->received_pong) + { + cim.id = n->id; + cim.latency = GNUNET_TIME_relative_hton (n->latency); + cim.distance = htonl (n->distance); + transmit_to_client (c, &cim.header, GNUNET_NO); } + n = n->next; } - GNUNET_free (im); - } - else - { - fprintf(stderr, "Our hello is NULL!\n"); } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2672,10 +2782,6 @@ handle_hello (void *cls, { int ret; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client\n", "HELLO"); -#endif ret = process_hello (NULL, message); GNUNET_SERVER_receive_done (client, ret); } @@ -2694,7 +2800,7 @@ handle_send (void *cls, const struct GNUNET_MessageHeader *message) { struct TransportClient *tc; - struct NeighborList *n; + struct NeighbourList *n; const struct OutboundMessage *obm; const struct GNUNET_MessageHeader *obmm; uint16_t size; @@ -2722,9 +2828,9 @@ handle_send (void *cls, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - n = find_neighbor (&obm->peer); + n = find_neighbour (&obm->peer); if (n == NULL) - n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */ + n = setup_new_neighbour (&obm->peer); tc = clients; while ((tc != NULL) && (tc->client != client)) tc = tc->next; @@ -2735,7 +2841,9 @@ handle_send (void *cls, ntohs (obmm->size), ntohs (obmm->type), GNUNET_i2s (&obm->peer)); #endif - transmit_to_peer (tc, NULL, ntohl (obm->priority), (char *)obmm, + transmit_to_peer (tc, NULL, ntohl (obm->priority), + GNUNET_TIME_relative_ntoh (obm->timeout), + (char *)obmm, ntohs (obmm->size), GNUNET_NO, n); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2755,62 +2863,25 @@ handle_set_quota (void *cls, { const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; - struct NeighborList *n; - struct TransportPlugin *p; - struct ReadyList *rl; + struct NeighbourList *n; + uint32_t qin; - n = find_neighbor (&qsm->peer); + n = find_neighbour (&qsm->peer); if (n == NULL) { GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } - + qin = ntohl (qsm->quota_in); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n", - "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer)); + "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer)); #endif - - update_quota (n); - if (n->quota_in < ntohl (qsm->quota_in)) + update_quota (n, GNUNET_YES); + if (n->quota_in < qin) n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_in = ntohl (qsm->quota_in); - rl = n->plugins; - while (rl != NULL) - { - p = rl->plugin; - p->api->set_receive_quota (p->api->cls, - &qsm->peer, ntohl (qsm->quota_in)); - rl = rl->next; - } - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** - * Handle TRY_CONNECT-message. - * - * @param cls closure (always NULL) - * @param client identification of the client - * @param message the actual message - */ -static void -handle_try_connect (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct TryConnectMessage *tcm; - struct NeighborList *neighbor; - tcm = (const struct TryConnectMessage *) message; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client %p asking to connect to `%4s'\n", - "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer)); -#endif - neighbor = find_neighbor(&tcm->peer); - if (neighbor == NULL) - setup_new_neighbor (&tcm->peer); + n->quota_in = qin; GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2908,9 +2979,6 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0}, {&handle_set_quota, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)}, - {&handle_try_connect, NULL, - GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT, - sizeof (struct TryConnectMessage)}, {&handle_address_lookup, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP, 0}, @@ -2930,8 +2998,6 @@ create_environment (struct TransportPlugin *plug) plug->env.cls = plug; plug->env.receive = &plugin_env_receive; plug->env.notify_address = &plugin_env_notify_address; - plug->env.default_quota_in = - (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); plug->env.max_connections = max_connect_per_transport; } @@ -2999,10 +3065,12 @@ client_disconnect_notification (void *cls, return; while (NULL != (mqe = pos->message_queue_head)) { - pos->message_queue_head = mqe->next; + GNUNET_CONTAINER_DLL_remove (pos->message_queue_head, + pos->message_queue_tail, + mqe); + pos->message_count--; GNUNET_free (mqe); } - pos->message_queue_head = NULL; if (prev == NULL) clients = pos->next; else @@ -3012,6 +3080,12 @@ client_disconnect_notification (void *cls, pos->client = NULL; return; } + if (pos->th != NULL) + { + GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); + pos->th = NULL; + } + GNUNET_break (0 == pos->message_count); GNUNET_free (pos); } @@ -3052,6 +3126,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct OwnAddressList *al; struct CheckHelloValidatedContext *chvc; + while (neighbours != NULL) + disconnect_neighbour (neighbours, GNUNET_NO); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport service is unloading plugins...\n");