};
+
/**
- * Linked list of messages to be transmitted to
- * the client. Each entry is followed by the
- * actual message.
+ * Linked list of messages to be transmitted to the client. Each
+ * entry is followed by the actual message.
*/
struct ClientMessageQueueEntry
{
/**
- * This is a linked list.
+ * This is a doubly-linked list.
*/
struct ClientMessageQueueEntry *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct ClientMessageQueueEntry *prev;
};
*/
struct ClientMessageQueueEntry *message_queue_tail;
+ /**
+ * Current transmit request handle.
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *th;
+
/**
* Is a call to "transmit_send_continuation" pending? If so, we
* must not free this struct (even if the corresponding client
/**
* Update the quota values for the given neighbour now.
+ *
+ * @param n neighbour to update
+ * @param force GNUNET_YES to force recalculation now
*/
static void
-update_quota (struct NeighbourList *n)
+update_quota (struct NeighbourList *n,
+ int force)
{
- struct GNUNET_TIME_Relative delta;
+ struct GNUNET_TIME_Absolute now;
+ unsigned long long delta;
uint64_t allowed;
uint64_t remaining;
- delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- if (delta.value < MIN_QUOTA_REFRESH_TIME)
- return; /* not enough time passed for doing quota update */
- allowed = delta.value * n->quota_in;
-
+ now = GNUNET_TIME_absolute_get ();
+ delta = now.value - n->last_quota_update.value;
+ allowed = n->quota_in * delta;
+ if ( (delta < MIN_QUOTA_REFRESH_TIME) &&
+ (!force) &&
+ (allowed < 32 * 1024) )
+ return; /* too early, not enough data */
if (n->last_received < allowed)
{
remaining = allowed - n->last_received;
if (remaining > MAX_BANDWIDTH_CARRY)
remaining = MAX_BANDWIDTH_CARRY;
n->last_received = 0;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
+ n->last_quota_update = now;
n->last_quota_update.value -= remaining;
if (n->quota_violation_count > 0)
n->quota_violation_count--;
else
{
n->last_received -= allowed;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
+ n->last_quota_update = now;
if (n->last_received > allowed)
{
- /* more than twice the allowed rate! */
+ /* much more than the allowed rate! */
n->quota_violation_count += 10;
}
}
uint16_t msize;
size_t tsize;
const struct GNUNET_MessageHeader *msg;
- struct GNUNET_CONNECTION_TransmitHandle *th;
char *cbuf;
+ client->th = NULL;
if (buf == NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* fatal error with client, free message queue! */
while (NULL != (q = client->message_queue_head))
{
- client->message_queue_head = q->next;
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
GNUNET_free (q);
}
- client->message_queue_tail = NULL;
client->message_count = 0;
return 0;
}
"Transmitting message of type %u to client.\n",
ntohs (msg->type));
#endif
- client->message_queue_head = q->next;
- if (q->next == NULL)
- client->message_queue_tail = NULL;
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
memcpy (&cbuf[tsize], msg, msize);
tsize += msize;
GNUNET_free (q);
if (NULL != q)
{
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
return tsize;
}
{
struct ClientMessageQueueEntry *q;
uint16_t msize;
- struct GNUNET_CONNECTION_TransmitHandle *th;
if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
{
/* TODO: call to statistics... */
return;
}
- client->message_count++;
msize = ntohs (msg->size);
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
memcpy (&q[1], msg, msize);
- /* append to message queue */
- if (client->message_queue_tail == NULL)
- {
- client->message_queue_tail = q;
- }
- else
+ GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head,
+ client->message_queue_tail,
+ client->message_queue_tail,
+ q);
+ client->message_count++;
+ if (client->th == NULL)
{
- client->message_queue_tail->next = q;
- client->message_queue_tail = q;
- }
- if (client->message_queue_head == NULL)
- {
- client->message_queue_head = q;
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
}
static void
try_transmission_to_peer (struct NeighbourList *neighbour)
{
- struct GNUNET_TIME_Relative min_latency;
struct ReadyList *rl;
struct MessageQueue *mq;
struct GNUNET_TIME_Relative timeout;
if (neighbour->messages_head == NULL)
return; /* nothing to do */
- min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
rl = NULL;
mq = neighbour->messages_head;
/* FIXME: support bi-directional use of TCP */
* (otherwise we may be seeing a MiM attack).
*
* @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
* @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- * by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ * by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
*/
static void
handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
*
* @param cls closure
* @param peer id of the peer, NULL for last call
- * @param hello hello message for the peer (can be NULL)
+ * @param h hello message for the peer (can be NULL)
* @param trust amount of trust we have in the peer (not used)
*/
static void
}
+/**
+ * Calculate how long we should delay reading from the TCP socket to
+ * ensure that we stay within our bandwidth limits (push back).
+ *
+ * @param n for which neighbour should this be calculated
+ * @return how long to delay receiving more data
+ */
+static struct GNUNET_TIME_Relative
+calculate_throttle_delay (struct NeighbourList *n)
+{
+ struct GNUNET_TIME_Relative ret;
+ struct GNUNET_TIME_Absolute now;
+ uint64_t del;
+ uint64_t avail;
+ uint64_t excess;
+
+ now = GNUNET_TIME_absolute_get ();
+ del = now.value - n->last_quota_update.value;
+ if (del > MAX_BANDWIDTH_CARRY)
+ {
+ update_quota (n, GNUNET_YES);
+ del = now.value - n->last_quota_update.value;
+ GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
+ }
+ if (n->quota_in == 0)
+ n->quota_in = 1; /* avoid divison by zero */
+ avail = del * n->quota_in;
+ if (avail > n->last_received)
+ return GNUNET_TIME_UNIT_ZERO; /* can receive right now */
+ excess = n->last_received - avail;
+ ret.value = excess / n->quota_in;
+ if (ret.value > 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
+ (unsigned long long) excess,
+ (unsigned long long) n->quota_in,
+ (unsigned long long) ret.value);
+ return ret;
+}
+
+
/**
* Function called by the plugin for each received message.
* Update data volumes, possibly notify plugins about
* and generally forward to our receive callback.
*
* @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- * (opaque to transport service)
- * @param sender_address_len the length of the sender address
* @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- * for future receive calls for messages from this
- * particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ * (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
unsigned int distance, const char *sender_address,
n = find_neighbour (peer);
if (n == NULL)
- {
- if (message == NULL)
- return; /* disconnect of peer already marked down */
- n = setup_new_neighbour (peer);
- }
+ n = setup_new_neighbour (peer);
+ update_quota (n, GNUNET_NO);
service_context = n->plugins;
while ((service_context != NULL) && (plugin != service_context->plugin))
service_context = service_context->next;
GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
- if (message == NULL)
+ if (message != NULL)
{
+ peer_address = add_peer_address(n,
+ plugin->short_name,
+ sender_address,
+ sender_address_len);
+ if (peer_address != NULL)
+ {
+ peer_address->distance = distance;
+ if (peer_address->connected == GNUNET_NO)
+ {
+ peer_address->connected = GNUNET_YES;
+ peer_address->connect_attempts++;
+ }
+ peer_address->timeout
+ =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ }
+ /* update traffic received amount ... */
+ msize = ntohs (message->size);
+ n->distance = distance;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_SCHEDULER_cancel (sched,
+ n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+ {
+ /* dropping message due to frequent inbound volume violations! */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"),
+ n->quota_violation_count);
+ return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+ }
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ process_hello (plugin, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ handle_ping(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ handle_pong(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ default:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Receive failed from `%4s', triggering disconnect\n",
- GNUNET_i2s (&n->id));
-#endif
- /* TODO: call stats */
- disconnect_neighbour (n, GNUNET_YES);
- return;
- }
- peer_address = add_peer_address(n,
- plugin->short_name,
- sender_address,
- sender_address_len);
- if (peer_address != NULL)
- {
- peer_address->distance = distance;
- if (peer_address->connected == GNUNET_NO)
- {
- peer_address->connected = GNUNET_YES;
- peer_address->connect_attempts++;
- }
- peer_address->timeout
- =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- }
- /* update traffic received amount ... */
- msize = ntohs (message->size);
- n->last_received += msize;
- n->distance = distance;
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- GNUNET_SCHEDULER_cancel (sched,
- n->timeout_task);
- n->timeout_task =
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbour_timeout_task, n);
- update_quota (n);
- if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
- {
- /* dropping message due to frequent inbound volume violations! */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
- /* TODO: call stats */
- return;
- }
- switch (ntohs (message->type))
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- process_hello (plugin, message);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
- handle_ping(plugin, message, peer, sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
- handle_pong(plugin, message, peer, sender_address, sender_address_len);
- break;
- default:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from `%4s', sending to all clients.\n",
- ntohs (message->type), GNUNET_i2s (peer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', sending to all clients.\n",
+ ntohs (message->type), GNUNET_i2s (peer));
#endif
- /* transmit message to all clients */
- im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
- im->header.size = htons (sizeof (struct InboundMessage) + msize);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (n->latency);
- im->peer = *peer;
- memcpy (&im[1], message, msize);
- cpos = clients;
- while (cpos != NULL)
- {
- transmit_to_client (cpos, &im->header, GNUNET_YES);
- cpos = cpos->next;
- }
- GNUNET_free (im);
- }
+ /* transmit message to all clients */
+ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+ im->header.size = htons (sizeof (struct InboundMessage) + msize);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
+ im->peer = *peer;
+ memcpy (&im[1], message, msize);
+ cpos = clients;
+ while (cpos != NULL)
+ {
+ transmit_to_client (cpos, &im->header, GNUNET_YES);
+ cpos = cpos->next;
+ }
+ GNUNET_free (im);
+ }
+ }
+ return calculate_throttle_delay (n);
}
{
int ret;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client\n", "HELLO");
-#endif
ret = process_hello (NULL, message);
GNUNET_SERVER_receive_done (client, ret);
}
const struct QuotaSetMessage *qsm =
(const struct QuotaSetMessage *) message;
struct NeighbourList *n;
- struct TransportPlugin *p;
- struct ReadyList *rl;
+ uint32_t qin;
n = find_neighbour (&qsm->peer);
if (n == NULL)
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
}
-
+ qin = ntohl (qsm->quota_in);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
- "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer));
+ "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer));
#endif
-
- update_quota (n);
- if (n->quota_in < ntohl (qsm->quota_in))
+ update_quota (n, GNUNET_YES);
+ if (n->quota_in < qin)
n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->quota_in = ntohl (qsm->quota_in);
- rl = n->plugins;
- while (rl != NULL)
- {
- p = rl->plugin;
- p->api->set_receive_quota (p->api->cls,
- &qsm->peer, ntohl (qsm->quota_in));
- rl = rl->next;
- }
+ n->quota_in = qin;
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
plug->env.cls = plug;
plug->env.receive = &plugin_env_receive;
plug->env.notify_address = &plugin_env_notify_address;
- plug->env.default_quota_in =
- (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
plug->env.max_connections = max_connect_per_transport;
}
return;
while (NULL != (mqe = pos->message_queue_head))
{
- pos->message_queue_head = mqe->next;
+ GNUNET_CONTAINER_DLL_remove (pos->message_queue_head,
+ pos->message_queue_tail,
+ mqe);
+ pos->message_count--;
GNUNET_free (mqe);
}
- pos->message_queue_head = NULL;
if (prev == NULL)
clients = pos->next;
else
pos->client = NULL;
return;
}
+ if (pos->th != NULL)
+ {
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+ pos->th = NULL;
+ }
+ GNUNET_break (0 == pos->message_count);
GNUNET_free (pos);
}