#include "plugin_transport.h"
#include "transport.h"
+/**
+ * Should we do some additional checks (to validate behavior
+ * of clients)?
+ */
+#define EXTRA_CHECKS GNUNET_YES
+
/**
* How many messages can we have pending for a given client process
* before we start to drop incoming messages? We typically should
struct GNUNET_TIME_Absolute timeout;
/**
- * Is this plugin currently connected? The first time we transmit
- * or send data to a peer via a particular plugin, we set this to
- * GNUNET_YES. If we later get an error (disconnect notification or
- * transmission failure), we set it back to GNUNET_NO. Each time
- * the value is set to GNUNET_YES, we increment the
- * "connect_attempts" counter. If that one reaches a particular
- * threshold, we consider the address to not be working properly at
- * this time and remove it from the eligible list.
+ * Are we currently connected via this address? The first time we
+ * successfully transmit or receive data to a peer via a particular
+ * address, we set this to GNUNET_YES. If we later get an error
+ * (disconnect notification, transmission failure, timeout), we set
+ * it back to GNUNET_NO.
*/
int connected;
/**
- * Is this plugin ready to transmit to the specific target?
- * GNUNET_NO if not. Initially, all plugins are marked ready. If a
- * transmission is in progress, "transmit_ready" is set to
- * GNUNET_NO.
+ * Is this plugin currently busy transmitting to the specific target?
+ * GNUNET_NO if not (initial, default state is GNUNET_NO). Internal
+ * messages do not count as 'in transmit'.
*/
- int transmit_ready;
+ int in_transmit;
/**
* Has this address been validated yet?
int validated;
/**
- * How often have we tried to connect using this plugin?
+ * How often have we tried to connect using this plugin? Used to
+ * discriminate against addresses that do not work well.
+ * FIXME: not yet used, but should be!
*/
unsigned int connect_attempts;
{
/**
- * This is a linked list.
+ * This is a doubly linked list.
*/
struct MessageQueue *next;
+ /**
+ * This is a doubly linked list.
+ */
+ struct MessageQueue *prev;
+
/**
* The message(s) we want to transmit, GNUNET_MessageHeader(s)
* stuck together in memory. Allocated at the end of this struct.
*/
struct ForeignAddressList *addresses;
- /**
- * Is the plugin represented by this entry currently connected to
- * the respective peer?
- */
- int connected;
-
};
struct ReadyList *plugins;
/**
- * List of messages we would like to send to this peer;
+ * Head of list of messages we would like to send to this peer;
* must contain at most one message per client.
*/
- struct MessageQueue *messages;
+ struct MessageQueue *messages_head;
+
+ /**
+ * Tail of list of messages we would like to send to this peer; must
+ * contain at most one message per client.
+ */
+ struct MessageQueue *messages_tail;
/**
* Identity of this neighbor.
* over multiple transports. This value reflects how long it took
* us to receive a response when SENDING via this particular
* transport/neighbor/address combination!
- * FIXME: why is this NBO?
+ *
+ * FIXME: we need to periodically send PINGs to update this
+ * latency (at least more often than the current "huge" (11h?)
+ * update interval).
*/
- struct GNUNET_TIME_RelativeNBO latency;
+ struct GNUNET_TIME_Relative latency;
/**
* How many bytes have we received since the "last_quota_update"
if (result == GNUNET_OK)
{
- mq->specific_address->timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ if (mq->specific_address != NULL)
+ {
+ mq->specific_address->timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ mq->specific_address->connected = GNUNET_YES;
+ }
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmission to peer `%s' failed, marking connection as down.\n",
GNUNET_i2s (target));
- mq->specific_address->connected = GNUNET_NO;
- }
- if (!mq->internal_msg)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Setting transmit_ready on transport!\n");
-#endif
- mq->specific_address->transmit_ready = GNUNET_YES;
+ if (mq->specific_address != NULL)
+ mq->specific_address->connected = GNUNET_NO;
}
+ if ( (! mq->internal_msg) &&
+ (mq->specific_address != NULL) )
+ mq->specific_address->in_transmit = GNUNET_NO;
if (mq->client != NULL)
{
if ( ( (best_address == NULL) ||
(addresses->connected == GNUNET_YES) ||
(best_address->connected == GNUNET_NO) ) &&
- (addresses->transmit_ready == GNUNET_YES) &&
+ (addresses->in_transmit == GNUNET_NO) &&
( (best_address == NULL) ||
(addresses->latency.value < best_address->latency.value)) )
best_address = addresses;
struct ReadyList *rl;
struct MessageQueue *mq;
- if (neighbor->messages == NULL)
+ if (neighbor->messages_head == NULL)
return; /* nothing to do */
min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
rl = NULL;
- mq = neighbor->messages;
+ mq = neighbor->messages_head;
if (mq->specific_address == NULL)
mq->specific_address = find_ready_address(neighbor);
if (mq->specific_address == NULL)
#endif
return; /* nobody ready */
}
+ if (mq->specific_address->connected == GNUNET_NO)
+ mq->specific_address->connect_attempts++;
rl = mq->specific_address->ready_list;
- neighbor->messages = mq->next;
+ GNUNET_CONTAINER_DLL_remove (neighbor->messages_head,
+ neighbor->messages_tail,
+ mq);
mq->plugin = rl->plugin;
if (!mq->internal_msg)
- mq->specific_address->transmit_ready = GNUNET_NO;
+ mq->specific_address->in_transmit = GNUNET_YES;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending message of size %u for `%4s' to `%s' via plugin `%s'\n",
* @param priority how important is the message
* @param message_buf message(s) to send GNUNET_MessageHeader(s)
* @param message_buf_size total size of all messages in message_buf
- * @param is_internal is this an internal message
+ * @param is_internal is this an internal message; these are pre-pended and
+ * also do not count for plugins being "ready" to transmit
* @param neighbor handle to the neighbor for transmission
*/
static void
int is_internal, struct NeighborList *neighbor)
{
struct MessageQueue *mq;
- struct MessageQueue *mqe;
+#if EXTRA_CHECKS
if (client != NULL)
{
/* check for duplicate submission */
- mq = neighbor->messages;
+ mq = neighbor->messages_head;
while (NULL != mq)
{
if (mq->client == client)
mq = mq->next;
}
}
+#endif
mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
mq->specific_address = peer_address;
mq->client = client;
mq->internal_msg = is_internal;
mq->priority = priority;
- if (is_internal)
- {
- /* append at head */
- mq->next = neighbor->messages;
- neighbor->messages = mq;
- }
+ if (is_internal)
+ GNUNET_CONTAINER_DLL_insert (neighbor->messages_head,
+ neighbor->messages_tail,
+ mq);
else
- {
- /* find tail */
- mqe = neighbor->messages;
- if (mqe != NULL)
- while (mqe->next != NULL)
- mqe = mqe->next;
- if (mqe == NULL)
- {
- /* new head */
- neighbor->messages = mq;
- }
- else
- {
- /* append */
- mqe->next = mq;
- }
- }
+ GNUNET_CONTAINER_DLL_insert_after (neighbor->messages_head,
+ neighbor->messages_tail,
+ neighbor->messages_tail,
+ mq);
try_transmission_to_peer (neighbor);
}
ret->expires = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
ret->latency = GNUNET_TIME_relative_get_forever();
- ret->transmit_ready = GNUNET_YES;
ret->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
ret->ready_list = head;
fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
fal->validated = GNUNET_YES;
fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time);
+ if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
+ n->latency = fal->latency;
+ else
+ n->latency.value = (fal->latency.value + n->latency.value) / 2;
}
/* clean up validation entry */
}
tp = tp->next;
}
+ n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
&neighbor_timeout_task, n);
"HELLO", hello_size,
"PING", sizeof (struct TransportPingMessage));
#endif
- transmit_to_peer(NULL, peer_address,
- GNUNET_SCHEDULER_PRIORITY_DEFAULT,
- message_buf, tsize,
- GNUNET_YES, neighbor);
+ transmit_to_peer (NULL, peer_address,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ message_buf, tsize,
+ GNUNET_YES, neighbor);
GNUNET_free(message_buf);
return GNUNET_OK;
}
#endif
fal = add_peer_address (n, tname, addr, addrlen);
}
+ if (fal == NULL)
+ return GNUNET_OK;
fal->expires = GNUNET_TIME_absolute_max (expiration,
fal->expires);
fal->validated = GNUNET_YES;
while (NULL != (rpos = n->plugins))
{
n->plugins = rpos->next;
- if (GNUNET_YES == rpos->connected)
- rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
+ rpos->plugin->api->disconnect (rpos->plugin->api->cls, &n->id);
while (rpos->addresses != NULL)
{
}
/* free all messages on the queue */
- while (NULL != (mq = n->messages))
+ while (NULL != (mq = n->messages_head))
{
- n->messages = mq->next;
+ GNUNET_CONTAINER_DLL_remove (n->messages_head,
+ n->messages_tail,
+ mq);
GNUNET_assert (0 == memcmp(&mq->neighbor_id,
&n->id,
sizeof(struct GNUNET_PeerIdentity)));
if (message == NULL)
return; /* disconnect of peer already marked down */
n = setup_new_neighbor (peer);
-
}
service_context = n->plugins;
while ((service_context != NULL) && (plugin != service_context->plugin))
GNUNET_i2s (&n->id));
#endif
/* TODO: call stats */
- if (service_context != NULL)
- service_context->connected = GNUNET_NO;
disconnect_neighbor (n, GNUNET_YES);
return;
}
plugin->short_name,
sender_address,
sender_address_len);
- if (service_context != NULL)
+ if (peer_address != NULL)
{
- if (service_context->connected == GNUNET_NO)
+ if (peer_address->connected == GNUNET_NO)
{
- service_context->connected = GNUNET_YES;
- /* FIXME: What to do here? Should we use these as well,
- to specify some Address in the AddressList should be
- available? */
- peer_address->transmit_ready = GNUNET_YES;
+ peer_address->connected = GNUNET_YES;
peer_address->connect_attempts++;
}
peer_address->timeout
im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
im->header.size = htons (sizeof (struct InboundMessage) + msize);
im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = n->latency;
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
im->peer = *peer;
memcpy (&im[1], message, msize);