X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fgnunet-service-transport.c;h=d654fcc8cfc89c55cd5fe4c60f31f11d682b3d7b;hb=9d2cd5be986a0732b9e5a8fcdf5acd1c7225ef2f;hp=b89bc8bce0d202f7e5630ec2b6e1ccd5f2e4da45;hpb=fcc5b889b3c4783313567deb995d7437434469a3;p=oweals%2Fgnunet.git diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index b89bc8bce..d654fcc8c 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -573,17 +573,22 @@ struct TransportPongMessage }; + /** - * Linked list of messages to be transmitted to - * the client. Each entry is followed by the - * actual message. + * Linked list of messages to be transmitted to the client. Each + * entry is followed by the actual message. */ struct ClientMessageQueueEntry { /** - * This is a linked list. + * This is a doubly-linked list. */ struct ClientMessageQueueEntry *next; + + /** + * This is a doubly-linked list. + */ + struct ClientMessageQueueEntry *prev; }; @@ -615,6 +620,11 @@ struct TransportClient */ struct ClientMessageQueueEntry *message_queue_tail; + /** + * Current transmit request handle. + */ + struct GNUNET_CONNECTION_TransmitHandle *th; + /** * Is a call to "transmit_send_continuation" pending? If so, we * must not free this struct (even if the corresponding client @@ -853,19 +863,26 @@ find_transport (const char *short_name) /** * Update the quota values for the given neighbour now. + * + * @param n neighbour to update + * @param force GNUNET_YES to force recalculation now */ static void -update_quota (struct NeighbourList *n) +update_quota (struct NeighbourList *n, + int force) { - struct GNUNET_TIME_Relative delta; + struct GNUNET_TIME_Absolute now; + unsigned long long delta; uint64_t allowed; uint64_t remaining; - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (delta.value < MIN_QUOTA_REFRESH_TIME) - return; /* not enough time passed for doing quota update */ - allowed = delta.value * n->quota_in; - + now = GNUNET_TIME_absolute_get (); + delta = now.value - n->last_quota_update.value; + allowed = n->quota_in * delta; + if ( (delta < MIN_QUOTA_REFRESH_TIME) && + (!force) && + (allowed < 32 * 1024) ) + return; /* too early, not enough data */ if (n->last_received < allowed) { remaining = allowed - n->last_received; @@ -876,7 +893,7 @@ update_quota (struct NeighbourList *n) if (remaining > MAX_BANDWIDTH_CARRY) remaining = MAX_BANDWIDTH_CARRY; n->last_received = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; n->last_quota_update.value -= remaining; if (n->quota_violation_count > 0) n->quota_violation_count--; @@ -884,10 +901,10 @@ update_quota (struct NeighbourList *n) else { n->last_received -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; if (n->last_received > allowed) { - /* more than twice the allowed rate! */ + /* much more than the allowed rate! */ n->quota_violation_count += 10; } } @@ -912,9 +929,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) uint16_t msize; size_t tsize; const struct GNUNET_MessageHeader *msg; - struct GNUNET_CONNECTION_TransmitHandle *th; char *cbuf; + client->th = NULL; if (buf == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -922,10 +939,11 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) /* fatal error with client, free message queue! */ while (NULL != (q = client->message_queue_head)) { - client->message_queue_head = q->next; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); GNUNET_free (q); } - client->message_queue_tail = NULL; client->message_count = 0; return 0; } @@ -942,9 +960,9 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) "Transmitting message of type %u to client.\n", ntohs (msg->type)); #endif - client->message_queue_head = q->next; - if (q->next == NULL) - client->message_queue_tail = NULL; + GNUNET_CONTAINER_DLL_remove (client->message_queue_head, + client->message_queue_tail, + q); memcpy (&cbuf[tsize], msg, msize); tsize += msize; GNUNET_free (q); @@ -953,12 +971,12 @@ transmit_to_client_callback (void *cls, size_t size, void *buf) if (NULL != q) { GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } return tsize; } @@ -980,7 +998,6 @@ transmit_to_client (struct TransportClient *client, { struct ClientMessageQueueEntry *q; uint16_t msize; - struct GNUNET_CONNECTION_TransmitHandle *th; if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop)) { @@ -991,30 +1008,23 @@ transmit_to_client (struct TransportClient *client, /* TODO: call to statistics... */ return; } - client->message_count++; msize = ntohs (msg->size); GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader)); q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize); memcpy (&q[1], msg, msize); - /* append to message queue */ - if (client->message_queue_tail == NULL) - { - client->message_queue_tail = q; - } - else + GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head, + client->message_queue_tail, + client->message_queue_tail, + q); + client->message_count++; + if (client->th == NULL) { - client->message_queue_tail->next = q; - client->message_queue_tail = q; - } - if (client->message_queue_head == NULL) - { - client->message_queue_head = q; - th = GNUNET_SERVER_notify_transmit_ready (client->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_to_client_callback, - client); - GNUNET_assert (th != NULL); + client->th = GNUNET_SERVER_notify_transmit_ready (client->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_to_client_callback, + client); + GNUNET_assert (client->th != NULL); } } @@ -1181,14 +1191,12 @@ retry_transmission_task (void *cls, static void try_transmission_to_peer (struct NeighbourList *neighbour) { - struct GNUNET_TIME_Relative min_latency; struct ReadyList *rl; struct MessageQueue *mq; struct GNUNET_TIME_Relative timeout; if (neighbour->messages_head == NULL) return; /* nothing to do */ - min_latency = GNUNET_TIME_UNIT_FOREVER_REL; rl = NULL; mq = neighbour->messages_head; /* FIXME: support bi-directional use of TCP */ @@ -1841,11 +1849,11 @@ check_pending_validation (void *cls, * (otherwise we may be seeing a MiM attack). * * @param cls closure - * @param name name of the transport that generated the address + * @param message the pong message * @param peer who responded to our challenge - * @param challenge the challenge number we presumably used - * @param sender_addr string describing our sender address (as observed - * by the other peer in human-readable format) + * @param sender_address string describing our sender address (as observed + * by the other peer in binary format) + * @param sender_address_len number of bytes in 'sender_address' */ static void handle_pong (void *cls, const struct GNUNET_MessageHeader *message, @@ -2212,7 +2220,7 @@ add_to_foreign_address_list (void *cls, * * @param cls closure * @param peer id of the peer, NULL for last call - * @param hello hello message for the peer (can be NULL) + * @param h hello message for the peer (can be NULL) * @param trust amount of trust we have in the peer (not used) */ static void @@ -2537,6 +2545,47 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, } +/** + * Calculate how long we should delay reading from the TCP socket to + * ensure that we stay within our bandwidth limits (push back). + * + * @param n for which neighbour should this be calculated + * @return how long to delay receiving more data + */ +static struct GNUNET_TIME_Relative +calculate_throttle_delay (struct NeighbourList *n) +{ + struct GNUNET_TIME_Relative ret; + struct GNUNET_TIME_Absolute now; + uint64_t del; + uint64_t avail; + uint64_t excess; + + now = GNUNET_TIME_absolute_get (); + del = now.value - n->last_quota_update.value; + if (del > MAX_BANDWIDTH_CARRY) + { + update_quota (n, GNUNET_YES); + del = now.value - n->last_quota_update.value; + GNUNET_assert (del <= MAX_BANDWIDTH_CARRY); + } + if (n->quota_in == 0) + n->quota_in = 1; /* avoid divison by zero */ + avail = del * n->quota_in; + if (avail > n->last_received) + return GNUNET_TIME_UNIT_ZERO; /* can receive right now */ + excess = n->last_received - avail; + ret.value = excess / n->quota_in; + if (ret.value > 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n", + (unsigned long long) excess, + (unsigned long long) n->quota_in, + (unsigned long long) ret.value); + return ret; +} + + /** * Function called by the plugin for each received message. * Update data volumes, possibly notify plugins about @@ -2544,18 +2593,16 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, * and generally forward to our receive callback. * * @param cls the "struct TransportPlugin *" we gave to the plugin - * @param message the message, NULL if peer was disconnected - * @param distance the transport cost to this peer (not latency!) - * @param sender_address the address that the sender reported - * (opaque to transport service) - * @param sender_address_len the length of the sender address * @param peer (claimed) identity of the other peer - * @return the new service_context that the plugin should use - * for future receive calls for messages from this - * particular peer - * - */ -static void + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again + * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param sender_address binary address of the sender (if observed) + * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) + */ +static struct GNUNET_TIME_Relative plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, unsigned int distance, const char *sender_address, @@ -2571,99 +2618,87 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, n = find_neighbour (peer); if (n == NULL) - { - if (message == NULL) - return; /* disconnect of peer already marked down */ - n = setup_new_neighbour (peer); - } + n = setup_new_neighbour (peer); + update_quota (n, GNUNET_NO); service_context = n->plugins; while ((service_context != NULL) && (plugin != service_context->plugin)) service_context = service_context->next; GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); - if (message == NULL) + if (message != NULL) { + peer_address = add_peer_address(n, + plugin->short_name, + sender_address, + sender_address_len); + if (peer_address != NULL) + { + peer_address->distance = distance; + if (peer_address->connected == GNUNET_NO) + { + peer_address->connected = GNUNET_YES; + peer_address->connect_attempts++; + } + peer_address->timeout + = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + } + /* update traffic received amount ... */ + msize = ntohs (message->size); + n->distance = distance; + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_SCHEDULER_cancel (sched, + n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + /* dropping message due to frequent inbound volume violations! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING | + GNUNET_ERROR_TYPE_BULK, + _ + ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), + n->quota_violation_count); + return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */ + } + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_HELLO: + process_hello (plugin, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: + handle_ping(plugin, message, peer, sender_address, sender_address_len); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: + handle_pong(plugin, message, peer, sender_address, sender_address_len); + break; + default: #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Receive failed from `%4s', triggering disconnect\n", - GNUNET_i2s (&n->id)); -#endif - /* TODO: call stats */ - disconnect_neighbour (n, GNUNET_YES); - return; - } - peer_address = add_peer_address(n, - plugin->short_name, - sender_address, - sender_address_len); - if (peer_address != NULL) - { - peer_address->distance = distance; - if (peer_address->connected == GNUNET_NO) - { - peer_address->connected = GNUNET_YES; - peer_address->connect_attempts++; - } - peer_address->timeout - = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - } - /* update traffic received amount ... */ - msize = ntohs (message->size); - n->last_received += msize; - n->distance = distance; - n->peer_timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - GNUNET_SCHEDULER_cancel (sched, - n->timeout_task); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - update_quota (n); - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - /* dropping message due to frequent inbound volume violations! */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING | - GNUNET_ERROR_TYPE_BULK, - _ - ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count); - /* TODO: call stats */ - return; - } - switch (ntohs (message->type)) - { - case GNUNET_MESSAGE_TYPE_HELLO: - process_hello (plugin, message); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: - handle_ping(plugin, message, peer, sender_address, sender_address_len); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: - handle_pong(plugin, message, peer, sender_address, sender_address_len); - break; - default: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s', sending to all clients.\n", - ntohs (message->type), GNUNET_i2s (peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s', sending to all clients.\n", + ntohs (message->type), GNUNET_i2s (peer)); #endif - /* transmit message to all clients */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); - im->header.size = htons (sizeof (struct InboundMessage) + msize); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (n->latency); - im->peer = *peer; - memcpy (&im[1], message, msize); - cpos = clients; - while (cpos != NULL) - { - transmit_to_client (cpos, &im->header, GNUNET_YES); - cpos = cpos->next; - } - GNUNET_free (im); - } + /* transmit message to all clients */ + im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); + im->header.size = htons (sizeof (struct InboundMessage) + msize); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->latency = GNUNET_TIME_relative_hton (n->latency); + im->peer = *peer; + memcpy (&im[1], message, msize); + cpos = clients; + while (cpos != NULL) + { + transmit_to_client (cpos, &im->header, GNUNET_YES); + cpos = cpos->next; + } + GNUNET_free (im); + } + } + return calculate_throttle_delay (n); } @@ -2747,10 +2782,6 @@ handle_hello (void *cls, { int ret; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client\n", "HELLO"); -#endif ret = process_hello (NULL, message); GNUNET_SERVER_receive_done (client, ret); } @@ -2833,8 +2864,7 @@ handle_set_quota (void *cls, const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; struct NeighbourList *n; - struct TransportPlugin *p; - struct ReadyList *rl; + uint32_t qin; n = find_neighbour (&qsm->peer); if (n == NULL) @@ -2842,25 +2872,16 @@ handle_set_quota (void *cls, GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } - + qin = ntohl (qsm->quota_in); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n", - "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer)); + "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer)); #endif - - update_quota (n); - if (n->quota_in < ntohl (qsm->quota_in)) + update_quota (n, GNUNET_YES); + if (n->quota_in < qin) n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_in = ntohl (qsm->quota_in); - rl = n->plugins; - while (rl != NULL) - { - p = rl->plugin; - p->api->set_receive_quota (p->api->cls, - &qsm->peer, ntohl (qsm->quota_in)); - rl = rl->next; - } + n->quota_in = qin; GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2977,8 +2998,6 @@ create_environment (struct TransportPlugin *plug) plug->env.cls = plug; plug->env.receive = &plugin_env_receive; plug->env.notify_address = &plugin_env_notify_address; - plug->env.default_quota_in = - (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); plug->env.max_connections = max_connect_per_transport; } @@ -3046,10 +3065,12 @@ client_disconnect_notification (void *cls, return; while (NULL != (mqe = pos->message_queue_head)) { - pos->message_queue_head = mqe->next; + GNUNET_CONTAINER_DLL_remove (pos->message_queue_head, + pos->message_queue_tail, + mqe); + pos->message_count--; GNUNET_free (mqe); } - pos->message_queue_head = NULL; if (prev == NULL) clients = pos->next; else @@ -3059,6 +3080,12 @@ client_disconnect_notification (void *cls, pos->client = NULL; return; } + if (pos->th != NULL) + { + GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th); + pos->th = NULL; + } + GNUNET_break (0 == pos->message_count); GNUNET_free (pos); }