/**
* Update the quota values for the given neighbour now.
+ *
+ * @param n neighbour to update
+ * @param force GNUNET_YES to force recalculation now
*/
static void
-update_quota (struct NeighbourList *n)
+update_quota (struct NeighbourList *n,
+ int force)
{
- struct GNUNET_TIME_Relative delta;
+ struct GNUNET_TIME_Absolute now;
+ unsigned long long delta;
uint64_t allowed;
uint64_t remaining;
- delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- if (delta.value < MIN_QUOTA_REFRESH_TIME)
- return; /* not enough time passed for doing quota update */
- allowed = delta.value * n->quota_in;
-
+ now = GNUNET_TIME_absolute_get ();
+ delta = now.value - n->last_quota_update.value;
+ allowed = n->quota_in * delta;
+ if ( (delta < MIN_QUOTA_REFRESH_TIME) &&
+ (!force) &&
+ (allowed < 32 * 1024) )
+ return; /* too early, not enough data */
if (n->last_received < allowed)
{
remaining = allowed - n->last_received;
if (remaining > MAX_BANDWIDTH_CARRY)
remaining = MAX_BANDWIDTH_CARRY;
n->last_received = 0;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
+ n->last_quota_update = now;
n->last_quota_update.value -= remaining;
if (n->quota_violation_count > 0)
n->quota_violation_count--;
else
{
n->last_received -= allowed;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
+ n->last_quota_update = now;
if (n->last_received > allowed)
{
- /* more than twice the allowed rate! */
+ /* much more than the allowed rate! */
n->quota_violation_count += 10;
}
}
* @param cls closure
* @param message the pong message
* @param peer who responded to our challenge
- * @param sender_addr string describing our sender address (as observed
+ * @param sender_address string describing our sender address (as observed
* by the other peer in binary format)
* @param sender_address_len number of bytes in 'sender_address'
*/
}
+/**
+ * Calculate how long we should delay reading from the TCP socket to
+ * ensure that we stay within our bandwidth limits (push back).
+ *
+ * @param n for which neighbour should this be calculated
+ * @return how long to delay receiving more data
+ */
+static struct GNUNET_TIME_Relative
+calculate_throttle_delay (struct NeighbourList *n)
+{
+ struct GNUNET_TIME_Relative ret;
+ struct GNUNET_TIME_Absolute now;
+ uint64_t del;
+ uint64_t avail;
+ uint64_t excess;
+
+ now = GNUNET_TIME_absolute_get ();
+ del = now.value - n->last_quota_update.value;
+ if (del > MAX_BANDWIDTH_CARRY)
+ {
+ update_quota (n, GNUNET_YES);
+ del = now.value - n->last_quota_update.value;
+ GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
+ }
+ if (n->quota_in == 0)
+ n->quota_in = 1; /* avoid divison by zero */
+ avail = del * n->quota_in;
+ if (avail > n->last_received)
+ return GNUNET_TIME_UNIT_ZERO; /* can receive right now */
+ excess = n->last_received - avail;
+ ret.value = excess / n->quota_in;
+ if (ret.value > 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
+ (unsigned long long) excess,
+ (unsigned long long) n->quota_in,
+ (unsigned long long) ret.value);
+ return ret;
+}
+
+
/**
* Function called by the plugin for each received message.
* Update data volumes, possibly notify plugins about
* and generally forward to our receive callback.
*
* @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- * (opaque to transport service)
- * @param sender_address_len the length of the sender address
* @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- * for future receive calls for messages from this
- * particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ * (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
unsigned int distance, const char *sender_address,
n = find_neighbour (peer);
if (n == NULL)
- {
- if (message == NULL)
- return; /* disconnect of peer already marked down */
- n = setup_new_neighbour (peer);
- }
+ n = setup_new_neighbour (peer);
+ update_quota (n, GNUNET_NO);
service_context = n->plugins;
while ((service_context != NULL) && (plugin != service_context->plugin))
service_context = service_context->next;
GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
- if (message == NULL)
+ if (message != NULL)
{
+ peer_address = add_peer_address(n,
+ plugin->short_name,
+ sender_address,
+ sender_address_len);
+ if (peer_address != NULL)
+ {
+ peer_address->distance = distance;
+ if (peer_address->connected == GNUNET_NO)
+ {
+ peer_address->connected = GNUNET_YES;
+ peer_address->connect_attempts++;
+ }
+ peer_address->timeout
+ =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ }
+ /* update traffic received amount ... */
+ msize = ntohs (message->size);
+ n->distance = distance;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_SCHEDULER_cancel (sched,
+ n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+ {
+ /* dropping message due to frequent inbound volume violations! */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"),
+ n->quota_violation_count);
+ return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+ }
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ process_hello (plugin, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ handle_ping(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ handle_pong(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ default:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Receive failed from `%4s', triggering disconnect\n",
- GNUNET_i2s (&n->id));
-#endif
- /* TODO: call stats */
- disconnect_neighbour (n, GNUNET_YES);
- return;
- }
- peer_address = add_peer_address(n,
- plugin->short_name,
- sender_address,
- sender_address_len);
- if (peer_address != NULL)
- {
- peer_address->distance = distance;
- if (peer_address->connected == GNUNET_NO)
- {
- peer_address->connected = GNUNET_YES;
- peer_address->connect_attempts++;
- }
- peer_address->timeout
- =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- }
- /* update traffic received amount ... */
- msize = ntohs (message->size);
- n->last_received += msize;
- n->distance = distance;
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- GNUNET_SCHEDULER_cancel (sched,
- n->timeout_task);
- n->timeout_task =
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbour_timeout_task, n);
- update_quota (n);
- if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
- {
- /* dropping message due to frequent inbound volume violations! */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
- /* TODO: call stats */
- return;
- }
- switch (ntohs (message->type))
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- process_hello (plugin, message);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
- handle_ping(plugin, message, peer, sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
- handle_pong(plugin, message, peer, sender_address, sender_address_len);
- break;
- default:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from `%4s', sending to all clients.\n",
- ntohs (message->type), GNUNET_i2s (peer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', sending to all clients.\n",
+ ntohs (message->type), GNUNET_i2s (peer));
#endif
- /* transmit message to all clients */
- im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
- im->header.size = htons (sizeof (struct InboundMessage) + msize);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (n->latency);
- im->peer = *peer;
- memcpy (&im[1], message, msize);
- cpos = clients;
- while (cpos != NULL)
- {
- transmit_to_client (cpos, &im->header, GNUNET_YES);
- cpos = cpos->next;
- }
- GNUNET_free (im);
- }
+ /* transmit message to all clients */
+ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+ im->header.size = htons (sizeof (struct InboundMessage) + msize);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
+ im->peer = *peer;
+ memcpy (&im[1], message, msize);
+ cpos = clients;
+ while (cpos != NULL)
+ {
+ transmit_to_client (cpos, &im->header, GNUNET_YES);
+ cpos = cpos->next;
+ }
+ GNUNET_free (im);
+ }
+ }
+ return calculate_throttle_delay (n);
}
const struct QuotaSetMessage *qsm =
(const struct QuotaSetMessage *) message;
struct NeighbourList *n;
- struct TransportPlugin *p;
- struct ReadyList *rl;
+ uint32_t qin;
n = find_neighbour (&qsm->peer);
if (n == NULL)
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
}
-
+ qin = ntohl (qsm->quota_in);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
- "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer));
+ "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer));
#endif
-
- update_quota (n);
- if (n->quota_in < ntohl (qsm->quota_in))
+ update_quota (n, GNUNET_YES);
+ if (n->quota_in < qin)
n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->quota_in = ntohl (qsm->quota_in);
- rl = n->plugins;
- while (rl != NULL)
- {
- p = rl->plugin;
- p->api->set_receive_quota (p->api->cls,
- &qsm->peer, ntohl (qsm->quota_in));
- rl = rl->next;
- }
+ n->quota_in = qin;
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
plug->env.cls = plug;
plug->env.receive = &plugin_env_receive;
plug->env.notify_address = &plugin_env_notify_address;
- plug->env.default_quota_in =
- (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
plug->env.max_connections = max_connect_per_transport;
}