*
* TODO:
* Implement next:
- * - dv hop-by-hop signature verification (at least at initiator)
- * - change transport-core API to provide proper flow control in both
- * directions, allow multiple messages per peer simultaneously (tag
- * confirmations with unique message ID), and replace quota-out with
- * proper flow control; specify transmission preferences (latency,
+ * - add (more) logging
+ * - change transport-core API to specify transmission preferences (latency,
* reliability, etc.) per message!
- * - add logging
- *
- * Later:
* - review retransmission logic, right now there is no smartness there!
- * => congestion control, flow control, etc
+ * => congestion control, flow control, etc [PERFORMANCE-BASICS]
*
* Optimizations:
* - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
- * => Need 128 bit hash map though!
+ * => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
* - queue_send_msg and route_message both by API design have to make copies
* of the payload, and route_message on top of that requires a malloc/free.
- * Change design to approximate "zero" copy better...
+ * Change design to approximate "zero" copy better... [CPU]
* - could avoid copying body of message into each fragment and keep
* fragments as just pointers into the original message and only
* fully build fragments just before transmission (optimization, should
- * reduce CPU and memory use)
+ * reduce CPU and memory use) [CPU, MEMORY]
* - if messages are below MTU, consider adding ACKs and other stuff
- * (requires planning at receiver, and additional MST-style demultiplex
- * at receiver!)
+ * to the same transmission to avoid tiny messages (requires planning at
+ * receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
* - When we passively learned DV (with unconfirmed freshness), we
* right now add the path to our list but with a zero path_valid_until
* time and only use it for unconfirmed routes. However, we could consider
* triggering an explicit validation mechansim ourselves, specifically routing
- * a challenge-response message over the path (OPTIMIZATION-FIXME).
+ * a challenge-response message over the path [ROUTING]
+ * - Track ACK losses based on ACK-counter [ROUTING]
*
* Design realizations / discussion:
* - communicators do flow control by calling MQ "notify sent"
*/
#define MAX_DV_DISCOVERY_SELECTION 16
+/**
+ * Window size. How many messages to the same target do we pass
+ * to CORE without a RECV_OK in between? Small values limit
+ * thoughput, large values will increase latency.
+ *
+ * FIXME-OPTIMIZE: find out what good values are experimentally,
+ * maybe set adaptively (i.e. to observed available bandwidth).
+ */
+#define RECV_WINDOW_SIZE 4
+
/**
* Minimum number of hops we should forward DV learn messages
* even if they are NOT useful for us in hope of looping
struct DistanceVectorHop;
+/**
+ * Context from #handle_incoming_msg(). Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
+{
+
+ /**
+ * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+ * flow control to unchoke.
+ */
+ struct CommunicatorMessageContext *next;
+
+ /**
+ * Kept in a DLL of `struct VirtualLink` if waiting for CORE
+ * flow control to unchoke.
+ */
+ struct CommunicatorMessageContext *prev;
+
+ /**
+ * Which communicator provided us with the message.
+ */
+ struct TransportClient *tc;
+
+ /**
+ * Additional information for flow control and about the sender.
+ */
+ struct GNUNET_TRANSPORT_IncomingMessage im;
+
+ /**
+ * Number of hops the message has travelled (if DV-routed).
+ * FIXME: make use of this in ACK handling!
+ */
+ uint16_t total_hops;
+};
+
+
+/**
+ * A virtual link is another reachable peer that is known to CORE. It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`. With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink
+{
+ /**
+ * Identity of the peer at the other end of the link.
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * Communicators blocked for receiving on @e target as we are waiting
+ * on the @e core_recv_window to increase.
+ */
+ struct CommunicatorMessageContext *cmc_head;
+
+ /**
+ * Communicators blocked for receiving on @e target as we are waiting
+ * on the @e core_recv_window to increase.
+ */
+ struct CommunicatorMessageContext *cmc_tail;
+
+ /**
+ * Task scheduled to possibly notfiy core that this peer is no
+ * longer counting as confirmed. Runs the #core_visibility_check(),
+ * which checks that some DV-path or a queue exists that is still
+ * considered confirmed.
+ */
+ struct GNUNET_SCHEDULER_Task *visibility_task;
+
+ /**
+ * Neighbour used by this virtual link, NULL if @e dv is used.
+ */
+ struct Neighbour *n;
+
+ /**
+ * Distance vector used by this virtual link, NULL if @e n is used.
+ */
+ struct DistanceVector *dv;
+
+ /**
+ * How many more messages can we send to core before we exhaust
+ * the receive window of CORE for this peer? If this hits zero,
+ * we must tell communicators to stop providing us more messages
+ * for this peer. In fact, the window can go negative as we
+ * have multiple communicators, so per communicator we can go
+ * down by one into the negative range.
+ */
+ int core_recv_window;
+};
+
+
/**
* Data structure kept when we are waiting for an acknowledgement.
*/
struct GNUNET_SCHEDULER_Task *timeout_task;
/**
- * Task scheduled to possibly notfiy core that this queue is no longer
- * counting as confirmed. Runs the #core_queue_visibility_check().
- */
- struct GNUNET_SCHEDULER_Task *visibility_task;
-
- /**
- * Quota at which CORE is allowed to transmit to this peer
- * (note that the value CORE should actually be told is this
- * value plus the respective value in `struct Neighbour`).
- * Should match the sum of the quotas of all of the paths.
- *
- * FIXME: not yet set, tricky to get right given multiple paths,
- * many of which may be inactive! (=> Idea: measure???)
- * FIXME: how do we set this value initially when we tell CORE?
- * Options: start at a minimum value or at literally zero?
- * (=> Current thought: clean would be zero!)
- */
- struct GNUNET_BANDWIDTH_Value32NBO quota_out;
-
- /**
- * Is one of the DV paths in this struct 'confirmed' and thus
- * the cause for CORE to see this peer as connected? (Note that
- * the same may apply to a `struct Neighbour` at the same time.)
+ * Do we have a confirmed working queue and are thus visible to
+ * CORE? If so, this is the virtual link, otherwise NULL.
*/
- int core_visible;
+ struct VirtualLink *link;
};
*/
struct GNUNET_SCHEDULER_Task *transmit_task;
- /**
- * Task scheduled to possibly notfiy core that this queue is no longer
- * counting as confirmed. Runs the #core_queue_visibility_check().
- */
- struct GNUNET_SCHEDULER_Task *visibility_task;
-
/**
* How long do *we* consider this @e address to be valid? In the past or
* zero if we have not yet validated it. Can be updated based on
*/
struct Queue *queue_tail;
- /**
- * Task run to cleanup pending messages that have exceeded their timeout.
- */
- struct GNUNET_SCHEDULER_Task *timeout_task;
-
/**
* Handle for an operation to fetch @e last_dv_learn_monotime information from
* the PEERSTORE, or NULL.
struct GNUNET_PEERSTORE_StoreContext *sc;
/**
- * Quota at which CORE is allowed to transmit to this peer
- * (note that the value CORE should actually be told is this
- * value plus the respective value in `struct DistanceVector`).
- * Should match the sum of the quotas of all of the queues.
- *
- * FIXME: not yet set, tricky to get right given multiple queues!
- * (=> Idea: measure???)
- * FIXME: how do we set this value initially when we tell CORE?
- * Options: start at a minimum value or at literally zero?
- * (=> Current thought: clean would be zero!)
+ * Do we have a confirmed working queue and are thus visible to
+ * CORE? If so, this is the virtual link, otherwise NULL.
*/
- struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+ struct VirtualLink *link;
/**
* Latest DVLearn monotonic time seen from this peer. Initialized only
*/
struct GNUNET_TIME_Absolute last_dv_learn_monotime;
- /**
- * What is the earliest timeout of any message in @e pending_msg_tail?
- */
- struct GNUNET_TIME_Absolute earliest_timeout;
-
- /**
- * Do we have a confirmed working queue and are thus visible to
- * CORE?
- */
- int core_visible;
-
/**
* Do we have the lastest value for @e last_dv_learn_monotime from
* PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
*/
static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
+/**
+ * Map from PIDs to `struct VirtualLink` entries describing
+ * links CORE knows to exist.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *links;
+
/**
* Map from challenges to `struct LearnLaunchEntry` values.
*/
}
+/**
+ * Free virtual link.
+ *
+ * @param vl link data to free
+ */
+static void
+free_virtual_link (struct VirtualLink *vl)
+{
+ GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
+ if (NULL != vl->visibility_task)
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ vl->visibility_task = NULL;
+ }
+ GNUNET_break (NULL == vl->n);
+ GNUNET_break (NULL == vl->dv);
+ GNUNET_free (vl);
+}
+
+
/**
* Free validation state.
*
GNUNET_assert (
GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
- if (NULL != dv->visibility_task)
- GNUNET_SCHEDULER_cancel (dv->visibility_task);
if (NULL != dv->timeout_task)
GNUNET_SCHEDULER_cancel (dv->timeout_task);
GNUNET_free (dv);
GNUNET_CONTAINER_multipeermap_remove (neighbours,
&neighbour->pid,
neighbour));
- if (NULL != neighbour->timeout_task)
- GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
if (NULL != neighbour->reassembly_map)
{
GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map,
*
* @param tc client to inform (must be CORE client)
* @param pid peer the connection is for
- * @param quota_out current quota for the peer
*/
static void
core_send_connect_info (struct TransportClient *tc,
- const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+ const struct GNUNET_PeerIdentity *pid)
{
struct GNUNET_MQ_Envelope *env;
struct ConnectInfoMessage *cim;
GNUNET_assert (CT_CORE == tc->type);
env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim->quota_out = quota_out;
cim->id = *pid;
GNUNET_MQ_send (tc->mq, env);
}
* Send message to CORE clients that we gained a connection
*
* @param pid peer the queue was for
- * @param quota_out current quota for the peer
*/
static void
-cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Informing CORE clients about connection to %s\n",
{
if (CT_CORE != tc->type)
continue;
- core_send_connect_info (tc, pid, quota_out);
+ core_send_connect_info (tc, pid);
}
}
/**
- * Check whether the CORE visibility of @a n changed. If so,
- * check whether we need to notify CORE.
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
*
- * @param n neighbour to perform the check for
+ * @param cls a `struct VirtualLink`
*/
static void
-update_neighbour_core_visibility (struct Neighbour *n);
+check_link_down (void *cls)
+{
+ struct VirtualLink *vl = cls;
+ struct DistanceVector *dv = vl->dv;
+ struct Neighbour *n = vl->n;
+ struct GNUNET_TIME_Absolute dvh_timeout;
+ struct GNUNET_TIME_Absolute q_timeout;
+
+ vl->visibility_task = NULL;
+ dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
+ if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ vl->dv = NULL;
+ q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+ for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+ q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
+ if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ vl->n = NULL;
+ if ((NULL == vl->n) && (NULL == vl->dv))
+ {
+ cores_send_disconnect_info (&dv->target);
+ free_virtual_link (vl);
+ return;
+ }
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
+ &check_link_down,
+ vl);
+}
/**
struct QueueEntry *qe;
int maxxed;
struct PendingAcknowledgement *pa;
+ struct VirtualLink *vl;
if (NULL != queue->transmit_task)
{
GNUNET_SCHEDULER_cancel (queue->transmit_task);
queue->transmit_task = NULL;
}
- if (NULL != queue->visibility_task)
- {
- GNUNET_SCHEDULER_cancel (queue->visibility_task);
- queue->visibility_task = NULL;
- }
while (NULL != (pa = queue->pa_head))
{
GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
GNUNET_free (queue);
- update_neighbour_core_visibility (neighbour);
- cores_send_disconnect_info (&neighbour->pid);
-
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+ if ((NULL != vl) && (neighbour == vl->n))
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ check_link_down (vl);
+ }
if (NULL == neighbour->queue_head)
{
free_neighbour (neighbour);
void *value)
{
struct TransportClient *tc = cls;
- struct Neighbour *neighbour = value;
+ (void) value;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Telling new CORE client about existing connection to %s\n",
GNUNET_i2s (pid));
- core_send_connect_info (tc, pid, neighbour->quota_out);
+ core_send_connect_info (tc, pid);
return GNUNET_OK;
}
/**
- * Send a response to the @a pm that we have processed a
- * "send" request with status @a success. We
- * transmitted @a bytes_physical on the actual wire.
- * Sends a confirmation to the "core" client responsible
- * for the original request and free's @a pm.
+ * Send a response to the @a pm that we have processed a "send"
+ * request. Sends a confirmation to the "core" client responsible for
+ * the original request and free's @a pm.
*
* @param pm handle to the original pending message
- * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
- * for transmission failure
- * @param bytes_physical amount of bandwidth consumed
*/
static void
-client_send_response (struct PendingMessage *pm,
- int success,
- uint32_t bytes_physical)
+client_send_response (struct PendingMessage *pm)
{
struct TransportClient *tc = pm->client;
struct Neighbour *target = pm->target;
if (NULL != tc)
{
env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- som->success = htonl ((uint32_t) success);
- som->bytes_msg = htons (pm->bytes_msg);
- som->bytes_physical = htonl (bytes_physical);
som->peer = target->pid;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Confirming %s transmission of %u/%u bytes to %s\n",
- (GNUNET_OK == success) ? "successful" : "failed",
- (unsigned int) pm->bytes_msg,
- (unsigned int) bytes_physical,
+ "Confirming transmission to %s\n",
GNUNET_i2s (&pm->target->pid));
GNUNET_MQ_send (tc->mq, env);
}
}
-/**
- * Checks the message queue for a neighbour for messages that have timed
- * out and purges them.
- *
- * @param cls a `struct Neighbour`
- */
-static void
-check_queue_timeouts (void *cls)
-{
- struct Neighbour *n = cls;
- struct PendingMessage *pm;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Absolute earliest_timeout;
-
- n->timeout_task = NULL;
- earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
- now = GNUNET_TIME_absolute_get ();
- for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm)
- {
- pm = pos->next_neighbour;
- if (pos->timeout.abs_value_us <= now.abs_value_us)
- {
- GNUNET_STATISTICS_update (GST_stats,
- "# messages dropped (timeout before confirmation)",
- 1,
- GNUNET_NO);
- client_send_response (pm, GNUNET_NO, 0);
- continue;
- }
- earliest_timeout =
- GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout);
- }
- n->earliest_timeout = earliest_timeout;
- if (NULL != n->pending_msg_head)
- n->timeout_task =
- GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n);
-}
-
-
/**
* Create a DV Box message.
*
const void *payload;
size_t payload_size;
struct TransportDVBoxMessage *dvb;
+ struct VirtualLink *vl;
GNUNET_assert (CT_CORE == tc->type);
obmm = (const struct GNUNET_MessageHeader *) &obm[1];
bytes_msg = ntohs (obmm->size);
- target = lookup_neighbour (&obm->peer);
- if (NULL == target)
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
- else
- dv = NULL;
- if ((NULL == target) && ((NULL == dv) || (GNUNET_NO == dv->core_visible)))
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
+ if (NULL == vl)
{
/* Failure: don't have this peer as a neighbour (anymore).
Might have gone down asynchronously, so this is NOT
a protocol violation by CORE. Still count the event,
as this should be rare. */
- struct GNUNET_MQ_Envelope *env;
- struct SendOkMessage *som;
-
- env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- som->success = htonl (GNUNET_SYSERR);
- som->bytes_msg = htonl (bytes_msg);
- som->bytes_physical = htonl (0);
- som->peer = obm->peer;
- GNUNET_MQ_send (tc->mq, env);
GNUNET_SERVICE_client_continue (tc->client);
GNUNET_STATISTICS_update (GST_stats,
"# messages dropped (neighbour unknown)",
GNUNET_NO);
return;
}
+ target = lookup_neighbour (&obm->peer);
+ if (NULL == target)
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
+ else
+ dv = NULL;
+ GNUNET_assert ((NULL != target) || (NULL != dv));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending %u bytes to %s using %s\n",
bytes_msg,
pm->client = tc;
pm->target = target;
pm->bytes_msg = payload_size;
- pm->timeout =
- GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
memcpy (&pm[1], payload, payload_size);
GNUNET_free_non_null (dvb);
dvb = NULL;
tc->details.core.pending_msg_head,
tc->details.core.pending_msg_tail,
pm);
- if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
- {
- target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
- if (NULL != target->timeout_task)
- GNUNET_SCHEDULER_cancel (target->timeout_task);
- target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
- &check_queue_timeouts,
- target);
- }
if (! was_empty)
return; /* all queues must already be busy */
for (struct Queue *queue = target->queue_head; NULL != queue;
}
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+ if (0 != ntohl (cmc->im.fc_on))
+ {
+ /* send ACK when done to communicator for flow control! */
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
+
+ env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
+ ack->reserved = htonl (0);
+ ack->fc_id = cmc->im.fc_id;
+ ack->sender = cmc->im.sender;
+ GNUNET_MQ_send (cmc->tc->mq, env);
+ }
+ GNUNET_SERVICE_client_continue (cmc->tc->client);
+ GNUNET_free (cmc);
+}
+
+
+/**
+ * Client confirms that it is done handling message(s) to a particular
+ * peer. We may now provide more messages to CORE for this peer.
+ *
+ * Notifies the respective queues that more messages can now be received.
+ *
+ * @param cls the client
+ * @param rom the message that was sent
+ */
+static void
+handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
+{
+ struct TransportClient *tc = cls;
+ struct VirtualLink *vl;
+ uint32_t delta;
+ struct CommunicatorMessageContext *cmc;
+
+ if (CT_CORE != tc->type)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+ if (NULL == vl)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# RECV_OK dropped: virtual link unknown",
+ 1,
+ GNUNET_NO);
+ GNUNET_SERVICE_client_continue (tc->client);
+ return;
+ }
+ delta = ntohl (rom->increase_window_delta);
+ vl->core_recv_window += delta;
+ if (vl->core_recv_window <= 0)
+ return;
+ /* resume communicators */
+ while (NULL != (cmc = vl->cmc_tail))
+ {
+ GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc);
+ finish_cmc_handling (cmc);
+ }
+}
+
+
/**
* Communicator started. Process the request.
*
for (struct Queue *pos = n->queue_head; NULL != pos;
pos = pos->next_neighbour)
{
- /* Count the queue with the visibility task in all cases, as
- otherwise we may end up with no queues just because the
- time for the visibility task just expired but the scheduler
- just ran this task first */
if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
- (pos->validated_until.abs_value_us > now.abs_value_us) ||
- (NULL != pos->visibility_task))
+ (pos->validated_until.abs_value_us > now.abs_value_us))
candidates++;
}
if (0 == candidates)
{
- /* Given that we above check for pos->visibility task,
- this should be strictly impossible. */
- GNUNET_break (0);
+ /* This can happen rarely if the last confirmed queue timed
+ out just as we were beginning to process this message. */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# route selection failed (all no valid queue)",
+ 1,
+ GNUNET_NO);
return;
}
sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
for (struct Queue *pos = n->queue_head; NULL != pos;
pos = pos->next_neighbour)
{
- /* Count the queue with the visibility task in all cases, as
- otherwise we may end up with no queues just because the
- time for the visibility task just expired but the scheduler
- just ran this task first */
- if ((pos->validated_until.abs_value_us > now.abs_value_us) ||
- (NULL != pos->visibility_task))
+ if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
+ (pos->validated_until.abs_value_us > now.abs_value_us))
{
if ((sel1 == candidates) || (sel2 == candidates))
queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
struct GNUNET_MessageHeader *hdr,
enum RouteMessageOptions options)
{
+ struct VirtualLink *vl;
struct Neighbour *n;
struct DistanceVector *dv;
- n = lookup_neighbour (target);
- dv = (0 != (options & RMO_DV_ALLOWED))
- ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target)
- : NULL;
+ vl = GNUNET_CONTAINER_multipeermap_get (links, target);
+ n = vl->n;
+ dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
{
/* if confirmed is required, and we do not have anything
confirmed, drop respective options */
- if ((NULL != n) && (GNUNET_NO == n->core_visible))
- n = NULL;
- if ((NULL != dv) && (GNUNET_NO == dv->core_visible))
- dv = NULL;
+ if (NULL == n)
+ n = lookup_neighbour (target);
+ if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
}
if ((NULL == n) && (NULL == dv))
{
}
-/**
- * Context from #handle_incoming_msg(). Closure for many
- * message handlers below.
- */
-struct CommunicatorMessageContext
-{
- /**
- * Which communicator provided us with the message.
- */
- struct TransportClient *tc;
-
- /**
- * Additional information for flow control and about the sender.
- */
- struct GNUNET_TRANSPORT_IncomingMessage im;
-
- /**
- * Number of hops the message has travelled (if DV-routed).
- * FIXME: make use of this in ACK handling!
- */
- uint16_t total_hops;
-};
-
-
/**
* Given an inbound message @a msg from a communicator @a cmc,
* demultiplex it based on the type calling the right handler.
const struct GNUNET_MessageHeader *msg);
-/**
- * Send ACK to communicator (if requested) and free @a cmc.
- *
- * @param cmc context for which we are done handling the message
- */
-static void
-finish_cmc_handling (struct CommunicatorMessageContext *cmc)
-{
- if (0 != ntohl (cmc->im.fc_on))
- {
- /* send ACK when done to communicator for flow control! */
- struct GNUNET_MQ_Envelope *env;
- struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
-
- env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
- ack->reserved = htonl (0);
- ack->fc_id = cmc->im.fc_id;
- ack->sender = cmc->im.sender;
- GNUNET_MQ_send (cmc->tc->mq, env);
- }
- GNUNET_SERVICE_client_continue (cmc->tc->client);
- GNUNET_free (cmc);
-}
-
-
/**
* Communicator gave us an unencapsulated message to pass as-is to
* CORE. Process the request.
handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
{
struct CommunicatorMessageContext *cmc = cls;
+ struct VirtualLink *vl;
uint16_t size = ntohs (mh->size);
if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
GNUNET_SERVICE_client_drop (client);
return;
}
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
+ if (NULL == vl)
+ {
+ /* FIXME: sender is giving us messages for CORE but we don't have
+ the link up yet! I *suspect* this can happen right now (i.e.
+ sender has verified us, but we didn't verify sender), but if
+ we pass this on, CORE would be confused (link down, messages
+ arrive). We should investigate more if this happens often,
+ or in a persistent manner, and possibly do "something" about
+ it. Thus logging as error for now. */
+ GNUNET_break_op (0);
+ GNUNET_STATISTICS_update (GST_stats,
+ "# CORE messages droped (virtual link still down)",
+ 1,
+ GNUNET_NO);
+
+ finish_cmc_handling (cmc);
+ return;
+ }
/* Forward to all CORE clients */
for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
{
memcpy (&im[1], mh, size);
GNUNET_MQ_send (tc->mq, env);
}
- /* FIXME: consider doing this _only_ once the message
- was drained from the CORE MQs to extend flow control to CORE!
- (basically, increment counter in cmc, decrement on MQ send continuation!
- */
- finish_cmc_handling (cmc);
+ vl->core_recv_window--;
+ if (vl->core_recv_window > 0)
+ {
+ finish_cmc_handling (cmc);
+ return;
+ }
+ /* Wait with calling #finish_cmc_handling(cmc) until the message
+ was processed by CORE MQs (for CORE flow control)! */
+ GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc);
}
}
ack_counter = htonl (ra->ack_counter);
- // FIXME: track ACK losses based on ack_counter somewhere!
+ (void) ack_counter; /* silence compiler warning for now */
+ // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
// (DV and/or Neighbour?)
finish_cmc_handling (cmc);
}
GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
}
-/**
- * Task run to check whether the hops of the @a cls still
- * are validated, or if we need to core about disconnection.
- *
- * @param cls a `struct DistanceVector` (with core_visible set!)
- */
-static void
-check_dv_path_down (void *cls)
-{
- struct DistanceVector *dv = cls;
- struct Neighbour *n;
-
- dv->visibility_task = NULL;
- GNUNET_assert (GNUNET_YES == dv->core_visible);
- for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
- pos = pos->next_dv)
- {
- if (0 <
- GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us)
- {
- dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until,
- &check_dv_path_down,
- dv);
- return;
- }
- }
- /* all paths invalid, make dv core-invisible */
- dv->core_visible = GNUNET_NO;
- n = lookup_neighbour (&dv->target);
- if ((NULL != n) && (GNUNET_YES == n->core_visible))
- return; /* no need to tell core, connection still up! */
- cores_send_disconnect_info (&dv->target);
-}
-
/**
* The @a hop is a validated path to the respective target
activate_core_visible_dv_path (struct DistanceVectorHop *hop)
{
struct DistanceVector *dv = hop->dv;
- struct Neighbour *n;
-
- GNUNET_assert (GNUNET_NO == dv->core_visible);
- GNUNET_assert (NULL == dv->visibility_task);
+ struct VirtualLink *vl;
- dv->core_visible = GNUNET_YES;
- dv->visibility_task =
- GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv);
- n = lookup_neighbour (&dv->target);
- if ((NULL != n) && (GNUNET_YES == n->core_visible))
- return; /* no need to tell core, connection already up! */
- cores_send_connect_info (&dv->target,
- (NULL != n)
- ? GNUNET_BANDWIDTH_value_sum (n->quota_out,
- dv->quota_out)
- : dv->quota_out);
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
+ if (NULL != vl)
+ {
+ /* Link was already up, remember dv is also now available and we are done */
+ vl->dv = dv;
+ return;
+ }
+ vl = GNUNET_new (struct VirtualLink);
+ vl->target = dv->target;
+ vl->dv = dv;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ /* We lacked a confirmed connection to the target
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&dv->target);
}
GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
- if ((GNUNET_NO == dv->core_visible) &&
- (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until)
- .rel_value_us))
+ if (0 <
+ GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
activate_core_visible_dv_path (pos);
if (last_timeout.rel_value_us <
GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
next_hop->dv_head,
next_hop->dv_tail,
hop);
- if ((GNUNET_NO == dv->core_visible) &&
- (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us))
+ if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
activate_core_visible_dv_path (hop);
return GNUNET_YES;
}
n);
}
}
- // FIXME: asynchronously (!) verify hop-by-hop signatures!
- // => if signature verification load too high, implement random drop
- // strategy!?
+ /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
+ If signature verification load too high, implement random drop strategy */
+ for (unsigned int i = 0; i < nhops; i++)
+ {
+ struct DvHopPS dhp = {.purpose.purpose =
+ htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP),
+ .purpose.size = htonl (sizeof (dhp)),
+ .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
+ .succ = (nhops - 1 == i) ? GST_my_identity
+ : hops[i + 1].hop,
+ .challenge = dvl->challenge};
+
+ if (GNUNET_OK !=
+ GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DV_HOP,
+ &dhp.purpose,
+ &hops[i].hop_sig,
+ &hops[i].hop.public_key))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ }
do_fwd = GNUNET_YES;
if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
}
-/**
- * Task run periodically to check whether the validity of the given queue has
- * run its course. If so, finds either another queue to take over, or clears
- * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
- * chance to take over, and if that fails, notifies CORE about the disconnect.
- *
- * @param cls a `struct Queue`
- */
-static void
-core_queue_visibility_check (void *cls)
-{
- struct Queue *q = cls;
-
- q->visibility_task = NULL;
- if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
- {
- q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- return;
- }
- update_neighbour_core_visibility (q->neighbour);
-}
-
-
-/**
- * Check whether the CORE visibility of @a n should change. Finds either a
- * queue to preserve the visibility, or clears the neighbour's `core_visible`
- * flag. In the latter case, gives DV routes a chance to take over, and if
- * that fails, notifies CORE about the disconnect. If so, check whether we
- * need to notify CORE.
- *
- * @param n neighbour to perform the check for
- */
-static void
-update_neighbour_core_visibility (struct Neighbour *n)
-{
- struct DistanceVector *dv;
-
- GNUNET_assert (GNUNET_YES == n->core_visible);
- /* Check if _any_ queue of this neighbour is still valid, if so, schedule
- the #core_queue_visibility_check() task for that queue */
- for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
- {
- if (0 !=
- GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
- {
- /* found a valid queue, use this one */
- q->visibility_task =
- GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- return;
- }
- }
- n->core_visible = GNUNET_NO;
-
- /* Check if _any_ DV route to this neighbour is currently
- valid, if so, do NOT tell core about the loss of direct
- connectivity (DV still counts!) */
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
- if (GNUNET_YES == dv->core_visible)
- return;
- /* Nothing works anymore, need to tell CORE about the loss of
- connectivity! */
- cores_send_disconnect_info (&n->pid);
-}
-
-
/**
* Communicator gave us a transport address validation response. Process the
* request.
.vs = NULL};
struct GNUNET_TIME_Absolute origin_time;
struct Queue *q;
- struct DistanceVector *dv;
struct Neighbour *n;
+ struct VirtualLink *vl;
/* check this is one of our challenges */
(void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
q->validated_until = vs->validated_until;
q->pd.aged_rtt = vs->validation_rtt;
n = q->neighbour;
- if (GNUNET_NO != n->core_visible)
- return; /* nothing changed, we are done here */
- n->core_visible = GNUNET_YES;
- q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- /* Check if _any_ DV route to this neighbour is
- currently valid, if so, do NOT tell core anything! */
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
- if ((NULL != dv) && (GNUNET_YES == dv->core_visible))
- return; /* nothing changed, done */
- /* We lacked a confirmed connection to the neighbour
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+ if (NULL != vl)
+ {
+ /* Link was already up, remember n is also now available and we are done */
+ vl->n = n;
+ return;
+ }
+ vl = GNUNET_new (struct VirtualLink);
+ vl->target = n->pid;
+ vl->n = n;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ /* We lacked a confirmed connection to the target
before, so tell CORE about it (finally!) */
- cores_send_connect_info (&n->pid,
- (NULL != dv)
- ? GNUNET_BANDWIDTH_value_sum (dv->quota_out,
- n->quota_out)
- : n->quota_out);
+ cores_send_connect_info (&n->pid);
}
{
/* failed hard */
GNUNET_break (0);
- client_send_response (pm, GNUNET_NO, 0);
+ client_send_response (pm);
return NULL;
}
pa = prepare_pending_acknowledgement (queue, dvh, pm);
(GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
{
/* Full message sent, and over reliabile channel */
- client_send_response (pm, GNUNET_YES, pm->bytes_msg);
+ client_send_response (pm);
}
else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
queue->tc->details.communicator.cc) &&
/* Was this the last applicable fragmment? */
if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
- client_send_response (
- pm,
- GNUNET_YES,
- pm->bytes_msg /* FIXME: calculate and add overheads! */);
+ client_send_response (pm);
}
else if (PMT_CORE != pm->pmt)
{
if (NULL == neighbour)
{
neighbour = GNUNET_new (struct Neighbour);
- neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
neighbour->pid = aqm->receiver;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multipeermap_put (
NULL);
GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
pending_acks = NULL;
+ GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
GNUNET_CONTAINER_multipeermap_destroy (neighbours);
neighbours = NULL;
+ GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links));
+ GNUNET_CONTAINER_multipeermap_destroy (links);
+ links = NULL;
GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
&free_backtalker_cb,
NULL);
pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
+ links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
ephemeral_heap =
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
struct OutboundMessage,
NULL),
+ GNUNET_MQ_hd_fixed_size (client_recv_ok,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK,
+ struct RecvOkMessage,
+ NULL),
/* communication with communicators */
GNUNET_MQ_hd_var_size (communicator_available,
GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,