#include "plugin_transport.h"
#include "transport.h"
+/**
+ * Should we do some additional checks (to validate behavior
+ * of clients)?
+ */
+#define EXTRA_CHECKS GNUNET_YES
+
/**
* How many messages can we have pending for a given client process
* before we start to drop incoming messages? We typically should
#define TRANSPORT_DEFAULT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
/**
- * FIXME: document!
+ * Priority to use for PONG messages.
*/
-#define TRANSPORT_DEFAULT_PRIORITY 4
+#define TRANSPORT_PONG_PRIORITY 4
/**
* How often do we re-add (cheaper) plugins to our list of plugins
#define HELLO_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12)
+/**
+ * How long before an existing address expires should we again try to
+ * validate it? Must be (significantly) smaller than
+ * HELLO_ADDRESS_EXPIRATION.
+ */
+#define HELLO_REVALIDATION_START_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 1)
+
+
/**
* List of addresses of other peers
*/
* transport simply stalls writing to the stream but does not
* formerly get a signal that the other peer died.
*/
- /* FIXME: Do we need this? */
struct GNUNET_TIME_Absolute timeout;
/**
- * Is this plugin currently connected? The first time we transmit
- * or send data to a peer via a particular plugin, we set this to
- * GNUNET_YES. If we later get an error (disconnect notification or
- * transmission failure), we set it back to GNUNET_NO. Each time
- * the value is set to GNUNET_YES, we increment the
- * "connect_attempts" counter. If that one reaches a particular
- * threshold, we consider the address to not be working properly at
- * this time and remove it from the eligible list.
+ * Are we currently connected via this address? The first time we
+ * successfully transmit or receive data to a peer via a particular
+ * address, we set this to GNUNET_YES. If we later get an error
+ * (disconnect notification, transmission failure, timeout), we set
+ * it back to GNUNET_NO.
*/
int connected;
/**
- * Is this plugin ready to transmit to the specific target?
- * GNUNET_NO if not. Initially, all plugins are marked ready. If a
- * transmission is in progress, "transmit_ready" is set to
- * GNUNET_NO.
+ * Is this plugin currently busy transmitting to the specific target?
+ * GNUNET_NO if not (initial, default state is GNUNET_NO). Internal
+ * messages do not count as 'in transmit'.
*/
- int transmit_ready;
+ int in_transmit;
/**
* Has this address been validated yet?
int validated;
/**
- * How often have we tried to connect using this plugin?
+ * How often have we tried to connect using this plugin? Used to
+ * discriminate against addresses that do not work well.
+ * FIXME: not yet used, but should be!
*/
unsigned int connect_attempts;
+ /**
+ * DV distance to this peer (1 if no DV is used).
+ * FIXME: need to set this from transport plugins!
+ */
+ uint32_t distance;
+
};
};
-struct NeighborList;
+struct NeighbourList;
/**
- * For each neighbor we keep a list of messages
- * that we still want to transmit to the neighbor.
+ * For each neighbour we keep a list of messages
+ * that we still want to transmit to the neighbour.
*/
struct MessageQueue
{
/**
- * This is a linked list.
+ * This is a doubly linked list.
*/
struct MessageQueue *next;
+ /**
+ * This is a doubly linked list.
+ */
+ struct MessageQueue *prev;
+
/**
* The message(s) we want to transmit, GNUNET_MessageHeader(s)
* stuck together in memory. Allocated at the end of this struct.
struct ForeignAddressList *specific_address;
/**
- * Peer ID of the Neighbor this entry belongs to.
+ * Peer ID of the Neighbour this entry belongs to.
*/
- struct GNUNET_PeerIdentity neighbor_id;
+ struct GNUNET_PeerIdentity neighbour_id;
/**
* Plugin that we used for the transmission.
*/
struct TransportPlugin *plugin;
+ /**
+ * At what time should we fail?
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
/**
* Internal message of the transport system that should not be
* included in the usual SEND-SEND_OK transmission confirmation
/**
- * For a given Neighbor, which plugins are available
+ * For a given Neighbour, which plugins are available
* to talk to this peer and what are their costs?
*/
struct ReadyList
*/
struct ForeignAddressList *addresses;
- /**
- * Is this plugin ready to transmit to the specific target?
- * GNUNET_NO if not. Initially, all plugins are marked ready. If a
- * transmission is in progress, "transmit_ready" is set to
- * GNUNET_NO.
- */
- // FIXME: is this dead?
- int plugin_transmit_ready;
-
- /**
- * Is the plugin represented by this entry currently connected to
- * the respective peer?
- */
- int connected;
-
};
/**
- * Entry in linked list of all of our current neighbors.
+ * Entry in linked list of all of our current neighbours.
*/
-struct NeighborList
+struct NeighbourList
{
/**
* This is a linked list.
*/
- struct NeighborList *next;
+ struct NeighbourList *next;
/**
* Which of our transports is connected to this peer
struct ReadyList *plugins;
/**
- * List of messages we would like to send to this peer;
+ * Head of list of messages we would like to send to this peer;
* must contain at most one message per client.
*/
- struct MessageQueue *messages;
+ struct MessageQueue *messages_head;
+
+ /**
+ * Tail of list of messages we would like to send to this peer; must
+ * contain at most one message per client.
+ */
+ struct MessageQueue *messages_tail;
/**
- * Identity of this neighbor.
+ * Identity of this neighbour.
*/
struct GNUNET_PeerIdentity id;
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+ /**
+ * ID of task scheduled to run when we should retry transmitting
+ * the head of the message queue.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retry_task;
+
/**
* How long until we should consider this peer dead
* (if we don't receive another message in the
* this particular peer. This latency may have been calculated
* over multiple transports. This value reflects how long it took
* us to receive a response when SENDING via this particular
- * transport/neighbor/address combination!
- * FIXME: why is this NBO?
+ * transport/neighbour/address combination!
+ *
+ * FIXME: we need to periodically send PINGs to update this
+ * latency (at least more often than the current "huge" (11h?)
+ * update interval).
+ */
+ struct GNUNET_TIME_Relative latency;
+
+ /**
+ * DV distance to this peer (1 if no DV is used).
*/
- struct GNUNET_TIME_RelativeNBO latency;
+ uint32_t distance;
/**
* How many bytes have we received since the "last_quota_update"
uint64_t last_received;
/**
- * Global quota for inbound traffic for the neighbor in bytes/ms.
+ * Global quota for inbound traffic for the neighbour in bytes/ms.
*/
uint32_t quota_in;
unsigned int quota_violation_count;
/**
- * Have we seen an ACK from this neighbor in the past?
- * (used to make up a fake ACK for clients connecting after
- * the neighbor connected to us).
+ * Have we seen an PONG from this neighbour in the past (and
+ * not had a disconnect since)?
*/
int received_pong;
};
+
/**
- * Linked list of messages to be transmitted to
- * the client. Each entry is followed by the
- * actual message.
+ * Linked list of messages to be transmitted to the client. Each
+ * entry is followed by the actual message.
*/
struct ClientMessageQueueEntry
{
/**
- * This is a linked list.
+ * This is a doubly-linked list.
*/
struct ClientMessageQueueEntry *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct ClientMessageQueueEntry *prev;
};
*/
struct ClientMessageQueueEntry *message_queue_tail;
+ /**
+ * Current transmit request handle.
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *th;
+
/**
* Is a call to "transmit_send_continuation" pending? If so, we
* must not free this struct (even if the corresponding client
static struct GNUNET_HELLO_Message *our_hello;
/**
- * "version" of "our_hello". Used to see if a given neighbor has
+ * "version" of "our_hello". Used to see if a given neighbour has
* already been sent the latest version of our HELLO message.
*/
static unsigned int our_hello_version;
static struct GNUNET_SERVER_Handle *server;
/**
- * All known neighbors and their HELLOs.
+ * All known neighbours and their HELLOs.
*/
-static struct NeighborList *neighbors;
+static struct NeighbourList *neighbours;
/**
- * Number of neighbors we'd like to have.
+ * Number of neighbours we'd like to have.
*/
static uint32_t max_connect_per_transport;
static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
-
/**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * The peer specified by the given neighbour has timed-out or a plugin
* has disconnected. We may either need to do nothing (other plugins
* still up), or trigger a full disconnect and clean up. This
* function updates our state and do the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * Also notifies our clients that the neighbour is now officially
* gone.
*
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
* @param check should we just check if all plugins
* disconnected or must we ask all plugins to
* disconnect?
*/
-static void disconnect_neighbor (struct NeighborList *n, int check);
+static void disconnect_neighbour (struct NeighbourList *n, int check);
/**
- * Check the ready list for the given neighbor and if a plugin is
+ * Check the ready list for the given neighbour and if a plugin is
* ready for transmission (and if we have a message), do so!
*
- * @param neighbor target peer for which to transmit
+ * @param neighbour target peer for which to transmit
*/
-static void try_transmission_to_peer (struct NeighborList *neighbor);
+static void try_transmission_to_peer (struct NeighbourList *neighbour);
/**
- * Find an entry in the neighbor list for a particular peer.
+ * Find an entry in the neighbour list for a particular peer.
* if sender_address is not specified (NULL) then return the
* first matching entry. If sender_address is specified, then
* make sure that the address and address_len also matches.
*
* @return NULL if not found.
*/
-static struct NeighborList *
-find_neighbor (const struct GNUNET_PeerIdentity *key)
+static struct NeighbourList *
+find_neighbour (const struct GNUNET_PeerIdentity *key)
{
- struct NeighborList *head = neighbors;
+ struct NeighbourList *head = neighbours;
while ((head != NULL) &&
(0 != memcmp (key, &head->id, sizeof (struct GNUNET_PeerIdentity))))
/**
- * Update the quota values for the given neighbor now.
+ * Update the quota values for the given neighbour now.
+ *
+ * @param n neighbour to update
+ * @param force GNUNET_YES to force recalculation now
*/
static void
-update_quota (struct NeighborList *n)
+update_quota (struct NeighbourList *n,
+ int force)
{
- struct GNUNET_TIME_Relative delta;
+ struct GNUNET_TIME_Absolute now;
+ unsigned long long delta;
uint64_t allowed;
uint64_t remaining;
- delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- if (delta.value < MIN_QUOTA_REFRESH_TIME)
- return; /* not enough time passed for doing quota update */
- allowed = delta.value * n->quota_in;
-
+ now = GNUNET_TIME_absolute_get ();
+ delta = now.value - n->last_quota_update.value;
+ allowed = n->quota_in * delta;
+ if ( (delta < MIN_QUOTA_REFRESH_TIME) &&
+ (!force) &&
+ (allowed < 32 * 1024) )
+ return; /* too early, not enough data */
if (n->last_received < allowed)
{
remaining = allowed - n->last_received;
if (remaining > MAX_BANDWIDTH_CARRY)
remaining = MAX_BANDWIDTH_CARRY;
n->last_received = 0;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
+ n->last_quota_update = now;
n->last_quota_update.value -= remaining;
if (n->quota_violation_count > 0)
n->quota_violation_count--;
else
{
n->last_received -= allowed;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
+ n->last_quota_update = now;
if (n->last_received > allowed)
{
- /* more than twice the allowed rate! */
+ /* much more than the allowed rate! */
n->quota_violation_count += 10;
}
}
uint16_t msize;
size_t tsize;
const struct GNUNET_MessageHeader *msg;
- struct GNUNET_CONNECTION_TransmitHandle *th;
char *cbuf;
+ client->th = NULL;
if (buf == NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* fatal error with client, free message queue! */
while (NULL != (q = client->message_queue_head))
{
- client->message_queue_head = q->next;
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
GNUNET_free (q);
}
- client->message_queue_tail = NULL;
client->message_count = 0;
return 0;
}
"Transmitting message of type %u to client.\n",
ntohs (msg->type));
#endif
- client->message_queue_head = q->next;
- if (q->next == NULL)
- client->message_queue_tail = NULL;
+ GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
+ client->message_queue_tail,
+ q);
memcpy (&cbuf[tsize], msg, msize);
tsize += msize;
GNUNET_free (q);
if (NULL != q)
{
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
return tsize;
}
{
struct ClientMessageQueueEntry *q;
uint16_t msize;
- struct GNUNET_CONNECTION_TransmitHandle *th;
if ((client->message_count >= MAX_PENDING) && (GNUNET_YES == may_drop))
{
/* TODO: call to statistics... */
return;
}
- client->message_count++;
msize = ntohs (msg->size);
GNUNET_assert (msize >= sizeof (struct GNUNET_MessageHeader));
q = GNUNET_malloc (sizeof (struct ClientMessageQueueEntry) + msize);
memcpy (&q[1], msg, msize);
- /* append to message queue */
- if (client->message_queue_tail == NULL)
- {
- client->message_queue_tail = q;
- }
- else
+ GNUNET_CONTAINER_DLL_insert_after (client->message_queue_head,
+ client->message_queue_tail,
+ client->message_queue_tail,
+ q);
+ client->message_count++;
+ if (client->th == NULL)
{
- client->message_queue_tail->next = q;
- client->message_queue_tail = q;
- }
- if (client->message_queue_head == NULL)
- {
- client->message_queue_head = q;
- th = GNUNET_SERVER_notify_transmit_ready (client->client,
- msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &transmit_to_client_callback,
- client);
- GNUNET_assert (th != NULL);
+ client->th = GNUNET_SERVER_notify_transmit_ready (client->client,
+ msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_to_client_callback,
+ client);
+ GNUNET_assert (client->th != NULL);
}
}
+/**
+ * Transmit a 'SEND_OK' notification to the given client for the
+ * given neighbour.
+ *
+ * @param client who to notify
+ * @param n neighbour to notify about
+ * @param result status code for the transmission request
+ */
+static void
+transmit_send_ok (struct TransportClient *client,
+ struct NeighbourList *n,
+ int result)
+{
+ struct SendOkMessage send_ok_msg;
+
+ send_ok_msg.header.size = htons (sizeof (send_ok_msg));
+ send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+ send_ok_msg.success = htonl (result);
+ send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency);
+ send_ok_msg.peer = n->id;
+ transmit_to_client (client, &send_ok_msg.header, GNUNET_NO);
+}
+
+
/**
* Function called by the GNUNET_TRANSPORT_TransmitFunction
* upon "completion" of a send request. This tells the API
int result)
{
struct MessageQueue *mq = cls;
- /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */
- struct SendOkMessage send_ok_msg;
- struct NeighborList *n;
-
- GNUNET_assert (mq != NULL);
- n = find_neighbor(&mq->neighbor_id);
- if (n == NULL) /* Neighbor must have been removed asynchronously! */
- return;
-
- /* Otherwise, let's make sure we've got the right peer */
- GNUNET_assert (0 ==
- memcmp (&n->id, target,
- sizeof (struct GNUNET_PeerIdentity)));
+ struct NeighbourList *n;
- if (result == GNUNET_OK)
- {
- mq->specific_address->timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission to peer `%s' failed, marking connection as down.\n",
- GNUNET_i2s (target));
- mq->specific_address->connected = GNUNET_NO;
- }
- if (!mq->internal_msg)
+ n = find_neighbour(&mq->neighbour_id);
+ GNUNET_assert (n != NULL);
+ if (mq->specific_address != NULL)
{
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Setting transmit_ready on transport!\n");
-#endif
- mq->specific_address->transmit_ready = GNUNET_YES;
+ if (result == GNUNET_OK)
+ {
+ mq->specific_address->timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ mq->specific_address->connected = GNUNET_YES;
+ }
+ else
+ {
+ mq->specific_address->connected = GNUNET_NO;
+ }
+ if (! mq->internal_msg)
+ mq->specific_address->in_transmit = GNUNET_NO;
}
-
if (mq->client != NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Notifying client %p about transmission to peer `%4s'.\n",
- mq->client, GNUNET_i2s (target));
- send_ok_msg.header.size = htons (sizeof (send_ok_msg));
- send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- send_ok_msg.success = htonl (result);
- send_ok_msg.peer = n->id;
- transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
- }
+ transmit_send_ok (mq->client, n, result);
GNUNET_free (mq);
- /* one plugin just became ready again, try transmitting
- another message (if available) */
- if (result == GNUNET_OK)
- try_transmission_to_peer (n);
- else
- disconnect_neighbor (n, GNUNET_YES);
+ try_transmission_to_peer (n);
+ if (result != GNUNET_OK)
+ disconnect_neighbour (n, GNUNET_YES);
}
/**
* Find an address in any of the available transports for
- * the given neighbor that would be good for message
+ * the given neighbour that would be good for message
* transmission. This is essentially the transport selection
* routine.
*
- * @param neighbor for whom to select an address
+ * @param neighbour for whom to select an address
* @return selected address, NULL if we have none
*/
struct ForeignAddressList *
-find_ready_address(struct NeighborList *neighbor)
+find_ready_address(struct NeighbourList *neighbour)
{
- struct ReadyList *head = neighbor->plugins;
+ struct ReadyList *head = neighbour->plugins;
struct ForeignAddressList *addresses;
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
struct ForeignAddressList *best_address;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Marking long-time inactive connection to `%4s' as down.\n",
- GNUNET_i2s (&neighbor->id));
+ GNUNET_i2s (&neighbour->id));
#endif
addresses->connected = GNUNET_NO;
}
if ( ( (best_address == NULL) ||
(addresses->connected == GNUNET_YES) ||
(best_address->connected == GNUNET_NO) ) &&
- (addresses->transmit_ready == GNUNET_YES) &&
+ (addresses->in_transmit == GNUNET_NO) &&
( (best_address == NULL) ||
(addresses->latency.value < best_address->latency.value)) )
best_address = addresses;
/**
- * Check the ready list for the given neighbor and if a plugin is
+ * We should re-try transmitting to the given peer,
+ * hopefully we've learned something in the meantime.
+ */
+static void
+retry_transmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct NeighbourList *n = cls;
+
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ try_transmission_to_peer (n);
+}
+
+
+/**
+ * Check the ready list for the given neighbour and if a plugin is
* ready for transmission (and if we have a message), do so!
*
- * @param neighbor target peer for which to transmit
+ * @param neighbour target peer for which to transmit
*/
static void
-try_transmission_to_peer (struct NeighborList *neighbor)
+try_transmission_to_peer (struct NeighbourList *neighbour)
{
- struct GNUNET_TIME_Relative min_latency;
struct ReadyList *rl;
struct MessageQueue *mq;
+ struct GNUNET_TIME_Relative timeout;
- if (neighbor->messages == NULL)
+ if (neighbour->messages_head == NULL)
return; /* nothing to do */
- min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
rl = NULL;
- mq = neighbor->messages;
+ mq = neighbour->messages_head;
+ /* FIXME: support bi-directional use of TCP */
if (mq->specific_address == NULL)
- mq->specific_address = find_ready_address(neighbor);
+ mq->specific_address = find_ready_address(neighbour);
if (mq->specific_address == NULL)
{
+ timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
+ if (timeout.value == 0)
+ {
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No destination address available to transmit message of size %u to peer `%4s'\n",
+ mq->message_buf_size,
+ GNUNET_i2s (&mq->neighbour_id));
+#endif
+ if (mq->client != NULL)
+ transmit_send_ok (mq->client, neighbour, GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
+ GNUNET_free (mq);
+ return; /* nobody ready */
+ }
+ if (neighbour->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ GNUNET_SCHEDULER_cancel (sched,
+ neighbour->retry_task);
+ neighbour->retry_task = GNUNET_SCHEDULER_add_delayed (sched,
+ timeout,
+ &retry_transmission_task,
+ neighbour);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No destination address available to transmit message of size %u to peer `%4s'\n",
+ "No validated destination address available to transmit message of size %u to peer `%4s', will wait %llums to find an address.\n",
mq->message_buf_size,
- GNUNET_i2s (&mq->neighbor_id));
+ GNUNET_i2s (&mq->neighbour_id),
+ timeout.value);
#endif
- return; /* nobody ready */
+ return;
}
+ GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
+ if (mq->specific_address->connected == GNUNET_NO)
+ mq->specific_address->connect_attempts++;
rl = mq->specific_address->ready_list;
- neighbor->messages = mq->next;
mq->plugin = rl->plugin;
if (!mq->internal_msg)
- mq->specific_address->transmit_ready = GNUNET_NO;
+ mq->specific_address->in_transmit = GNUNET_YES;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
mq->message_buf_size,
- GNUNET_i2s (&neighbor->id),
+ GNUNET_i2s (&neighbour->id),
GNUNET_a2s (mq->specific_address->addr,
mq->specific_address->addrlen),
rl->plugin->short_name);
#endif
rl->plugin->api->send (rl->plugin->api->cls,
- &mq->neighbor_id,
+ &mq->neighbour_id,
mq->message_buf,
mq->message_buf_size,
mq->priority,
* @param client source of the transmission request (can be NULL)
* @param peer_address ForeignAddressList where we should send this message
* @param priority how important is the message
+ * @param timeout how long do we have to transmit?
* @param message_buf message(s) to send GNUNET_MessageHeader(s)
* @param message_buf_size total size of all messages in message_buf
- * @param is_internal is this an internal message
- * @param neighbor handle to the neighbor for transmission
+ * @param is_internal is this an internal message; these are pre-pended and
+ * also do not count for plugins being "ready" to transmit
+ * @param neighbour handle to the neighbour for transmission
*/
static void
transmit_to_peer (struct TransportClient *client,
struct ForeignAddressList *peer_address,
unsigned int priority,
+ struct GNUNET_TIME_Relative timeout,
const char *message_buf,
size_t message_buf_size,
- int is_internal, struct NeighborList *neighbor)
+ int is_internal, struct NeighbourList *neighbour)
{
struct MessageQueue *mq;
- struct MessageQueue *mqe;
+#if EXTRA_CHECKS
if (client != NULL)
{
/* check for duplicate submission */
- mq = neighbor->messages;
+ mq = neighbour->messages_head;
while (NULL != mq)
{
if (mq->client == client)
mq = mq->next;
}
}
+#endif
mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
mq->specific_address = peer_address;
mq->client = client;
memcpy (&mq[1], message_buf, message_buf_size);
mq->message_buf = (const char*) &mq[1];
mq->message_buf_size = message_buf_size;
- memcpy(&mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity));
+ memcpy(&mq->neighbour_id, &neighbour->id, sizeof(struct GNUNET_PeerIdentity));
mq->internal_msg = is_internal;
mq->priority = priority;
-
- if (is_internal)
- {
- /* append at head */
- mq->next = neighbor->messages;
- neighbor->messages = mq;
- }
+ mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ if (is_internal)
+ GNUNET_CONTAINER_DLL_insert (neighbour->messages_head,
+ neighbour->messages_tail,
+ mq);
else
- {
- /* find tail */
- mqe = neighbor->messages;
- if (mqe != NULL)
- while (mqe->next != NULL)
- mqe = mqe->next;
- if (mqe == NULL)
- {
- /* new head */
- neighbor->messages = mq;
- }
- else
- {
- /* append */
- mqe->next = mq;
- }
- }
- try_transmission_to_peer (neighbor);
+ GNUNET_CONTAINER_DLL_insert_after (neighbour->messages_head,
+ neighbour->messages_tail,
+ neighbour->messages_tail,
+ mq);
+ try_transmission_to_peer (neighbour);
}
{
struct GNUNET_HELLO_Message *hello;
struct TransportClient *cpos;
- struct NeighborList *npos;
+ struct NeighbourList *npos;
struct GeneratorContext gc;
gc.plug_pos = plugins;
our_hello = hello;
our_hello_version++;
GNUNET_PEERINFO_add_peer (cfg, sched, &my_identity, our_hello);
- npos = neighbors;
+ npos = neighbours;
while (npos != NULL)
{
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Transmitting updated `%s' to neighbor `%4s'\n",
+ "Transmitting updated `%s' to neighbour `%4s'\n",
"HELLO", GNUNET_i2s (&npos->id));
#endif
transmit_to_peer (NULL, NULL, 0,
+ HELLO_ADDRESS_EXPIRATION,
(const char *) our_hello,
GNUNET_HELLO_size(our_hello),
GNUNET_NO, npos);
*/
static void
notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_TIME_Relative latency)
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
{
struct ConnectInfoMessage cim;
struct TransportClient *cpos;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying clients about connection from `%s'\n",
+ GNUNET_i2s (peer));
+#endif
cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
+ cim.distance = htonl (distance);
cim.latency = GNUNET_TIME_relative_hton (latency);
memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
cpos = clients;
struct DisconnectInfoMessage dim;
struct TransportClient *cpos;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying clients about lost connection to `%s'\n",
+ GNUNET_i2s (peer));
+#endif
dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
dim.reserved = htonl (0);
* Find a ForeignAddressList entry for the given neighbour
* that matches the given address and transport.
*
- * @param neighbor which peer we care about
+ * @param neighbour which peer we care about
* @param tname name of the transport plugin
* @param addr binary address
* @param addrlen length of addr
* @return NULL if no such entry exists
*/
static struct ForeignAddressList *
-find_peer_address(struct NeighborList *neighbor,
+find_peer_address(struct NeighbourList *neighbour,
const char *tname,
const char *addr,
size_t addrlen)
struct ReadyList *head;
struct ForeignAddressList *address_head;
- head = neighbor->plugins;
+ head = neighbour->plugins;
while (head != NULL)
{
if (0 == strcmp (tname, head->plugin->short_name))
/**
- * Get the peer address struct for the given neighbor and
+ * Get the peer address struct for the given neighbour and
* address. If it doesn't yet exist, create it.
*
- * @param neighbor which peer we care about
+ * @param neighbour which peer we care about
* @param tname name of the transport plugin
* @param addr binary address
* @param addrlen length of addr
* @return NULL if we do not have a transport plugin for 'tname'
*/
static struct ForeignAddressList *
-add_peer_address(struct NeighborList *neighbor,
+add_peer_address(struct NeighbourList *neighbour,
const char *tname,
const char *addr,
size_t addrlen)
struct ReadyList *head;
struct ForeignAddressList *ret;
- ret = find_peer_address (neighbor, tname, addr, addrlen);
+ ret = find_peer_address (neighbour, tname, addr, addrlen);
if (ret != NULL)
return ret;
- head = neighbor->plugins;
+ head = neighbour->plugins;
while (head != NULL)
{
if (0 == strcmp (tname, head->plugin->short_name))
ret->expires = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
ret->latency = GNUNET_TIME_relative_get_forever();
- ret->transmit_ready = GNUNET_YES;
+ ret->distance = -1;
ret->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
ret->ready_list = head;
unsigned int challenge = ntohl(pong->challenge);
struct GNUNET_HELLO_Message *hello;
struct GNUNET_PeerIdentity target;
- struct NeighborList *n;
+ struct NeighbourList *n;
struct ForeignAddressList *fal;
if (ve->challenge != challenge)
&target,
hello);
GNUNET_free (hello);
- n = find_neighbor (&target);
+ n = find_neighbour (&target);
if (n != NULL)
{
fal = add_peer_address (n, ve->transport_name,
fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
fal->validated = GNUNET_YES;
fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time);
+ if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
+ n->latency = fal->latency;
+ else
+ n->latency.value = (fal->latency.value + n->latency.value) / 2;
+ n->distance = fal->distance;
+ if (GNUNET_NO == n->received_pong)
+ {
+ notify_clients_connect (&target, n->latency, n->distance);
+ n->received_pong = GNUNET_YES;
+ }
+ if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched,
+ n->retry_task);
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ try_transmission_to_peer (n);
+ }
}
/* clean up validation entry */
* (otherwise we may be seeing a MiM attack).
*
* @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
* @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- * by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ * by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
*/
static void
handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
const char *sender_address,
size_t sender_address_len)
{
-#if DEBUG_TRANSPORT
+#if DEBUG_TRANSPORT > 1
+ /* we get tons of these that just get discarded, only log
+ if we are quite verbose */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Receiving `%s' message from `%4s'.\n", "PONG",
GNUNET_i2s (peer));
per PING, and all but the first PONG will end up
here. So really we should not print anything here
unless we want to be very, very verbose... */
-#if DEBUG_TRANSPORT > 1
+#if DEBUG_TRANSPORT > 2
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received `%s' message from `%4s' but have no record of a matching `%s' message. Ignoring.\n",
"PONG",
#endif
return;
}
+
#if 0
/* FIXME: add given address to potential pool of our addresses
(for voting) */
static void
-neighbor_timeout_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+neighbour_timeout_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct NeighborList *n = cls;
+ struct NeighbourList *n = cls;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Neighbor `%4s' has timed out!\n", GNUNET_i2s (&n->id));
+ "Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id));
#endif
n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- disconnect_neighbor (n, GNUNET_NO);
+ disconnect_neighbour (n, GNUNET_NO);
}
/**
- * Create a fresh entry in our neighbor list for the given peer.
- * Will try to transmit our current HELLO to the new neighbor. Also
+ * Create a fresh entry in our neighbour list for the given peer.
+ * Will try to transmit our current HELLO to the new neighbour. Also
* notifies our clients about the new "connection".
*
* @param peer the peer for which we create the entry
- * @return the new neighbor list entry
+ * @return the new neighbour list entry
*/
-static struct NeighborList *
-setup_new_neighbor (const struct GNUNET_PeerIdentity *peer)
+static struct NeighbourList *
+setup_new_neighbour (const struct GNUNET_PeerIdentity *peer)
{
- struct NeighborList *n;
+ struct NeighbourList *n;
struct TransportPlugin *tp;
struct ReadyList *rl;
GNUNET_assert (our_hello != NULL);
- n = GNUNET_malloc (sizeof (struct NeighborList));
- n->next = neighbors;
- neighbors = n;
+ n = GNUNET_malloc (sizeof (struct NeighbourList));
+ n->next = neighbours;
+ neighbours = n;
n->id = *peer;
n->last_quota_update = GNUNET_TIME_absolute_get ();
n->peer_timeout =
}
tp = tp->next;
}
+ n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
+ n->distance = -1;
n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbor_timeout_task, n);
+ &neighbour_timeout_task, n);
transmit_to_peer (NULL, NULL, 0,
+ HELLO_ADDRESS_EXPIRATION,
(const char *) our_hello, GNUNET_HELLO_size(our_hello),
GNUNET_NO, n);
- notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
return n;
}
struct GNUNET_PeerIdentity id;
struct TransportPlugin *tp;
struct ValidationEntry *va;
- struct NeighborList *neighbor;
+ struct NeighbourList *neighbour;
struct ForeignAddressList *peer_address;
struct TransportPingMessage ping;
struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
&id.hashPubKey,
va,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- neighbor = find_neighbor(&id);
- if (neighbor == NULL)
- neighbor = setup_new_neighbor(&id);
- peer_address = add_peer_address(neighbor, tname, addr, addrlen);
+ neighbour = find_neighbour(&id);
+ if (neighbour == NULL)
+ neighbour = setup_new_neighbour(&id);
+ peer_address = add_peer_address(neighbour, tname, addr, addrlen);
GNUNET_assert(peer_address != NULL);
hello_size = GNUNET_HELLO_size(our_hello);
tsize = sizeof(struct TransportPingMessage) + hello_size;
"HELLO", hello_size,
"PING", sizeof (struct TransportPingMessage));
#endif
- transmit_to_peer(NULL, peer_address,
- GNUNET_SCHEDULER_PRIORITY_DEFAULT,
- message_buf, tsize,
- GNUNET_YES, neighbor);
+ transmit_to_peer (NULL, peer_address,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ HELLO_VERIFICATION_TIMEOUT,
+ message_buf, tsize,
+ GNUNET_YES, neighbour);
GNUNET_free(message_buf);
return GNUNET_OK;
}
* Add the given address to the list of foreign addresses
* available for the given peer (check for duplicates).
*
- * @param cls the respective 'struct NeighborList' to update
+ * @param cls the respective 'struct NeighbourList' to update
* @param tname name of the transport
* @param expiration expiration time
* @param addr the address
struct GNUNET_TIME_Absolute expiration,
const void *addr, size_t addrlen)
{
- struct NeighborList *n = cls;
+ struct NeighbourList *n = cls;
struct ForeignAddressList *fal;
fal = find_peer_address (n, tname, addr, addrlen);
{
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Adding address `%s' (%s) for peer `%4s' due to peerinfo data.\n",
+ "Adding address `%s' (%s) for peer `%4s' due to peerinfo data for %llums.\n",
GNUNET_a2s (addr, addrlen),
tname,
- GNUNET_i2s (&n->id));
+ GNUNET_i2s (&n->id),
+ expiration.value);
#endif
fal = add_peer_address (n, tname, addr, addrlen);
}
+ if (fal == NULL)
+ return GNUNET_OK;
fal->expires = GNUNET_TIME_absolute_max (expiration,
fal->expires);
fal->validated = GNUNET_YES;
*
* @param cls closure
* @param peer id of the peer, NULL for last call
- * @param hello hello message for the peer (can be NULL)
+ * @param h hello message for the peer (can be NULL)
* @param trust amount of trust we have in the peer (not used)
*/
static void
struct GNUNET_HELLO_Message *plain_hello;
struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
struct GNUNET_PeerIdentity target;
- struct NeighborList *n;
+ struct NeighbourList *n;
if (peer == NULL)
{
if (h == NULL)
return;
chvc->hello_known = GNUNET_YES;
- n = find_neighbor (peer);
+ n = find_neighbour (peer);
if (n != NULL)
- GNUNET_HELLO_iterate_addresses (h,
- GNUNET_NO,
- &add_to_foreign_address_list,
- n);
+ {
+ GNUNET_HELLO_iterate_addresses (h,
+ GNUNET_NO,
+ &add_to_foreign_address_list,
+ n);
+ try_transmission_to_peer (n);
+ }
GNUNET_HELLO_iterate_new_addresses (chvc->hello,
h,
- GNUNET_TIME_absolute_get (),
+ GNUNET_TIME_relative_to_absolute (HELLO_REVALIDATION_START_TIME),
&run_validation,
chvc);
}
/**
- * The peer specified by the given neighbor has timed-out or a plugin
+ * The peer specified by the given neighbour has timed-out or a plugin
* has disconnected. We may either need to do nothing (other plugins
* still up), or trigger a full disconnect and clean up. This
* function updates our state and does the necessary notifications.
- * Also notifies our clients that the neighbor is now officially
+ * Also notifies our clients that the neighbour is now officially
* gone.
*
- * @param n the neighbor list entry for the peer
+ * @param n the neighbour list entry for the peer
* @param check should we just check if all plugins
* disconnected or must we ask all plugins to
* disconnect?
*/
static void
-disconnect_neighbor (struct NeighborList *current_handle, int check)
+disconnect_neighbour (struct NeighbourList *n, int check)
{
struct ReadyList *rpos;
- struct NeighborList *npos;
- struct NeighborList *nprev;
- struct NeighborList *n;
+ struct NeighbourList *npos;
+ struct NeighbourList *nprev;
struct MessageQueue *mq;
struct ForeignAddressList *peer_addresses;
struct ForeignAddressList *peer_pos;
- if (neighbors == NULL)
- return; /* We don't have any neighbors, so client has an already removed handle! */
-
- npos = neighbors;
- while ((npos != NULL) && (current_handle != npos))
- npos = npos->next;
-
- if (npos == NULL)
- return; /* Couldn't find neighbor in existing list, must have been already removed! */
- else
- n = npos;
-
if (GNUNET_YES == check)
{
rpos = n->plugins;
rpos = rpos->next;
}
}
-
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id));
+ "Disconnecting from `%4s'\n",
+ GNUNET_i2s (&n->id));
#endif
- /* remove n from neighbors list */
+ /* remove n from neighbours list */
nprev = NULL;
- npos = neighbors;
+ npos = neighbours;
while ((npos != NULL) && (npos != n))
{
nprev = npos;
}
GNUNET_assert (npos != NULL);
if (nprev == NULL)
- neighbors = n->next;
+ neighbours = n->next;
else
nprev->next = n->next;
/* notify all clients about disconnect */
- notify_clients_disconnect (&n->id);
+ if (GNUNET_YES == n->received_pong)
+ notify_clients_disconnect (&n->id);
/* clean up all plugins, cancel connections and pending transmissions */
while (NULL != (rpos = n->plugins))
{
n->plugins = rpos->next;
- if (GNUNET_YES == rpos->connected)
- rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
+ rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
while (rpos->addresses != NULL)
{
}
/* free all messages on the queue */
- while (NULL != (mq = n->messages))
+ while (NULL != (mq = n->messages_head))
{
- n->messages = mq->next;
- GNUNET_assert (0 == memcmp(&mq->neighbor_id,
+ GNUNET_CONTAINER_DLL_remove (n->messages_head,
+ n->messages_tail,
+ mq);
+ GNUNET_assert (0 == memcmp(&mq->neighbour_id,
&n->id,
sizeof(struct GNUNET_PeerIdentity)));
GNUNET_free (mq);
}
if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+ {
+ GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+ n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (n->retry_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (sched, n->retry_task);
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ }
/* finally, free n itself */
GNUNET_free (n);
}
struct TransportPingMessage *ping;
struct TransportPongMessage *pong;
uint16_t msize;
- struct NeighborList *n;
+ struct NeighbourList *n;
struct ReadyList *rl;
struct ForeignAddressList *fal;
GNUNET_CRYPTO_rsa_sign (my_private_key,
&pong->purpose, &pong->signature));
- n = find_neighbor(peer);
+ n = find_neighbour(peer);
if (n == NULL)
- n = setup_new_neighbor(peer);
+ n = setup_new_neighbour(peer);
/* broadcast 'PONG' to all available addresses */
rl = n->plugins;
while (rl != NULL)
while (fal != NULL)
{
transmit_to_peer(NULL, fal,
- TRANSPORT_DEFAULT_PRIORITY,
+ TRANSPORT_PONG_PRIORITY,
+ HELLO_VERIFICATION_TIMEOUT,
(const char *)pong,
ntohs(pong->header.size),
GNUNET_YES,
}
+/**
+ * Calculate how long we should delay reading from the TCP socket to
+ * ensure that we stay within our bandwidth limits (push back).
+ *
+ * @param n for which neighbour should this be calculated
+ * @return how long to delay receiving more data
+ */
+static struct GNUNET_TIME_Relative
+calculate_throttle_delay (struct NeighbourList *n)
+{
+ struct GNUNET_TIME_Relative ret;
+ struct GNUNET_TIME_Absolute now;
+ uint64_t del;
+ uint64_t avail;
+ uint64_t excess;
+
+ now = GNUNET_TIME_absolute_get ();
+ del = now.value - n->last_quota_update.value;
+ if (del > MAX_BANDWIDTH_CARRY)
+ {
+ update_quota (n, GNUNET_YES);
+ del = now.value - n->last_quota_update.value;
+ GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
+ }
+ if (n->quota_in == 0)
+ n->quota_in = 1; /* avoid divison by zero */
+ avail = del * n->quota_in;
+ if (avail > n->last_received)
+ return GNUNET_TIME_UNIT_ZERO; /* can receive right now */
+ excess = n->last_received - avail;
+ ret.value = excess / n->quota_in;
+ if (ret.value > 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Throttling read (%llu bytes excess at %llu b/ms), waiting %llums before reading more.\n",
+ (unsigned long long) excess,
+ (unsigned long long) n->quota_in,
+ (unsigned long long) ret.value);
+ return ret;
+}
+
+
/**
* Function called by the plugin for each received message.
* Update data volumes, possibly notify plugins about
* and generally forward to our receive callback.
*
* @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- * (opaque to transport service)
- * @param sender_address_len the length of the sender address
* @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- * for future receive calls for messages from this
- * particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ * (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
unsigned int distance, const char *sender_address,
struct InboundMessage *im;
struct ForeignAddressList *peer_address;
uint16_t msize;
- struct NeighborList *n;
+ struct NeighbourList *n;
- n = find_neighbor (peer);
+ n = find_neighbour (peer);
if (n == NULL)
- {
- if (message == NULL)
- return; /* disconnect of peer already marked down */
- n = setup_new_neighbor (peer);
-
- }
+ n = setup_new_neighbour (peer);
+ update_quota (n, GNUNET_NO);
service_context = n->plugins;
while ((service_context != NULL) && (plugin != service_context->plugin))
service_context = service_context->next;
GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
- if (message == NULL)
+ if (message != NULL)
{
+ peer_address = add_peer_address(n,
+ plugin->short_name,
+ sender_address,
+ sender_address_len);
+ if (peer_address != NULL)
+ {
+ peer_address->distance = distance;
+ if (peer_address->connected == GNUNET_NO)
+ {
+ peer_address->connected = GNUNET_YES;
+ peer_address->connect_attempts++;
+ }
+ peer_address->timeout
+ =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ }
+ /* update traffic received amount ... */
+ msize = ntohs (message->size);
+ n->distance = distance;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_SCHEDULER_cancel (sched,
+ n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+ {
+ /* dropping message due to frequent inbound volume violations! */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"),
+ n->quota_violation_count);
+ return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+ }
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ process_hello (plugin, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ handle_ping(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ handle_pong(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ default:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Receive failed from `%4s', triggering disconnect\n",
- GNUNET_i2s (&n->id));
-#endif
- /* TODO: call stats */
- if (service_context != NULL)
- service_context->connected = GNUNET_NO;
- disconnect_neighbor (n, GNUNET_YES);
- return;
- }
- peer_address = add_peer_address(n,
- plugin->short_name,
- sender_address,
- sender_address_len);
- if (service_context != NULL)
- {
- if (service_context->connected == GNUNET_NO)
- {
- service_context->connected = GNUNET_YES;
- /* FIXME: What to do here? Should we use these as well,
- to specify some Address in the AddressList should be
- available? */
- peer_address->transmit_ready = GNUNET_YES;
- peer_address->connect_attempts++;
- }
- peer_address->timeout
- =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- }
- /* update traffic received amount ... */
- msize = ntohs (message->size);
- n->last_received += msize;
- GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- n->timeout_task =
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbor_timeout_task, n);
- update_quota (n);
- if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
- {
- /* dropping message due to frequent inbound volume violations! */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
- /* TODO: call stats */
- return;
- }
- switch (ntohs (message->type))
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- process_hello (plugin, message);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
- handle_ping(plugin, message, peer, sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
- handle_pong(plugin, message, peer, sender_address, sender_address_len);
- break;
- default:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from `%4s', sending to all clients.\n",
- ntohs (message->type), GNUNET_i2s (peer));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', sending to all clients.\n",
+ ntohs (message->type), GNUNET_i2s (peer));
#endif
- /* transmit message to all clients */
- im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
- im->header.size = htons (sizeof (struct InboundMessage) + msize);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = n->latency;
- im->peer = *peer;
- memcpy (&im[1], message, msize);
-
- cpos = clients;
- while (cpos != NULL)
- {
- transmit_to_client (cpos, &im->header, GNUNET_YES);
- cpos = cpos->next;
- }
- GNUNET_free (im);
- }
+ /* transmit message to all clients */
+ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+ im->header.size = htons (sizeof (struct InboundMessage) + msize);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
+ im->peer = *peer;
+ memcpy (&im[1], message, msize);
+ cpos = clients;
+ while (cpos != NULL)
+ {
+ transmit_to_client (cpos, &im->header, GNUNET_YES);
+ cpos = cpos->next;
+ }
+ GNUNET_free (im);
+ }
+ }
+ return calculate_throttle_delay (n);
}
{
struct TransportClient *c;
struct ConnectInfoMessage cim;
- struct NeighborList *n;
- struct InboundMessage *im;
- struct GNUNET_MessageHeader *ack;
+ struct NeighbourList *n;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
/* tell new client about all existing connections */
cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim.quota_out =
- htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
- cim.latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */
- im = GNUNET_malloc (sizeof (struct InboundMessage) +
- sizeof (struct GNUNET_MessageHeader));
- im->header.size = htons (sizeof (struct InboundMessage) +
- sizeof (struct GNUNET_MessageHeader));
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); /* FIXME? */
- ack = (struct GNUNET_MessageHeader *) &im[1];
- ack->size = htons (sizeof (struct GNUNET_MessageHeader));
- ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
- for (n = neighbors; n != NULL; n = n->next)
- {
- cim.id = n->id;
- transmit_to_client (c, &cim.header, GNUNET_NO);
- if (n->received_pong)
- {
- im->peer = n->id;
- transmit_to_client (c, &im->header, GNUNET_NO);
+ n = neighbours;
+ while (n != NULL)
+ {
+ if (GNUNET_YES == n->received_pong)
+ {
+ cim.id = n->id;
+ cim.latency = GNUNET_TIME_relative_hton (n->latency);
+ cim.distance = htonl (n->distance);
+ transmit_to_client (c, &cim.header, GNUNET_NO);
}
+ n = n->next;
}
- GNUNET_free (im);
- }
- else
- {
- fprintf(stderr, "Our hello is NULL!\n");
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
int ret;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client\n", "HELLO");
-#endif
ret = process_hello (NULL, message);
GNUNET_SERVER_receive_done (client, ret);
}
const struct GNUNET_MessageHeader *message)
{
struct TransportClient *tc;
- struct NeighborList *n;
+ struct NeighbourList *n;
const struct OutboundMessage *obm;
const struct GNUNET_MessageHeader *obmm;
uint16_t size;
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- n = find_neighbor (&obm->peer);
+ n = find_neighbour (&obm->peer);
if (n == NULL)
- n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */
+ n = setup_new_neighbour (&obm->peer);
tc = clients;
while ((tc != NULL) && (tc->client != client))
tc = tc->next;
ntohs (obmm->size),
ntohs (obmm->type), GNUNET_i2s (&obm->peer));
#endif
- transmit_to_peer (tc, NULL, ntohl (obm->priority), (char *)obmm,
+ transmit_to_peer (tc, NULL, ntohl (obm->priority),
+ GNUNET_TIME_relative_ntoh (obm->timeout),
+ (char *)obmm,
ntohs (obmm->size), GNUNET_NO, n);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
{
const struct QuotaSetMessage *qsm =
(const struct QuotaSetMessage *) message;
- struct NeighborList *n;
- struct TransportPlugin *p;
- struct ReadyList *rl;
+ struct NeighbourList *n;
+ uint32_t qin;
- n = find_neighbor (&qsm->peer);
+ n = find_neighbour (&qsm->peer);
if (n == NULL)
{
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
}
-
+ qin = ntohl (qsm->quota_in);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
- "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer));
+ "SET_QUOTA", qin, n->quota_in, GNUNET_i2s (&qsm->peer));
#endif
-
- update_quota (n);
- if (n->quota_in < ntohl (qsm->quota_in))
+ update_quota (n, GNUNET_YES);
+ if (n->quota_in < qin)
n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->quota_in = ntohl (qsm->quota_in);
- rl = n->plugins;
- while (rl != NULL)
- {
- p = rl->plugin;
- p->api->set_receive_quota (p->api->cls,
- &qsm->peer, ntohl (qsm->quota_in));
- rl = rl->next;
- }
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
- * Handle TRY_CONNECT-message.
- *
- * @param cls closure (always NULL)
- * @param client identification of the client
- * @param message the actual message
- */
-static void
-handle_try_connect (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
-{
- const struct TryConnectMessage *tcm;
- struct NeighborList *neighbor;
- tcm = (const struct TryConnectMessage *) message;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client %p asking to connect to `%4s'\n",
- "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer));
-#endif
- neighbor = find_neighbor(&tcm->peer);
- if (neighbor == NULL)
- setup_new_neighbor (&tcm->peer);
+ n->quota_in = qin;
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
{&handle_set_quota, NULL,
GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
- {&handle_try_connect, NULL,
- GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
- sizeof (struct TryConnectMessage)},
{&handle_address_lookup, NULL,
GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
0},
plug->env.cls = plug;
plug->env.receive = &plugin_env_receive;
plug->env.notify_address = &plugin_env_notify_address;
- plug->env.default_quota_in =
- (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
plug->env.max_connections = max_connect_per_transport;
}
return;
while (NULL != (mqe = pos->message_queue_head))
{
- pos->message_queue_head = mqe->next;
+ GNUNET_CONTAINER_DLL_remove (pos->message_queue_head,
+ pos->message_queue_tail,
+ mqe);
+ pos->message_count--;
GNUNET_free (mqe);
}
- pos->message_queue_head = NULL;
if (prev == NULL)
clients = pos->next;
else
pos->client = NULL;
return;
}
+ if (pos->th != NULL)
+ {
+ GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
+ pos->th = NULL;
+ }
+ GNUNET_break (0 == pos->message_count);
GNUNET_free (pos);
}
struct OwnAddressList *al;
struct CheckHelloValidatedContext *chvc;
+ while (neighbours != NULL)
+ disconnect_neighbour (neighbours, GNUNET_NO);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transport service is unloading plugins...\n");