* Last session used to send data
*/
struct Session * last_session;
+
+ /**
+ * The task resetting inbound quota delay
+ */
+ GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
+ /**
+ * Delay from transport service inbound quota tracker when to receive data again
+ */
+ struct GNUNET_TIME_Relative delay;
};
*/
static int curl_schedule (struct Plugin *plugin);
+/**
+ * Task scheduled to reset the inbound quota delay for a specific peer
+ * @param cls plugin as closure
+ * @param tc task context
+ */
+static void reset_inbound_quota_delay (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct HTTP_PeerContext * pc;
+
+ GNUNET_assert(cls !=NULL);
+
+ pc = (struct HTTP_PeerContext *) cls;
+ pc->reset_task = GNUNET_SCHEDULER_NO_TASK;
+
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+
+ pc->delay = GNUNET_TIME_relative_get_zero ();
+}
+
/**
* Creates a valid url from passed address and id
void *client,
const struct GNUNET_MessageHeader *message)
{
-
+ struct GNUNET_TIME_Relative delay;
struct Session *ps = cls;
GNUNET_assert(ps != NULL);
ntohs(message->size),
GNUNET_i2s(&(ps->peercontext)->identity),http_plugin_address_to_string(NULL,ps->addr,ps->addrlen));
#endif
- pc->plugin->env->receive (ps->peercontext->plugin->env->cls,
- &pc->identity,
- message, 1, ps,
- NULL,
- 0);
+ delay = pc->plugin->env->receive (ps->peercontext->plugin->env->cls,
+ &pc->identity,
+ message, 1, ps,
+ NULL,
+ 0);
+ pc->delay = delay;
+ if (pc->reset_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (pc->plugin->env->sched, pc->reset_task);
+
+ if (delay.value > 0)
+ {
+#if DEBUG_HTTP
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,"Connection %X: Inbound quota management: delay next read for %llu ms \n", ps, delay.value);
+#endif
+ pc->reset_task = GNUNET_SCHEDULER_add_delayed (pc->plugin->env->sched, delay, &reset_inbound_quota_delay, pc);
+ }
}
/**
/* Recieving data */
if ((*upload_data_size > 0) && (ps->recv_active == GNUNET_YES))
{
- res = GNUNET_SERVER_mst_receive(ps->msgtok, ps, upload_data,*upload_data_size, GNUNET_NO, GNUNET_NO);
- (*upload_data_size) = 0;
+ if (pc->delay.value == 0)
+ {
+ res = GNUNET_SERVER_mst_receive(ps->msgtok, ps, upload_data,*upload_data_size, GNUNET_NO, GNUNET_NO);
+ (*upload_data_size) = 0;
+ }
+ else
+ {
+#if DEBUG_HTTP
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", ps, ps->peercontext->delay.value);
+#endif
+ }
return MHD_YES;
}
else
const struct GNUNET_MessageHeader *message)
{
struct Session *ps = cls;
+ struct GNUNET_TIME_Relative delay;
GNUNET_assert(ps != NULL);
struct HTTP_PeerContext *pc = ps->peercontext;
ntohs(message->size),
GNUNET_i2s(&(pc->identity)),http_plugin_address_to_string(NULL,ps->addr,ps->addrlen));
#endif
- pc->plugin->env->receive (pc->plugin->env->cls,
- &pc->identity,
- message, 1, ps,
- ps->addr,
- ps->addrlen);
+ delay = pc->plugin->env->receive (pc->plugin->env->cls,
+ &pc->identity,
+ message, 1, ps,
+ ps->addr,
+ ps->addrlen);
+
+ pc->delay = delay;
+ if (pc->reset_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (pc->plugin->env->sched, pc->reset_task);
+
+ if (delay.value > 0)
+ {
+#if DEBUG_HTTP
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,"Connection %X: Inbound quota management: delay next read for %llu ms \n", ps, delay.value);
+#endif
+ pc->reset_task = GNUNET_SCHEDULER_add_delayed (pc->plugin->env->sched, delay, &reset_inbound_quota_delay, pc);
+ }
}
static size_t curl_receive_cb( void *stream, size_t size, size_t nmemb, void *ptr)
{
struct Session * ps = ptr;
+
+ if (ps->peercontext->delay.value > 0)
+ {
+#if DEBUG_HTTP
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: no inbound bandwidth available! Next read was delayed for %llu ms\n", ps, ps->peercontext->delay.value);
+#endif
+ return (0);
+ }
+
#if DEBUG_CONNECTIONS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,"Connection %X: %u bytes received\n",ps, size*nmemb);
#endif