#ifndef DATASTORE_H
#define DATASTORE_H
-#define DEBUG_DATASTORE GNUNET_NO
+#define DEBUG_DATASTORE GNUNET_YES
#include "gnunet_util_lib.h"
{
gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
}
+ GNUNET_assert (h->response_proc == NULL);
transmit_for_result (h, iter, iter_cls, timeout);
}
m = (struct GNUNET_MessageHeader*) &h[1];
m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
m->size = htons(sizeof (struct GNUNET_MessageHeader));
+ GNUNET_assert (h->response_proc == NULL);
transmit_for_result (h, iter, iter_cls, timeout);
}
* @param bpm_out set to the current bandwidth limit (sending) for this peer
* @param latency current latency estimate, "FOREVER" if we have been
* disconnected
- * @param amount set to the amount that was actually reserved or unreserved
+ * @param amount set to the amount that was actually reserved or unreserved;
+ * either the full requested amount or zero (no partial reservations)
* @param preference current traffic preference for the given peer
*/
typedef void
uint64_t allowed;
uint64_t remaining;
+#if 0
+ struct GNUNET_TIME_Absolute now;
+ unsigned long long delta;
+ unsigned long long total_allowed;
+ unsigned long long total_remaining;
+
+ now = GNUNET_TIME_absolute_get ();
+ delta = now.value - session->last_quota_update.value;
+ if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
+ return; /* too early, not enough data */
+
+ total_allowed = session->quota_in * delta;
+ if (total_allowed > session->last_received)
+ {
+ /* got less than acceptable */
+ total_remaining = total_allowed - session->last_received;
+ session->last_received = 0;
+ delta = total_remaining / session->quota_in; /* bonus seconds */
+ if (delta > MAX_BANDWIDTH_CARRY)
+ delta = MAX_BANDWIDTH_CARRY; /* limit amount of carry-over */
+ }
+ else
+ {
+ /* got more than acceptable */
+ session->last_received -= total_allowed;
+ delta = 0;
+ }
+ session->last_quota_update.value = now.value - delta;
+#endif
+
+
delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
if (delta.value < MIN_QUOTA_REFRESH_TIME)
return; /* not enough time passed for doing quota update */
}
+/**
+ * Calculate how long we should delay reading from the TCP socket to
+ * ensure that we stay within our bandwidth limits (push back).
+ *
+ * @param n for which neighbour should this be calculated
+ * @return how long to delay receiving more data
+ */
+static struct GNUNET_TIME_Relative
+calculate_throttle_delay (struct NeighbourList *n)
+{
+ struct GNUNET_TIME_Relative ret;
+ struct GNUNET_TIME_Absolute now;
+ uint64_t del;
+ uint64_t avail;
+ uint64_t excess;
+
+ now = GNUNET_TIME_absolute_get ();
+ del = now.value - n->last_quota_update.value;
+ if (del > MAX_BANDWIDTH_CARRY)
+ {
+ update_quota (n /*, GNUNET_YES*/);
+ del = now.value - n->last_quota_update.value;
+ GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
+ }
+ if (n->quota_in == 0)
+ n->quota_in = 1; /* avoid divison by zero */
+ avail = del * n->quota_in;
+ if (avail > n->last_received)
+ return GNUNET_TIME_UNIT_ZERO; /* can receive right now */
+ excess = n->last_received - avail;
+ ret.value = excess / n->quota_in;
+ if (ret.value > 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
+ (unsigned long long) excess,
+ (unsigned long long) n->quota_in,
+ (unsigned long long) ret.value);
+ return ret;
+}
+
+
/**
* Function called by the plugin for each received message.
* Update data volumes, possibly notify plugins about
* and generally forward to our receive callback.
*
* @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- * (opaque to transport service)
- * @param sender_address_len the length of the sender address
* @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- * for future receive calls for messages from this
- * particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ * (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
unsigned int distance, const char *sender_address,
n = find_neighbour (peer);
if (n == NULL)
- {
- if (message == NULL)
- return; /* disconnect of peer already marked down */
- n = setup_new_neighbour (peer);
- }
+ n = setup_new_neighbour (peer);
service_context = n->plugins;
while ((service_context != NULL) && (plugin != service_context->plugin))
service_context = service_context->next;
GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
- if (message == NULL)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Receive failed from `%4s', triggering disconnect\n",
- GNUNET_i2s (&n->id));
-#endif
- /* TODO: call stats */
- disconnect_neighbour (n, GNUNET_YES);
- return;
- }
- peer_address = add_peer_address(n,
- plugin->short_name,
- sender_address,
- sender_address_len);
- if (peer_address != NULL)
+ if (message != NULL)
{
- peer_address->distance = distance;
- if (peer_address->connected == GNUNET_NO)
- {
- peer_address->connected = GNUNET_YES;
- peer_address->connect_attempts++;
- }
- peer_address->timeout
- =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- }
- /* update traffic received amount ... */
- msize = ntohs (message->size);
- n->last_received += msize;
- n->distance = distance;
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- GNUNET_SCHEDULER_cancel (sched,
- n->timeout_task);
- n->timeout_task =
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbour_timeout_task, n);
- update_quota (n);
- if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
- {
- /* dropping message due to frequent inbound volume violations! */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
- /* TODO: call stats */
- return;
- }
- switch (ntohs (message->type))
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- process_hello (plugin, message);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
- handle_ping(plugin, message, peer, sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
- handle_pong(plugin, message, peer, sender_address, sender_address_len);
- break;
- default:
+ peer_address = add_peer_address(n,
+ plugin->short_name,
+ sender_address,
+ sender_address_len);
+ if (peer_address != NULL)
+ {
+ peer_address->distance = distance;
+ if (peer_address->connected == GNUNET_NO)
+ {
+ peer_address->connected = GNUNET_YES;
+ peer_address->connect_attempts++;
+ }
+ peer_address->timeout
+ =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ }
+ /* update traffic received amount ... */
+ msize = ntohs (message->size);
+ n->distance = distance;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_SCHEDULER_cancel (sched,
+ n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+ {
+ /* dropping message due to frequent inbound volume violations! */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"),
+ n->quota_violation_count);
+ return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+ }
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ process_hello (plugin, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ handle_ping(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ handle_pong(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ default:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from `%4s', sending to all clients.\n",
- ntohs (message->type), GNUNET_i2s (peer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', sending to all clients.\n",
+ ntohs (message->type), GNUNET_i2s (peer));
#endif
- /* transmit message to all clients */
- im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
- im->header.size = htons (sizeof (struct InboundMessage) + msize);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (n->latency);
- im->peer = *peer;
- memcpy (&im[1], message, msize);
- cpos = clients;
- while (cpos != NULL)
- {
- transmit_to_client (cpos, &im->header, GNUNET_YES);
- cpos = cpos->next;
- }
- GNUNET_free (im);
- }
+ /* transmit message to all clients */
+ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+ im->header.size = htons (sizeof (struct InboundMessage) + msize);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
+ im->peer = *peer;
+ memcpy (&im[1], message, msize);
+ cpos = clients;
+ while (cpos != NULL)
+ {
+ transmit_to_client (cpos, &im->header, GNUNET_YES);
+ cpos = cpos->next;
+ }
+ GNUNET_free (im);
+ }
+ }
+ return calculate_throttle_delay (n);
}
const struct QuotaSetMessage *qsm =
(const struct QuotaSetMessage *) message;
struct NeighbourList *n;
- struct TransportPlugin *p;
- struct ReadyList *rl;
uint32_t qin;
n = find_neighbour (&qsm->peer);
if (n->quota_in < qin)
n->last_quota_update = GNUNET_TIME_absolute_get ();
n->quota_in = qin;
- rl = n->plugins;
- while (rl != NULL)
- {
- p = rl->plugin;
- p->api->set_receive_quota (p->api->cls,
- &qsm->peer, qin);
- rl = rl->next;
- }
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
plug->env.cls = plug;
plug->env.receive = &plugin_env_receive;
plug->env.notify_address = &plugin_env_notify_address;
- plug->env.default_quota_in =
- (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
plug->env.max_connections = max_connect_per_transport;
}
* Note that the destructors of transport plugins will
* be given the value returned by the constructor
* and is expected to return a NULL pointer.
- *
- * TODO:
- * - consider moving DATA message (latency measurement)
- * to service; avoids encapsulation overheads and
- * would enable latency measurements for non-bidi
- * transports.
- * -
- *
* @author Christian Grothoff
*/
#ifndef PLUGIN_TRANSPORT_H
*
* @param cls closure
* @param peer (claimed) identity of the other peer
- * @param message the message, NULL if peer was disconnected
- * @param distance in overlay hops; use 1 unless DV
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
* @param sender_address binary address of the sender (if observed)
* @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ * (plugins that do not support this, can ignore the return value)
*/
-typedef void (*GNUNET_TRANSPORT_PluginReceiveCallback) (void *cls,
- const struct
- GNUNET_PeerIdentity *
- peer,
- const struct
- GNUNET_MessageHeader *
- message,
- uint32_t distance,
- const char *sender_address,
- size_t sender_address_len);
+typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_PluginReceiveCallback) (void *cls,
+ const struct
+ GNUNET_PeerIdentity *
+ peer,
+ const struct
+ GNUNET_MessageHeader *
+ message,
+ uint32_t distance,
+ const char *sender_address,
+ size_t sender_address_len);
/**
expires);
+/**
+ * Function that will be called whenever the plugin receives data over
+ * the network and wants to determine how long it should wait until
+ * the next time it reads from the given peer. Note that some plugins
+ * (such as UDP) may not be able to wait (for a particular peer), so
+ * the waiting part is optional. Plugins that can wait should call
+ * this function, sleep the given amount of time, and call it again
+ * (with zero bytes read) UNTIL it returns zero and only then read.
+ *
+ * @param cls closure
+ * @param peer which peer did we read data from
+ * @param amount_recved number of bytes read (can be zero)
+ * @return how long to wait until reading more from this peer
+ * (to enforce inbound quotas)
+ */
+typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_TrafficReport) (void *cls,
+ const struct
+ GNUNET_PeerIdentity *peer,
+ size_t amount_recved);
+
+
/**
* The transport service will pass a pointer to a struct
* of this type as the first and only argument to the
GNUNET_TRANSPORT_AddressNotification notify_address;
/**
- * What is the default quota (in terms of incoming bytes per
- * ms) for new connections?
+ * Inform service about traffic received, get information
+ * about when we might be willing to receive more.
*/
- uint32_t default_quota_in;
+ GNUNET_TRANSPORT_TrafficReport traffic_report;
/**
* What is the maximum number of connections that this transport
asc, void *asc_cls);
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit. The transport should limit its read/select
- * calls to stay below the quota (in terms of incoming data).
- *
- * @param cls closure
- * @param peer the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-typedef void
- (*GNUNET_TRANSPORT_SetQuota) (void *cls,
- const struct GNUNET_PeerIdentity * target,
- uint32_t quota_in);
-
-
/**
* Another peer has suggested an address for this peer and transport
* plugin. Check that this could be a valid address. This function
*/
GNUNET_TRANSPORT_AddressPrettyPrinter address_pretty_printer;
- /**
- * Function that the transport service can use to try to enforce a
- * quota for the number of bytes received via this transport.
- * Transports that can not refuse incoming data (such as UDP)
- * are free to ignore these calls.
- */
- GNUNET_TRANSPORT_SetQuota set_receive_quota;
-
/**
* Function that will be called to check if a binary address
* for this plugin is well-formed. If clearly needed, patch
struct GNUNET_PeerIdentity target;
/**
- * At what time did we reset last_received last?
+ * ID of task used to delay receiving more to throttle sender.
*/
- struct GNUNET_TIME_Absolute last_quota_update;
+ GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
/**
* Address of the other peer (either based on our 'connect'
*/
void *connect_addr;
- /**
- * How many bytes have we received since the "last_quota_update"
- * timestamp?
- */
- uint64_t last_received;
-
- /**
- * Number of bytes per ms that this peer is allowed
- * to send to us.
- */
- uint32_t quota_in;
-
/**
* Length of connect_addr.
*/
};
-/**
- * Find a session handle for the given peer.
- * FIXME: using a hash map we could do this in O(1).
- *
- * @return NULL if no matching session exists
- */
-static struct Session *
-find_session_by_target (struct Plugin *plugin,
- const struct GNUNET_PeerIdentity *target)
-{
- struct Session *ret;
-
- ret = plugin->sessions;
- while ( (ret != NULL) &&
- ((GNUNET_SYSERR == ret->expecting_welcome) ||
- (0 != memcmp (target,
- &ret->target, sizeof (struct GNUNET_PeerIdentity)))))
- ret = ret->next;
- return ret;
-}
-
-
/**
* Find the session handle for the given client.
*
plugin->sessions = ret;
ret->client = client;
ret->target = *target;
- ret->last_quota_update = GNUNET_TIME_absolute_get ();
- // FIXME: This is simply wrong...
- ret->quota_in = plugin->env->default_quota_in;
ret->expecting_welcome = GNUNET_YES;
pm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (struct WelcomeMessage));
pm->msg = (const char*) &pm[1];
transport service may know about this one, so we need to
notify transport service about disconnect */
// FIXME: we should have a very clear connect-disconnect
- // protocol with gnunet-service-transport!
- session->plugin->env->receive (session->plugin->env->cls,
- &session->target, NULL,
- 1,
- session->connect_addr,
- session->connect_alen);
+ // protocol with gnunet-service-transport!
+ // FIXME: but this is not possible for all plugins, so what gives?
+ }
+ if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (session->plugin->env->sched,
+ session->receive_delay_task);
+ if (session->client != NULL)
+ GNUNET_SERVER_receive_done (session->client,
+ GNUNET_SYSERR);
}
if (session->client != NULL)
{
GNUNET_SERVER_client_drop (session->client);
session->client = NULL;
- }
+ }
GNUNET_free_non_null (session->connect_addr);
GNUNET_free (session);
}
}
-/**
- * Update the last-received and bandwidth quota values
- * for this session.
- *
- * @param session session to update
- * @param force set to GNUNET_YES if we should update even
- * though the minimum refresh time has not yet expired
- */
-static void
-update_quota (struct Session *session, int force)
-{
- struct GNUNET_TIME_Absolute now;
- unsigned long long delta;
- unsigned long long total_allowed;
- unsigned long long total_remaining;
-
- now = GNUNET_TIME_absolute_get ();
- delta = now.value - session->last_quota_update.value;
- if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
- return; /* too early, not enough data */
-
- total_allowed = session->quota_in * delta;
- if (total_allowed > session->last_received)
- {
- /* got less than acceptable */
- total_remaining = total_allowed - session->last_received;
- session->last_received = 0;
- delta = total_remaining / session->quota_in; /* bonus seconds */
- if (delta > MAX_BANDWIDTH_CARRY)
- delta = MAX_BANDWIDTH_CARRY; /* limit amount of carry-over */
- }
- else
- {
- /* got more than acceptable */
- session->last_received -= total_allowed;
- delta = 0;
- }
- session->last_quota_update.value = now.value - delta;
-}
-
-
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit. The transport should limit its read/select
- * calls to stay below the quota (in terms of incoming data).
- *
- * @param cls closure
- * @param target the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-static void
-tcp_plugin_set_receive_quota (void *cls,
- const struct GNUNET_PeerIdentity *target,
- uint32_t quota_in)
-{
- struct Plugin *plugin = cls;
- struct Session *session;
-
- // FIXME: This is simply wrong:
- // We may have multiple sessions for the target,
- // and some OTHER session might be the one to
- // survive; not to mention the inbound-quota should
- // be enforced across transports!
- // => keep quota-related states in the service (globally, per peer)
- // and update/query the information when it is needed!
- // => we can likely get rid of this entire function and
- // replace it with a query/update API!
- session = find_session_by_target (plugin, target);
- if (session == NULL)
- {
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "tcp",
- "Could not find session for peer `%4s' to update quota.\n",
- GNUNET_i2s (target));
- return; /* peer must have disconnected, ignore */
- }
- if (session->quota_in != quota_in)
- {
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "tcp",
- "Changing quota for peer `%4s' from %u to %u\n",
- GNUNET_i2s (target),
- session->quota_in,
- quota_in);
- update_quota (session, GNUNET_YES);
- if (session->quota_in > quota_in)
- session->last_quota_update = GNUNET_TIME_absolute_get ();
- session->quota_in = quota_in;
- }
-}
-
-
/**
* Check if the given port is plausible (must be either
* our listen port or our advertised port). If it is
}
-/**
- * Calculate how long we should delay reading from the TCP socket to
- * ensure that we stay within our bandwidth limits (push back).
- *
- * @param session for which client should this be calculated
- */
-static struct GNUNET_TIME_Relative
-calculate_throttle_delay (struct Session *session)
-{
- struct GNUNET_TIME_Relative ret;
- struct GNUNET_TIME_Absolute now;
- uint64_t del;
- uint64_t avail;
- uint64_t excess;
-
- now = GNUNET_TIME_absolute_get ();
- del = now.value - session->last_quota_update.value;
- if (del > MAX_BANDWIDTH_CARRY)
- {
- update_quota (session, GNUNET_YES);
- del = now.value - session->last_quota_update.value;
- GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
- }
- if (session->quota_in == 0)
- session->quota_in = 1; /* avoid divison by zero */
- avail = del * session->quota_in;
- if (avail > session->last_received)
- return GNUNET_TIME_UNIT_ZERO; /* can receive right now */
- excess = session->last_received - avail;
- ret.value = excess / session->quota_in;
- GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
- "tcp",
- "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
- (unsigned long long) excess,
- (unsigned long long) session->quota_in,
- (unsigned long long) ret.value);
- return ret;
-}
-
-
/**
* Task to signal the server that we can continue
* receiving from the TCP client now.
delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Session *session = cls;
- GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
+ struct GNUNET_TIME_Relative delay;
+
+ session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ delay = session->plugin->env->receive (session->plugin->env->cls,
+ &session->target,
+ NULL, 0, NULL, 0);
+ if (delay.value == 0)
+ GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
+ else
+ session->receive_delay_task =
+ GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
+ delay, &delayed_done, session);
}
(unsigned int) ntohs (message->type),
GNUNET_i2s (&session->target));
#endif
- plugin->env->receive (plugin->env->cls, &session->target, message, 1,
- session->connect_addr,
- session->connect_alen);
- /* update bandwidth used */
- session->last_received += msize;
- update_quota (session, GNUNET_NO);
- delay = calculate_throttle_delay (session);
+ delay = plugin->env->receive (plugin->env->cls, &session->target, message, 1,
+ session->connect_addr,
+ session->connect_alen);
+
if (delay.value == 0)
GNUNET_SERVER_receive_done (client, GNUNET_OK);
else
- GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
- delay, &delayed_done, session);
+ session->receive_delay_task =
+ GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
+ delay, &delayed_done, session);
}
api->send = &tcp_plugin_send;
api->disconnect = &tcp_plugin_disconnect;
api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
- api->set_receive_quota = &tcp_plugin_set_receive_quota;
api->check_address = &tcp_plugin_check_address;
plugin->service = service;
plugin->server = GNUNET_SERVICE_get_server (service);
asc (asc_cls, NULL);
}
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit. The transport should limit its read/select
- * calls to stay below the quota (in terms of incoming data).
- *
- * @param cls closure
- * @param target the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-static void
-template_plugin_set_receive_quota (void *cls,
- const struct GNUNET_PeerIdentity *target,
- uint32_t quota_in)
-{
- // struct Plugin *plugin = cls;
- // FIXME!
-}
/**
api->send = &template_plugin_send;
api->disconnect = &template_plugin_disconnect;
api->address_pretty_printer = &template_plugin_address_pretty_printer;
- api->set_receive_quota = &template_plugin_set_receive_quota;
api->check_address = &template_plugin_address_suggested;
return api;
}
!numeric, timeout, &append_port, ppc);
}
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit. This call has no meaning for UDP, as if
- * we don't receive data it still comes in. UDP has no friendliness
- * guarantees, and our buffers will fill at some level.
- *
- * @param cls closure
- * @param target the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-static void
-udp_plugin_set_receive_quota (void *cls,
- const struct GNUNET_PeerIdentity *target,
- uint32_t quota_in)
-{
- return; /* Do nothing */
-}
/**
* The exported method. Makes the core api available via a global and
api->send = &udp_plugin_send;
api->disconnect = &udp_disconnect;
api->address_pretty_printer = &udp_plugin_address_pretty_printer;
- api->set_receive_quota = &udp_plugin_set_receive_quota;
api->check_address = &udp_check_address;
plugin->service = service;
!numeric, timeout, &append_port, ppc);
}
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit. This call has no meaning for UDP, as if
- * we don't receive data it still comes in. UDP has no friendliness
- * guarantees, and our buffers will fill at some level.
- *
- * @param cls closure
- * @param target the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-static void
-udp_nat_plugin_set_receive_quota (void *cls,
- const struct GNUNET_PeerIdentity *target,
- uint32_t quota_in)
-{
- return; /* Do nothing */
-}
/**
* The exported method. Makes the core api available via a global and
api->send = &udp_nat_plugin_send;
api->disconnect = &udp_nat_disconnect;
api->address_pretty_printer = &udp_nat_plugin_address_pretty_printer;
- api->set_receive_quota = &udp_nat_plugin_set_receive_quota;
api->check_address = &udp_nat_check_address;
plugin->service = service;