X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftransport%2Fgnunet-service-transport.c;h=d654fcc8cfc89c55cd5fe4c60f31f11d682b3d7b;hb=9d2cd5be986a0732b9e5a8fcdf5acd1c7225ef2f;hp=370000cf15dcb04b63627a1de3117727e323a86e;hpb=f06ca9dcef0d602baa3a7c6462baff0dd70b2e4a;p=oweals%2Fgnunet.git diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index 370000cf1..d654fcc8c 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -863,19 +863,26 @@ find_transport (const char *short_name) /** * Update the quota values for the given neighbour now. + * + * @param n neighbour to update + * @param force GNUNET_YES to force recalculation now */ static void -update_quota (struct NeighbourList *n) +update_quota (struct NeighbourList *n, + int force) { - struct GNUNET_TIME_Relative delta; + struct GNUNET_TIME_Absolute now; + unsigned long long delta; uint64_t allowed; uint64_t remaining; - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (delta.value < MIN_QUOTA_REFRESH_TIME) - return; /* not enough time passed for doing quota update */ - allowed = delta.value * n->quota_in; - + now = GNUNET_TIME_absolute_get (); + delta = now.value - n->last_quota_update.value; + allowed = n->quota_in * delta; + if ( (delta < MIN_QUOTA_REFRESH_TIME) && + (!force) && + (allowed < 32 * 1024) ) + return; /* too early, not enough data */ if (n->last_received < allowed) { remaining = allowed - n->last_received; @@ -886,7 +893,7 @@ update_quota (struct NeighbourList *n) if (remaining > MAX_BANDWIDTH_CARRY) remaining = MAX_BANDWIDTH_CARRY; n->last_received = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; n->last_quota_update.value -= remaining; if (n->quota_violation_count > 0) n->quota_violation_count--; @@ -894,10 +901,10 @@ update_quota (struct NeighbourList *n) else { n->last_received -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update = now; if (n->last_received > allowed) { - /* more than twice the allowed rate! */ + /* much more than the allowed rate! */ n->quota_violation_count += 10; } } @@ -1184,14 +1191,12 @@ retry_transmission_task (void *cls, static void try_transmission_to_peer (struct NeighbourList *neighbour) { - struct GNUNET_TIME_Relative min_latency; struct ReadyList *rl; struct MessageQueue *mq; struct GNUNET_TIME_Relative timeout; if (neighbour->messages_head == NULL) return; /* nothing to do */ - min_latency = GNUNET_TIME_UNIT_FOREVER_REL; rl = NULL; mq = neighbour->messages_head; /* FIXME: support bi-directional use of TCP */ @@ -1846,7 +1851,7 @@ check_pending_validation (void *cls, * @param cls closure * @param message the pong message * @param peer who responded to our challenge - * @param sender_addr string describing our sender address (as observed + * @param sender_address string describing our sender address (as observed * by the other peer in binary format) * @param sender_address_len number of bytes in 'sender_address' */ @@ -2540,6 +2545,47 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, } +/** + * Calculate how long we should delay reading from the TCP socket to + * ensure that we stay within our bandwidth limits (push back). + * + * @param n for which neighbour should this be calculated + * @return how long to delay receiving more data + */ +static struct GNUNET_TIME_Relative +calculate_throttle_delay (struct NeighbourList *n) +{ + struct GNUNET_TIME_Relative ret; + struct GNUNET_TIME_Absolute now; + uint64_t del; + uint64_t avail; + uint64_t excess; + + now = GNUNET_TIME_absolute_get (); + del = now.value - n->last_quota_update.value; + if (del > MAX_BANDWIDTH_CARRY) + { + update_quota (n, GNUNET_YES); + del = now.value - n->last_quota_update.value; + GNUNET_assert (del <= MAX_BANDWIDTH_CARRY); + } + if (n->quota_in == 0) + n->quota_in = 1; /* avoid divison by zero */ + avail = del * n->quota_in; + if (avail > n->last_received) + return GNUNET_TIME_UNIT_ZERO; /* can receive right now */ + excess = n->last_received - avail; + ret.value = excess / n->quota_in; + if (ret.value > 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n", + (unsigned long long) excess, + (unsigned long long) n->quota_in, + (unsigned long long) ret.value); + return ret; +} + + /** * Function called by the plugin for each received message. * Update data volumes, possibly notify plugins about @@ -2547,18 +2593,16 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, * and generally forward to our receive callback. * * @param cls the "struct TransportPlugin *" we gave to the plugin - * @param message the message, NULL if peer was disconnected - * @param distance the transport cost to this peer (not latency!) - * @param sender_address the address that the sender reported - * (opaque to transport service) - * @param sender_address_len the length of the sender address * @param peer (claimed) identity of the other peer - * @return the new service_context that the plugin should use - * for future receive calls for messages from this - * particular peer - * - */ -static void + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again + * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param sender_address binary address of the sender (if observed) + * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) + */ +static struct GNUNET_TIME_Relative plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, unsigned int distance, const char *sender_address, @@ -2574,99 +2618,87 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, n = find_neighbour (peer); if (n == NULL) - { - if (message == NULL) - return; /* disconnect of peer already marked down */ - n = setup_new_neighbour (peer); - } + n = setup_new_neighbour (peer); + update_quota (n, GNUNET_NO); service_context = n->plugins; while ((service_context != NULL) && (plugin != service_context->plugin)) service_context = service_context->next; GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); - if (message == NULL) + if (message != NULL) { + peer_address = add_peer_address(n, + plugin->short_name, + sender_address, + sender_address_len); + if (peer_address != NULL) + { + peer_address->distance = distance; + if (peer_address->connected == GNUNET_NO) + { + peer_address->connected = GNUNET_YES; + peer_address->connect_attempts++; + } + peer_address->timeout + = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + } + /* update traffic received amount ... */ + msize = ntohs (message->size); + n->distance = distance; + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_SCHEDULER_cancel (sched, + n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + /* dropping message due to frequent inbound volume violations! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING | + GNUNET_ERROR_TYPE_BULK, + _ + ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), + n->quota_violation_count); + return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */ + } + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_HELLO: + process_hello (plugin, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: + handle_ping(plugin, message, peer, sender_address, sender_address_len); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: + handle_pong(plugin, message, peer, sender_address, sender_address_len); + break; + default: #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Receive failed from `%4s', triggering disconnect\n", - GNUNET_i2s (&n->id)); -#endif - /* TODO: call stats */ - disconnect_neighbour (n, GNUNET_YES); - return; - } - peer_address = add_peer_address(n, - plugin->short_name, - sender_address, - sender_address_len); - if (peer_address != NULL) - { - peer_address->distance = distance; - if (peer_address->connected == GNUNET_NO) - { - peer_address->connected = GNUNET_YES; - peer_address->connect_attempts++; - } - peer_address->timeout - = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - } - /* update traffic received amount ... */ - msize = ntohs (message->size); - n->last_received += msize; - n->distance = distance; - n->peer_timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - GNUNET_SCHEDULER_cancel (sched, - n->timeout_task); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - update_quota (n); - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - /* dropping message due to frequent inbound volume violations! */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING | - GNUNET_ERROR_TYPE_BULK, - _ - ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count); - /* TODO: call stats */ - return; - } - switch (ntohs (message->type)) - { - case GNUNET_MESSAGE_TYPE_HELLO: - process_hello (plugin, message); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: - handle_ping(plugin, message, peer, sender_address, sender_address_len); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: - handle_pong(plugin, message, peer, sender_address, sender_address_len); - break; - default: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s', sending to all clients.\n", - ntohs (message->type), GNUNET_i2s (peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s', sending to all clients.\n", + ntohs (message->type), GNUNET_i2s (peer)); #endif - /* transmit message to all clients */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); - im->header.size = htons (sizeof (struct InboundMessage) + msize); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (n->latency); - im->peer = *peer; - memcpy (&im[1], message, msize); - cpos = clients; - while (cpos != NULL) - { - transmit_to_client (cpos, &im->header, GNUNET_YES); - cpos = cpos->next; - } - GNUNET_free (im); - } + /* transmit message to all clients */ + im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); + im->header.size = htons (sizeof (struct InboundMessage) + msize); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->latency = GNUNET_TIME_relative_hton (n->latency); + im->peer = *peer; + memcpy (&im[1], message, msize); + cpos = clients; + while (cpos != NULL) + { + transmit_to_client (cpos, &im->header, GNUNET_YES); + cpos = cpos->next; + } + GNUNET_free (im); + } + } + return calculate_throttle_delay (n); } @@ -2832,8 +2864,7 @@ handle_set_quota (void *cls, const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; struct NeighbourList *n; - struct TransportPlugin *p; - struct ReadyList *rl; + uint32_t qin; n = find_neighbour (&qsm->peer); if (n == NULL) @@ -2841,25 +2872,16 @@ handle_set_quota (void *cls, GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } - + qin = ntohl (qsm->quota_in); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n", - "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer)); + "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer)); #endif - - update_quota (n); - if (n->quota_in < ntohl (qsm->quota_in)) + update_quota (n, GNUNET_YES); + if (n->quota_in < qin) n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_in = ntohl (qsm->quota_in); - rl = n->plugins; - while (rl != NULL) - { - p = rl->plugin; - p->api->set_receive_quota (p->api->cls, - &qsm->peer, ntohl (qsm->quota_in)); - rl = rl->next; - } + n->quota_in = qin; GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2976,8 +2998,6 @@ create_environment (struct TransportPlugin *plug) plug->env.cls = plug; plug->env.receive = &plugin_env_receive; plug->env.notify_address = &plugin_env_notify_address; - plug->env.default_quota_in = - (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); plug->env.max_connections = max_connect_per_transport; }