From 87eac53659e293cf7366ca735cb5d1730b3fa19e Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Tue, 5 Oct 2010 11:53:18 +0000 Subject: [PATCH] implemented inbound quota regulation --- src/transport/plugin_transport_http.c | 99 +++++++++++++++++++++++---- 1 file changed, 86 insertions(+), 13 deletions(-) diff --git a/src/transport/plugin_transport_http.c b/src/transport/plugin_transport_http.c index eaa7b1199..675645aa3 100644 --- a/src/transport/plugin_transport_http.c +++ b/src/transport/plugin_transport_http.c @@ -223,6 +223,16 @@ struct HTTP_PeerContext * Last session used to send data */ struct Session * last_session; + + /** + * The task resetting inbound quota delay + */ + GNUNET_SCHEDULER_TaskIdentifier reset_task; + + /** + * Delay from transport service inbound quota tracker when to receive data again + */ + struct GNUNET_TIME_Relative delay; }; @@ -516,6 +526,27 @@ static int send_check_connections (struct Plugin *plugin, struct Session *ps); */ static int curl_schedule (struct Plugin *plugin); +/** + * Task scheduled to reset the inbound quota delay for a specific peer + * @param cls plugin as closure + * @param tc task context + */ +static void reset_inbound_quota_delay (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct HTTP_PeerContext * pc; + + GNUNET_assert(cls !=NULL); + + pc = (struct HTTP_PeerContext *) cls; + pc->reset_task = GNUNET_SCHEDULER_NO_TASK; + + if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) + return; + + pc->delay = GNUNET_TIME_relative_get_zero (); +} + /** * Creates a valid url from passed address and id @@ -824,7 +855,7 @@ static void mhd_write_mst_cb (void *cls, void *client, const struct GNUNET_MessageHeader *message) { - + struct GNUNET_TIME_Relative delay; struct Session *ps = cls; GNUNET_assert(ps != NULL); @@ -838,11 +869,22 @@ static void mhd_write_mst_cb (void *cls, ntohs(message->size), GNUNET_i2s(&(ps->peercontext)->identity),http_plugin_address_to_string(NULL,ps->addr,ps->addrlen)); #endif - pc->plugin->env->receive (ps->peercontext->plugin->env->cls, - &pc->identity, - message, 1, ps, - NULL, - 0); + delay = pc->plugin->env->receive (ps->peercontext->plugin->env->cls, + &pc->identity, + message, 1, ps, + NULL, + 0); + pc->delay = delay; + if (pc->reset_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (pc->plugin->env->sched, pc->reset_task); + + if (delay.value > 0) + { +#if DEBUG_HTTP + GNUNET_log (GNUNET_ERROR_TYPE_ERROR,"Connection %X: Inbound quota management: delay next read for %llu ms \n", ps, delay.value); +#endif + pc->reset_task = GNUNET_SCHEDULER_add_delayed (pc->plugin->env->sched, delay, &reset_inbound_quota_delay, pc); + } } /** @@ -1133,8 +1175,17 @@ mdh_access_cb (void *cls, /* Recieving data */ if ((*upload_data_size > 0) && (ps->recv_active == GNUNET_YES)) { - res = GNUNET_SERVER_mst_receive(ps->msgtok, ps, upload_data,*upload_data_size, GNUNET_NO, GNUNET_NO); - (*upload_data_size) = 0; + if (pc->delay.value == 0) + { + res = GNUNET_SERVER_mst_receive(ps->msgtok, ps, upload_data,*upload_data_size, GNUNET_NO, GNUNET_NO); + (*upload_data_size) = 0; + } + else + { +#if DEBUG_HTTP + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", ps, ps->peercontext->delay.value); +#endif + } return MHD_YES; } else @@ -1504,6 +1555,7 @@ static void curl_receive_mst_cb (void *cls, const struct GNUNET_MessageHeader *message) { struct Session *ps = cls; + struct GNUNET_TIME_Relative delay; GNUNET_assert(ps != NULL); struct HTTP_PeerContext *pc = ps->peercontext; @@ -1516,11 +1568,23 @@ static void curl_receive_mst_cb (void *cls, ntohs(message->size), GNUNET_i2s(&(pc->identity)),http_plugin_address_to_string(NULL,ps->addr,ps->addrlen)); #endif - pc->plugin->env->receive (pc->plugin->env->cls, - &pc->identity, - message, 1, ps, - ps->addr, - ps->addrlen); + delay = pc->plugin->env->receive (pc->plugin->env->cls, + &pc->identity, + message, 1, ps, + ps->addr, + ps->addrlen); + + pc->delay = delay; + if (pc->reset_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (pc->plugin->env->sched, pc->reset_task); + + if (delay.value > 0) + { +#if DEBUG_HTTP + GNUNET_log (GNUNET_ERROR_TYPE_ERROR,"Connection %X: Inbound quota management: delay next read for %llu ms \n", ps, delay.value); +#endif + pc->reset_task = GNUNET_SCHEDULER_add_delayed (pc->plugin->env->sched, delay, &reset_inbound_quota_delay, pc); + } } @@ -1536,6 +1600,15 @@ static void curl_receive_mst_cb (void *cls, static size_t curl_receive_cb( void *stream, size_t size, size_t nmemb, void *ptr) { struct Session * ps = ptr; + + if (ps->peercontext->delay.value > 0) + { +#if DEBUG_HTTP + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", ps, ps->peercontext->delay.value); +#endif + return (0); + } + #if DEBUG_CONNECTIONS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: %u bytes received\n",ps, size*nmemb); #endif -- 2.25.1