From 629ff286ace0543d2d984319fba6bacf60408890 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 1 Mar 2010 13:16:24 +0000 Subject: [PATCH] fixing API issue of who is responsible for quota in --- src/datastore/datastore.h | 2 +- src/datastore/datastore_api.c | 2 + src/include/gnunet_core_service.h | 3 +- src/transport/gnunet-service-transport.c | 263 +++++++++++++--------- src/transport/plugin_transport.h | 85 ++++--- src/transport/plugin_transport_tcp.c | 221 +++--------------- src/transport/plugin_transport_template.c | 18 -- src/transport/plugin_transport_udp.c | 18 -- src/transport/plugin_transport_udp_nat.c | 18 -- 9 files changed, 230 insertions(+), 400 deletions(-) diff --git a/src/datastore/datastore.h b/src/datastore/datastore.h index aa2646c0a..f827f8766 100644 --- a/src/datastore/datastore.h +++ b/src/datastore/datastore.h @@ -27,7 +27,7 @@ #ifndef DATASTORE_H #define DATASTORE_H -#define DEBUG_DATASTORE GNUNET_NO +#define DEBUG_DATASTORE GNUNET_YES #include "gnunet_util_lib.h" diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 064f36025..3b7c3a2ed 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -656,6 +656,7 @@ GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h, { gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode)); } + GNUNET_assert (h->response_proc == NULL); transmit_for_result (h, iter, iter_cls, timeout); } @@ -680,6 +681,7 @@ GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h, m = (struct GNUNET_MessageHeader*) &h[1]; m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM); m->size = htons(sizeof (struct GNUNET_MessageHeader)); + GNUNET_assert (h->response_proc == NULL); transmit_for_result (h, iter, iter_cls, timeout); } diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h index 060a0f8ee..af0a4eaa9 100644 --- a/src/include/gnunet_core_service.h +++ b/src/include/gnunet_core_service.h @@ -273,7 +273,8 @@ GNUNET_CORE_peer_request_connect_cancel (struct GNUNET_CORE_PeerRequestHandle *r * @param bpm_out set to the current bandwidth limit (sending) for this peer * @param latency current latency estimate, "FOREVER" if we have been * disconnected - * @param amount set to the amount that was actually reserved or unreserved + * @param amount set to the amount that was actually reserved or unreserved; + * either the full requested amount or zero (no partial reservations) * @param preference current traffic preference for the given peer */ typedef void diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index 3c5feaf51..5048b029c 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -871,6 +871,37 @@ update_quota (struct NeighbourList *n) uint64_t allowed; uint64_t remaining; +#if 0 + struct GNUNET_TIME_Absolute now; + unsigned long long delta; + unsigned long long total_allowed; + unsigned long long total_remaining; + + now = GNUNET_TIME_absolute_get (); + delta = now.value - session->last_quota_update.value; + if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force)) + return; /* too early, not enough data */ + + total_allowed = session->quota_in * delta; + if (total_allowed > session->last_received) + { + /* got less than acceptable */ + total_remaining = total_allowed - session->last_received; + session->last_received = 0; + delta = total_remaining / session->quota_in; /* bonus seconds */ + if (delta > MAX_BANDWIDTH_CARRY) + delta = MAX_BANDWIDTH_CARRY; /* limit amount of carry-over */ + } + else + { + /* got more than acceptable */ + session->last_received -= total_allowed; + delta = 0; + } + session->last_quota_update.value = now.value - delta; +#endif + + delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); if (delta.value < MIN_QUOTA_REFRESH_TIME) return; /* not enough time passed for doing quota update */ @@ -2538,6 +2569,47 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, } +/** + * Calculate how long we should delay reading from the TCP socket to + * ensure that we stay within our bandwidth limits (push back). + * + * @param n for which neighbour should this be calculated + * @return how long to delay receiving more data + */ +static struct GNUNET_TIME_Relative +calculate_throttle_delay (struct NeighbourList *n) +{ + struct GNUNET_TIME_Relative ret; + struct GNUNET_TIME_Absolute now; + uint64_t del; + uint64_t avail; + uint64_t excess; + + now = GNUNET_TIME_absolute_get (); + del = now.value - n->last_quota_update.value; + if (del > MAX_BANDWIDTH_CARRY) + { + update_quota (n /*, GNUNET_YES*/); + del = now.value - n->last_quota_update.value; + GNUNET_assert (del <= MAX_BANDWIDTH_CARRY); + } + if (n->quota_in == 0) + n->quota_in = 1; /* avoid divison by zero */ + avail = del * n->quota_in; + if (avail > n->last_received) + return GNUNET_TIME_UNIT_ZERO; /* can receive right now */ + excess = n->last_received - avail; + ret.value = excess / n->quota_in; + if (ret.value > 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n", + (unsigned long long) excess, + (unsigned long long) n->quota_in, + (unsigned long long) ret.value); + return ret; +} + + /** * Function called by the plugin for each received message. * Update data volumes, possibly notify plugins about @@ -2545,18 +2617,16 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, * and generally forward to our receive callback. * * @param cls the "struct TransportPlugin *" we gave to the plugin - * @param message the message, NULL if peer was disconnected - * @param distance the transport cost to this peer (not latency!) - * @param sender_address the address that the sender reported - * (opaque to transport service) - * @param sender_address_len the length of the sender address * @param peer (claimed) identity of the other peer - * @return the new service_context that the plugin should use - * for future receive calls for messages from this - * particular peer - * - */ -static void + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again + * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param sender_address binary address of the sender (if observed) + * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) + */ +static struct GNUNET_TIME_Relative plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, unsigned int distance, const char *sender_address, @@ -2572,99 +2642,86 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, n = find_neighbour (peer); if (n == NULL) - { - if (message == NULL) - return; /* disconnect of peer already marked down */ - n = setup_new_neighbour (peer); - } + n = setup_new_neighbour (peer); service_context = n->plugins; while ((service_context != NULL) && (plugin != service_context->plugin)) service_context = service_context->next; GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL)); - if (message == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Receive failed from `%4s', triggering disconnect\n", - GNUNET_i2s (&n->id)); -#endif - /* TODO: call stats */ - disconnect_neighbour (n, GNUNET_YES); - return; - } - peer_address = add_peer_address(n, - plugin->short_name, - sender_address, - sender_address_len); - if (peer_address != NULL) + if (message != NULL) { - peer_address->distance = distance; - if (peer_address->connected == GNUNET_NO) - { - peer_address->connected = GNUNET_YES; - peer_address->connect_attempts++; - } - peer_address->timeout - = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - } - /* update traffic received amount ... */ - msize = ntohs (message->size); - n->last_received += msize; - n->distance = distance; - n->peer_timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - GNUNET_SCHEDULER_cancel (sched, - n->timeout_task); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (sched, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - update_quota (n); - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - /* dropping message due to frequent inbound volume violations! */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING | - GNUNET_ERROR_TYPE_BULK, - _ - ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count); - /* TODO: call stats */ - return; - } - switch (ntohs (message->type)) - { - case GNUNET_MESSAGE_TYPE_HELLO: - process_hello (plugin, message); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: - handle_ping(plugin, message, peer, sender_address, sender_address_len); - break; - case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: - handle_pong(plugin, message, peer, sender_address, sender_address_len); - break; - default: + peer_address = add_peer_address(n, + plugin->short_name, + sender_address, + sender_address_len); + if (peer_address != NULL) + { + peer_address->distance = distance; + if (peer_address->connected == GNUNET_NO) + { + peer_address->connected = GNUNET_YES; + peer_address->connect_attempts++; + } + peer_address->timeout + = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + } + /* update traffic received amount ... */ + msize = ntohs (message->size); + n->distance = distance; + n->peer_timeout = + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_SCHEDULER_cancel (sched, + n->timeout_task); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &neighbour_timeout_task, n); + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + /* dropping message due to frequent inbound volume violations! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING | + GNUNET_ERROR_TYPE_BULK, + _ + ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), + n->quota_violation_count); + return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */ + } + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_HELLO: + process_hello (plugin, message); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PING: + handle_ping(plugin, message, peer, sender_address, sender_address_len); + break; + case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG: + handle_pong(plugin, message, peer, sender_address, sender_address_len); + break; + default: #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s', sending to all clients.\n", - ntohs (message->type), GNUNET_i2s (peer)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s', sending to all clients.\n", + ntohs (message->type), GNUNET_i2s (peer)); #endif - /* transmit message to all clients */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); - im->header.size = htons (sizeof (struct InboundMessage) + msize); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (n->latency); - im->peer = *peer; - memcpy (&im[1], message, msize); - cpos = clients; - while (cpos != NULL) - { - transmit_to_client (cpos, &im->header, GNUNET_YES); - cpos = cpos->next; - } - GNUNET_free (im); - } + /* transmit message to all clients */ + im = GNUNET_malloc (sizeof (struct InboundMessage) + msize); + im->header.size = htons (sizeof (struct InboundMessage) + msize); + im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); + im->latency = GNUNET_TIME_relative_hton (n->latency); + im->peer = *peer; + memcpy (&im[1], message, msize); + cpos = clients; + while (cpos != NULL) + { + transmit_to_client (cpos, &im->header, GNUNET_YES); + cpos = cpos->next; + } + GNUNET_free (im); + } + } + return calculate_throttle_delay (n); } @@ -2830,8 +2887,6 @@ handle_set_quota (void *cls, const struct QuotaSetMessage *qsm = (const struct QuotaSetMessage *) message; struct NeighbourList *n; - struct TransportPlugin *p; - struct ReadyList *rl; uint32_t qin; n = find_neighbour (&qsm->peer); @@ -2850,14 +2905,6 @@ handle_set_quota (void *cls, if (n->quota_in < qin) n->last_quota_update = GNUNET_TIME_absolute_get (); n->quota_in = qin; - rl = n->plugins; - while (rl != NULL) - { - p = rl->plugin; - p->api->set_receive_quota (p->api->cls, - &qsm->peer, qin); - rl = rl->next; - } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2974,8 +3021,6 @@ create_environment (struct TransportPlugin *plug) plug->env.cls = plug; plug->env.receive = &plugin_env_receive; plug->env.notify_address = &plugin_env_notify_address; - plug->env.default_quota_in = - (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000); plug->env.max_connections = max_connect_per_transport; } diff --git a/src/transport/plugin_transport.h b/src/transport/plugin_transport.h index 2630ffbad..40653a618 100644 --- a/src/transport/plugin_transport.h +++ b/src/transport/plugin_transport.h @@ -26,14 +26,6 @@ * Note that the destructors of transport plugins will * be given the value returned by the constructor * and is expected to return a NULL pointer. - * - * TODO: - * - consider moving DATA message (latency measurement) - * to service; avoids encapsulation overheads and - * would enable latency measurements for non-bidi - * transports. - * - - * * @author Christian Grothoff */ #ifndef PLUGIN_TRANSPORT_H @@ -51,21 +43,24 @@ * * @param cls closure * @param peer (claimed) identity of the other peer - * @param message the message, NULL if peer was disconnected - * @param distance in overlay hops; use 1 unless DV + * @param message the message, NULL if we only care about + * learning about the delay until we should receive again + * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) * @param sender_address binary address of the sender (if observed) * @param sender_address_len number of bytes in sender_address + * @return how long the plugin should wait until receiving more data + * (plugins that do not support this, can ignore the return value) */ -typedef void (*GNUNET_TRANSPORT_PluginReceiveCallback) (void *cls, - const struct - GNUNET_PeerIdentity * - peer, - const struct - GNUNET_MessageHeader * - message, - uint32_t distance, - const char *sender_address, - size_t sender_address_len); +typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_PluginReceiveCallback) (void *cls, + const struct + GNUNET_PeerIdentity * + peer, + const struct + GNUNET_MessageHeader * + message, + uint32_t distance, + const char *sender_address, + size_t sender_address_len); /** @@ -88,6 +83,27 @@ typedef void (*GNUNET_TRANSPORT_AddressNotification) (void *cls, expires); +/** + * Function that will be called whenever the plugin receives data over + * the network and wants to determine how long it should wait until + * the next time it reads from the given peer. Note that some plugins + * (such as UDP) may not be able to wait (for a particular peer), so + * the waiting part is optional. Plugins that can wait should call + * this function, sleep the given amount of time, and call it again + * (with zero bytes read) UNTIL it returns zero and only then read. + * + * @param cls closure + * @param peer which peer did we read data from + * @param amount_recved number of bytes read (can be zero) + * @return how long to wait until reading more from this peer + * (to enforce inbound quotas) + */ +typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_TrafficReport) (void *cls, + const struct + GNUNET_PeerIdentity *peer, + size_t amount_recved); + + /** * The transport service will pass a pointer to a struct * of this type as the first and only argument to the @@ -129,10 +145,10 @@ struct GNUNET_TRANSPORT_PluginEnvironment GNUNET_TRANSPORT_AddressNotification notify_address; /** - * What is the default quota (in terms of incoming bytes per - * ms) for new connections? + * Inform service about traffic received, get information + * about when we might be willing to receive more. */ - uint32_t default_quota_in; + GNUNET_TRANSPORT_TrafficReport traffic_report; /** * What is the maximum number of connections that this transport @@ -269,21 +285,6 @@ typedef void asc, void *asc_cls); -/** - * Set a quota for receiving data from the given peer; this is a - * per-transport limit. The transport should limit its read/select - * calls to stay below the quota (in terms of incoming data). - * - * @param cls closure - * @param peer the peer for whom the quota is given - * @param quota_in quota for receiving/sending data in bytes per ms - */ -typedef void - (*GNUNET_TRANSPORT_SetQuota) (void *cls, - const struct GNUNET_PeerIdentity * target, - uint32_t quota_in); - - /** * Another peer has suggested an address for this peer and transport * plugin. Check that this could be a valid address. This function @@ -337,14 +338,6 @@ struct GNUNET_TRANSPORT_PluginFunctions */ GNUNET_TRANSPORT_AddressPrettyPrinter address_pretty_printer; - /** - * Function that the transport service can use to try to enforce a - * quota for the number of bytes received via this transport. - * Transports that can not refuse incoming data (such as UDP) - * are free to ignore these calls. - */ - GNUNET_TRANSPORT_SetQuota set_receive_quota; - /** * Function that will be called to check if a binary address * for this plugin is well-formed. If clearly needed, patch diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 98a5f1638..2ec80f9e9 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -165,9 +165,9 @@ struct Session struct GNUNET_PeerIdentity target; /** - * At what time did we reset last_received last? + * ID of task used to delay receiving more to throttle sender. */ - struct GNUNET_TIME_Absolute last_quota_update; + GNUNET_SCHEDULER_TaskIdentifier receive_delay_task; /** * Address of the other peer (either based on our 'connect' @@ -175,18 +175,6 @@ struct Session */ void *connect_addr; - /** - * How many bytes have we received since the "last_quota_update" - * timestamp? - */ - uint64_t last_received; - - /** - * Number of bytes per ms that this peer is allowed - * to send to us. - */ - uint32_t quota_in; - /** * Length of connect_addr. */ @@ -265,28 +253,6 @@ struct Plugin }; -/** - * Find a session handle for the given peer. - * FIXME: using a hash map we could do this in O(1). - * - * @return NULL if no matching session exists - */ -static struct Session * -find_session_by_target (struct Plugin *plugin, - const struct GNUNET_PeerIdentity *target) -{ - struct Session *ret; - - ret = plugin->sessions; - while ( (ret != NULL) && - ((GNUNET_SYSERR == ret->expecting_welcome) || - (0 != memcmp (target, - &ret->target, sizeof (struct GNUNET_PeerIdentity))))) - ret = ret->next; - return ret; -} - - /** * Find the session handle for the given client. * @@ -332,9 +298,6 @@ create_session (struct Plugin *plugin, plugin->sessions = ret; ret->client = client; ret->target = *target; - ret->last_quota_update = GNUNET_TIME_absolute_get (); - // FIXME: This is simply wrong... - ret->quota_in = plugin->env->default_quota_in; ret->expecting_welcome = GNUNET_YES; pm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (struct WelcomeMessage)); pm->msg = (const char*) &pm[1]; @@ -534,18 +497,22 @@ disconnect_session (struct Session *session) transport service may know about this one, so we need to notify transport service about disconnect */ // FIXME: we should have a very clear connect-disconnect - // protocol with gnunet-service-transport! - session->plugin->env->receive (session->plugin->env->cls, - &session->target, NULL, - 1, - session->connect_addr, - session->connect_alen); + // protocol with gnunet-service-transport! + // FIXME: but this is not possible for all plugins, so what gives? + } + if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (session->plugin->env->sched, + session->receive_delay_task); + if (session->client != NULL) + GNUNET_SERVER_receive_done (session->client, + GNUNET_SYSERR); } if (session->client != NULL) { GNUNET_SERVER_client_drop (session->client); session->client = NULL; - } + } GNUNET_free_non_null (session->connect_addr); GNUNET_free (session); } @@ -845,98 +812,6 @@ tcp_plugin_address_pretty_printer (void *cls, } -/** - * Update the last-received and bandwidth quota values - * for this session. - * - * @param session session to update - * @param force set to GNUNET_YES if we should update even - * though the minimum refresh time has not yet expired - */ -static void -update_quota (struct Session *session, int force) -{ - struct GNUNET_TIME_Absolute now; - unsigned long long delta; - unsigned long long total_allowed; - unsigned long long total_remaining; - - now = GNUNET_TIME_absolute_get (); - delta = now.value - session->last_quota_update.value; - if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force)) - return; /* too early, not enough data */ - - total_allowed = session->quota_in * delta; - if (total_allowed > session->last_received) - { - /* got less than acceptable */ - total_remaining = total_allowed - session->last_received; - session->last_received = 0; - delta = total_remaining / session->quota_in; /* bonus seconds */ - if (delta > MAX_BANDWIDTH_CARRY) - delta = MAX_BANDWIDTH_CARRY; /* limit amount of carry-over */ - } - else - { - /* got more than acceptable */ - session->last_received -= total_allowed; - delta = 0; - } - session->last_quota_update.value = now.value - delta; -} - - -/** - * Set a quota for receiving data from the given peer; this is a - * per-transport limit. The transport should limit its read/select - * calls to stay below the quota (in terms of incoming data). - * - * @param cls closure - * @param target the peer for whom the quota is given - * @param quota_in quota for receiving/sending data in bytes per ms - */ -static void -tcp_plugin_set_receive_quota (void *cls, - const struct GNUNET_PeerIdentity *target, - uint32_t quota_in) -{ - struct Plugin *plugin = cls; - struct Session *session; - - // FIXME: This is simply wrong: - // We may have multiple sessions for the target, - // and some OTHER session might be the one to - // survive; not to mention the inbound-quota should - // be enforced across transports! - // => keep quota-related states in the service (globally, per peer) - // and update/query the information when it is needed! - // => we can likely get rid of this entire function and - // replace it with a query/update API! - session = find_session_by_target (plugin, target); - if (session == NULL) - { - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "tcp", - "Could not find session for peer `%4s' to update quota.\n", - GNUNET_i2s (target)); - return; /* peer must have disconnected, ignore */ - } - if (session->quota_in != quota_in) - { - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "tcp", - "Changing quota for peer `%4s' from %u to %u\n", - GNUNET_i2s (target), - session->quota_in, - quota_in); - update_quota (session, GNUNET_YES); - if (session->quota_in > quota_in) - session->last_quota_update = GNUNET_TIME_absolute_get (); - session->quota_in = quota_in; - } -} - - /** * Check if the given port is plausible (must be either * our listen port or our advertised port). If it is @@ -1072,46 +947,6 @@ handle_tcp_welcome (void *cls, } -/** - * Calculate how long we should delay reading from the TCP socket to - * ensure that we stay within our bandwidth limits (push back). - * - * @param session for which client should this be calculated - */ -static struct GNUNET_TIME_Relative -calculate_throttle_delay (struct Session *session) -{ - struct GNUNET_TIME_Relative ret; - struct GNUNET_TIME_Absolute now; - uint64_t del; - uint64_t avail; - uint64_t excess; - - now = GNUNET_TIME_absolute_get (); - del = now.value - session->last_quota_update.value; - if (del > MAX_BANDWIDTH_CARRY) - { - update_quota (session, GNUNET_YES); - del = now.value - session->last_quota_update.value; - GNUNET_assert (del <= MAX_BANDWIDTH_CARRY); - } - if (session->quota_in == 0) - session->quota_in = 1; /* avoid divison by zero */ - avail = del * session->quota_in; - if (avail > session->last_received) - return GNUNET_TIME_UNIT_ZERO; /* can receive right now */ - excess = session->last_received - avail; - ret.value = excess / session->quota_in; - GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, - "tcp", - "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n", - (unsigned long long) excess, - (unsigned long long) session->quota_in, - (unsigned long long) ret.value); - return ret; -} - - /** * Task to signal the server that we can continue * receiving from the TCP client now. @@ -1120,7 +955,18 @@ static void delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Session *session = cls; - GNUNET_SERVER_receive_done (session->client, GNUNET_OK); + struct GNUNET_TIME_Relative delay; + + session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK; + delay = session->plugin->env->receive (session->plugin->env->cls, + &session->target, + NULL, 0, NULL, 0); + if (delay.value == 0) + GNUNET_SERVER_receive_done (session->client, GNUNET_OK); + else + session->receive_delay_task = + GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched, + delay, &delayed_done, session); } @@ -1165,18 +1011,16 @@ handle_tcp_data (void *cls, (unsigned int) ntohs (message->type), GNUNET_i2s (&session->target)); #endif - plugin->env->receive (plugin->env->cls, &session->target, message, 1, - session->connect_addr, - session->connect_alen); - /* update bandwidth used */ - session->last_received += msize; - update_quota (session, GNUNET_NO); - delay = calculate_throttle_delay (session); + delay = plugin->env->receive (plugin->env->cls, &session->target, message, 1, + session->connect_addr, + session->connect_alen); + if (delay.value == 0) GNUNET_SERVER_receive_done (client, GNUNET_OK); else - GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched, - delay, &delayed_done, session); + session->receive_delay_task = + GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched, + delay, &delayed_done, session); } @@ -1342,7 +1186,6 @@ libgnunet_plugin_transport_tcp_init (void *cls) api->send = &tcp_plugin_send; api->disconnect = &tcp_plugin_disconnect; api->address_pretty_printer = &tcp_plugin_address_pretty_printer; - api->set_receive_quota = &tcp_plugin_set_receive_quota; api->check_address = &tcp_plugin_check_address; plugin->service = service; plugin->server = GNUNET_SERVICE_get_server (service); diff --git a/src/transport/plugin_transport_template.c b/src/transport/plugin_transport_template.c index fc1c1722b..6ad555a51 100644 --- a/src/transport/plugin_transport_template.c +++ b/src/transport/plugin_transport_template.c @@ -219,23 +219,6 @@ template_plugin_address_pretty_printer (void *cls, asc (asc_cls, NULL); } -/** - * Set a quota for receiving data from the given peer; this is a - * per-transport limit. The transport should limit its read/select - * calls to stay below the quota (in terms of incoming data). - * - * @param cls closure - * @param target the peer for whom the quota is given - * @param quota_in quota for receiving/sending data in bytes per ms - */ -static void -template_plugin_set_receive_quota (void *cls, - const struct GNUNET_PeerIdentity *target, - uint32_t quota_in) -{ - // struct Plugin *plugin = cls; - // FIXME! -} /** @@ -280,7 +263,6 @@ gnunet_plugin_transport_template_init (void *cls) api->send = &template_plugin_send; api->disconnect = &template_plugin_disconnect; api->address_pretty_printer = &template_plugin_address_pretty_printer; - api->set_receive_quota = &template_plugin_set_receive_quota; api->check_address = &template_plugin_address_suggested; return api; } diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 1cfa1ef43..c00ba1298 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -682,23 +682,6 @@ udp_plugin_address_pretty_printer (void *cls, !numeric, timeout, &append_port, ppc); } -/** - * Set a quota for receiving data from the given peer; this is a - * per-transport limit. This call has no meaning for UDP, as if - * we don't receive data it still comes in. UDP has no friendliness - * guarantees, and our buffers will fill at some level. - * - * @param cls closure - * @param target the peer for whom the quota is given - * @param quota_in quota for receiving/sending data in bytes per ms - */ -static void -udp_plugin_set_receive_quota (void *cls, - const struct GNUNET_PeerIdentity *target, - uint32_t quota_in) -{ - return; /* Do nothing */ -} /** * The exported method. Makes the core api available via a global and @@ -766,7 +749,6 @@ libgnunet_plugin_transport_udp_init (void *cls) api->send = &udp_plugin_send; api->disconnect = &udp_disconnect; api->address_pretty_printer = &udp_plugin_address_pretty_printer; - api->set_receive_quota = &udp_plugin_set_receive_quota; api->check_address = &udp_check_address; plugin->service = service; diff --git a/src/transport/plugin_transport_udp_nat.c b/src/transport/plugin_transport_udp_nat.c index a4e6d1fb0..8bf5ac06b 100644 --- a/src/transport/plugin_transport_udp_nat.c +++ b/src/transport/plugin_transport_udp_nat.c @@ -1595,23 +1595,6 @@ udp_nat_plugin_address_pretty_printer (void *cls, !numeric, timeout, &append_port, ppc); } -/** - * Set a quota for receiving data from the given peer; this is a - * per-transport limit. This call has no meaning for UDP, as if - * we don't receive data it still comes in. UDP has no friendliness - * guarantees, and our buffers will fill at some level. - * - * @param cls closure - * @param target the peer for whom the quota is given - * @param quota_in quota for receiving/sending data in bytes per ms - */ -static void -udp_nat_plugin_set_receive_quota (void *cls, - const struct GNUNET_PeerIdentity *target, - uint32_t quota_in) -{ - return; /* Do nothing */ -} /** * The exported method. Makes the core api available via a global and @@ -1715,7 +1698,6 @@ libgnunet_plugin_transport_udp_nat_init (void *cls) api->send = &udp_nat_plugin_send; api->disconnect = &udp_nat_disconnect; api->address_pretty_printer = &udp_nat_plugin_address_pretty_printer; - api->set_receive_quota = &udp_nat_plugin_set_receive_quota; api->check_address = &udp_nat_check_address; plugin->service = service; -- 2.25.1