* communicators do not offer flow control).
* We do transmit FC window sizes now. Left:
* for SENDING)
- * - Throttle sending if "outbound_fc_window_size_used" reaches limit
- * - Send *new* challenge when we get close to the limit (including
- * at the beginning when the limit is zero!)
- * - Retransmit challenge if it goes unanswered!
+ * - need to call consider_sending_fc() periodically if it goes unanswered!
*
* for DV)
* - send challenges via DV (when DVH is confirmed *and* we care about
* and high-latency links *if* we have the RAM [GOODPUT / utilization / stalls]
* - Set last_window_consum_limit promise properly based on
* latency and bandwidth of the respective connection [GOODPUT / utilization / stalls]
- * - re-sending challenge response without a challenge when we have
- * significantly increased the FC window (upon CORE being done with messages)
- * so as to avoid the sender having to give us a fresh challenge [BANDWIDTH]
- * Also can re-use signature in this case [CPU]. Marked with "TODO-M1"
*
* Design realizations / discussion:
* - communicators do flow control by calling MQ "notify sent"
*/
struct GNUNET_TIME_Absolute n_challenge_time;
+ /**
+ * When did we last send a
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message?
+ * Used to determine whether it is time to re-transmit the message.
+ */
+ struct GNUNET_TIME_Absolute last_fc_transmission;
+
/**
* Sender timestamp of the last
* #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message we have
/**
* Our current flow control window size in bytes. We
- * are allowed to transmit this many bytes to @a n as per
- * our @e my_challenge "account".
+ * are allowed to transmit this many bytes to @a n.
*/
uint64_t outbound_fc_window_size;
}
-/**
- * There is a message at the head of the pending messages for @a vl
- * which may be ready for transmission. Check if a queue is ready to
- * take it.
- *
- * This function must (1) check for flow control to ensure that we can
- * right now send to @a vl, (2) check that the pending message in the
- * queue is actually eligible, (3) determine if any applicable queue
- * (direct neighbour or DVH path) is ready to accept messages, and
- * (4) prioritize based on the preferences associated with the
- * pending message.
- *
- * So yeah, easy.
- *
- * @param vl virtual link where we should check for transmission
- */
-static void
-check_vl_transmission (struct VirtualLink *vl)
-{
- struct Neighbour *n = vl->n;
- struct DistanceVector *dv = vl->dv;
- struct GNUNET_TIME_Absolute now;
- int elig;
-
- /* FIXME-FC: need to implement virtual link flow control! */
-
- /* Check that we have an eligible pending message!
- (cheaper than having #transmit_on_queue() find out!) */
- elig = GNUNET_NO;
- for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
- pm = pm->next_vl)
- {
- if (NULL != pm->qe)
- continue; /* not eligible, is in a queue! */
- elig = GNUNET_YES;
- break;
- }
- if (GNUNET_NO == elig)
- return;
-
- /* Notify queues at direct neighbours that we are interested */
- now = GNUNET_TIME_absolute_get ();
- if (NULL != n)
- {
- for (struct Queue *queue = n->queue_head; NULL != queue;
- queue = queue->next_neighbour)
- if ((GNUNET_YES == queue->idle) &&
- (queue->validated_until.abs_value_us > now.abs_value_us))
- schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
- }
- /* Notify queues via DV that we are interested */
- if (NULL != dv)
- {
- /* Do DV with lower scheduler priority, which effectively means that
- IF a neighbour exists and is available, we prefer it. */
- for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
- pos = pos->next_dv)
- {
- struct Neighbour *nh = pos->next_hop;
-
- if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
- continue; /* skip this one: path not validated */
- for (struct Queue *queue = nh->queue_head; NULL != queue;
- queue = queue->next_neighbour)
- if ((GNUNET_YES == queue->idle) &&
- (queue->validated_until.abs_value_us > now.abs_value_us))
- schedule_transmit_on_queue (queue,
- GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
- }
- }
-}
-
-
-/**
- * Client asked for transmission to a peer. Process the request.
- *
- * @param cls the client
- * @param obm the send message that was sent
- */
-static void
-handle_client_send (void *cls, const struct OutboundMessage *obm)
-{
- struct TransportClient *tc = cls;
- struct PendingMessage *pm;
- const struct GNUNET_MessageHeader *obmm;
- uint32_t bytes_msg;
- struct VirtualLink *vl;
- enum GNUNET_MQ_PriorityPreferences pp;
-
- GNUNET_assert (CT_CORE == tc->type);
- obmm = (const struct GNUNET_MessageHeader *) &obm[1];
- bytes_msg = ntohs (obmm->size);
- pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
- vl = lookup_virtual_link (&obm->peer);
- if (NULL == vl)
- {
- /* Failure: don't have this peer as a neighbour (anymore).
- Might have gone down asynchronously, so this is NOT
- a protocol violation by CORE. Still count the event,
- as this should be rare. */
- GNUNET_SERVICE_client_continue (tc->client);
- GNUNET_STATISTICS_update (GST_stats,
- "# messages dropped (neighbour unknown)",
- 1,
- GNUNET_NO);
- return;
- }
-
- pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
- pm->logging_uuid = logging_uuid_gen++;
- pm->prefs = pp;
- pm->client = tc;
- pm->vl = vl;
- pm->bytes_msg = bytes_msg;
- memcpy (&pm[1], obmm, bytes_msg);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending %u bytes as <%llu> to %s\n",
- bytes_msg,
- pm->logging_uuid,
- GNUNET_i2s (&obm->peer));
- GNUNET_CONTAINER_MDLL_insert (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
- GNUNET_CONTAINER_MDLL_insert (vl,
- vl->pending_msg_head,
- vl->pending_msg_tail,
- pm);
- check_vl_transmission (vl);
-}
-
-
/**
* Communicator started. Test message is well-formed.
*
}
+/**
+ * Something changed on the virtual link with respect to flow
+ * control. Consider retransmitting the FC window size.
+ *
+ * @param vl virtual link to work with
+ */
+static void
+consider_sending_fc (struct VirtualLink *vl)
+{
+ struct GNUNET_TIME_Absolute monotime;
+ struct TransportFlowControlMessage fc;
+ struct GNUNET_TIME_Relative duration;
+
+ duration = GNUNET_TIME_absolute_get_duration (vl->last_fc_transmission);
+ /* FIXME: decide sane criteria on when to do this, instead of doing
+ it always! */
+ /* For example, we should probably ONLY do this if a bit more than
+ an RTT has passed, or if the window changed "significantly" since
+ then. */
+ (void) duration;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending FC seq %u to %s with new window %llu\n",
+ (unsigned int) vl->fc_seq_gen,
+ GNUNET_i2s (&vl->target),
+ (unsigned long long) vl->incoming_fc_window_size);
+ monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
+ vl->last_fc_transmission = monotime;
+ fc.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL);
+ fc.header.size = htons (sizeof (fc));
+ fc.seq = htonl (vl->fc_seq_gen++);
+ fc.inbound_window_size = GNUNET_htonll (vl->incoming_fc_window_size);
+ fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used);
+ fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size);
+ fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
+ route_control_message_without_fc (&vl->target, &fc.header, RMO_NONE);
+}
+
+
+/**
+ * There is a message at the head of the pending messages for @a vl
+ * which may be ready for transmission. Check if a queue is ready to
+ * take it.
+ *
+ * This function must (1) check for flow control to ensure that we can
+ * right now send to @a vl, (2) check that the pending message in the
+ * queue is actually eligible, (3) determine if any applicable queue
+ * (direct neighbour or DVH path) is ready to accept messages, and
+ * (4) prioritize based on the preferences associated with the
+ * pending message.
+ *
+ * So yeah, easy.
+ *
+ * @param vl virtual link where we should check for transmission
+ */
+static void
+check_vl_transmission (struct VirtualLink *vl)
+{
+ struct Neighbour *n = vl->n;
+ struct DistanceVector *dv = vl->dv;
+ struct GNUNET_TIME_Absolute now;
+ int elig;
+
+ /* Check that we have an eligible pending message!
+ (cheaper than having #transmit_on_queue() find out!) */
+ elig = GNUNET_NO;
+ for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
+ pm = pm->next_vl)
+ {
+ if (NULL != pm->qe)
+ continue; /* not eligible, is in a queue! */
+ if (pm->bytes_msg + vl->outbound_fc_window_size_used >
+ vl->outbound_fc_window_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Stalled transmision on VL %s due to flow control: %llu < %llu\n",
+ GNUNET_i2s (&vl->target),
+ (unsigned long long) vl->outbound_fc_window_size,
+ (unsigned long long) (pm->bytes_msg +
+ vl->outbound_fc_window_size_used));
+ consider_sending_fc (vl);
+ return; /* We have a message, but flow control says "nope" */
+ }
+ elig = GNUNET_YES;
+ break;
+ }
+ if (GNUNET_NO == elig)
+ return;
+
+ /* Notify queues at direct neighbours that we are interested */
+ now = GNUNET_TIME_absolute_get ();
+ if (NULL != n)
+ {
+ for (struct Queue *queue = n->queue_head; NULL != queue;
+ queue = queue->next_neighbour)
+ if ((GNUNET_YES == queue->idle) &&
+ (queue->validated_until.abs_value_us > now.abs_value_us))
+ schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+ }
+ /* Notify queues via DV that we are interested */
+ if (NULL != dv)
+ {
+ /* Do DV with lower scheduler priority, which effectively means that
+ IF a neighbour exists and is available, we prefer it. */
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ {
+ struct Neighbour *nh = pos->next_hop;
+
+ if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+ continue; /* skip this one: path not validated */
+ for (struct Queue *queue = nh->queue_head; NULL != queue;
+ queue = queue->next_neighbour)
+ if ((GNUNET_YES == queue->idle) &&
+ (queue->validated_until.abs_value_us > now.abs_value_us))
+ schedule_transmit_on_queue (queue,
+ GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+ }
+ }
+}
+
+
+/**
+ * Client asked for transmission to a peer. Process the request.
+ *
+ * @param cls the client
+ * @param obm the send message that was sent
+ */
+static void
+handle_client_send (void *cls, const struct OutboundMessage *obm)
+{
+ struct TransportClient *tc = cls;
+ struct PendingMessage *pm;
+ const struct GNUNET_MessageHeader *obmm;
+ uint32_t bytes_msg;
+ struct VirtualLink *vl;
+ enum GNUNET_MQ_PriorityPreferences pp;
+
+ GNUNET_assert (CT_CORE == tc->type);
+ obmm = (const struct GNUNET_MessageHeader *) &obm[1];
+ bytes_msg = ntohs (obmm->size);
+ pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
+ vl = lookup_virtual_link (&obm->peer);
+ if (NULL == vl)
+ {
+ /* Failure: don't have this peer as a neighbour (anymore).
+ Might have gone down asynchronously, so this is NOT
+ a protocol violation by CORE. Still count the event,
+ as this should be rare. */
+ GNUNET_SERVICE_client_continue (tc->client);
+ GNUNET_STATISTICS_update (GST_stats,
+ "# messages dropped (neighbour unknown)",
+ 1,
+ GNUNET_NO);
+ return;
+ }
+
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
+ pm->logging_uuid = logging_uuid_gen++;
+ pm->prefs = pp;
+ pm->client = tc;
+ pm->vl = vl;
+ pm->bytes_msg = bytes_msg;
+ memcpy (&pm[1], obmm, bytes_msg);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending %u bytes as <%llu> to %s\n",
+ bytes_msg,
+ pm->logging_uuid,
+ GNUNET_i2s (&obm->peer));
+ GNUNET_CONTAINER_MDLL_insert (client,
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
+ GNUNET_CONTAINER_MDLL_insert (vl,
+ vl->pending_msg_head,
+ vl->pending_msg_tail,
+ pm);
+ check_vl_transmission (vl);
+}
+
+
/**
* Communicator requests backchannel transmission. Process the request.
* Just repacks it into our `struct TransportBackchannelEncapsulationMessage *`
GNUNET_assert (vl->incoming_fc_window_size_ram >= ctx->size);
vl->incoming_fc_window_size_ram -= ctx->size;
vl->incoming_fc_window_size_used += ctx->isize;
- /* TODO-M1 */
+ consider_sending_fc (vl);
GNUNET_free (ctx);
}
&vl->target,
vl,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ consider_sending_fc (vl);
/* We lacked a confirmed connection to the target
before, so tell CORE about it (finally!) */
cores_send_connect_info (&dv->target);
&vl->target,
vl,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ consider_sending_fc (vl);
/* We lacked a confirmed connection to the target
before, so tell CORE about it (finally!) */
cores_send_connect_info (&n->pid);
}
+/**
+ * Communicator gave us a transport address validation response. Process the
+ * request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call
+ * #finish_cmc_handling() when done)
+ * @param fc the message that was received
+ */
+static void
+handle_flow_control (void *cls, const struct TransportFlowControlMessage *fc)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+ struct VirtualLink *vl;
+ uint32_t seq;
+ struct GNUNET_TIME_Absolute st;
+ uint64_t os;
+ uint64_t wnd;
+
+ vl = lookup_virtual_link (&cmc->im.sender);
+ if (NULL == vl)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# FC dropped: virtual link unknown",
+ 1,
+ GNUNET_NO);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ st = GNUNET_TIME_absolute_ntoh (fc->sender_time);
+ if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us)
+ {
+ /* out of order, drop */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# FC dropped: message out of order",
+ 1,
+ GNUNET_NO);
+ finish_cmc_handling (cmc);
+ return;
+ }
+ seq = ntohl (fc->seq);
+ if (seq < vl->last_fc_seq)
+ {
+ /* Wrap-around/reset of other peer; start all counters from zero */
+ vl->outbound_fc_window_size_used = 0;
+ }
+ vl->last_fc_seq = seq;
+ vl->last_fc_timestamp = st;
+ vl->outbound_fc_window_size = GNUNET_ntohll (fc->inbound_window_size);
+ os = GNUNET_ntohll (fc->outbound_sent);
+ vl->incoming_fc_window_size_loss =
+ (int64_t) (os - vl->incoming_fc_window_size_used);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received FC from %s, seq %u, new window %llu (loss at %lld)\n",
+ GNUNET_i2s (&vl->target),
+ (unsigned int) seq,
+ (unsigned long long) vl->outbound_fc_window_size,
+ (long long) vl->incoming_fc_window_size_loss);
+ wnd = GNUNET_ntohll (fc->outbound_window_size);
+ if (wnd < vl->incoming_fc_window_size)
+ {
+ /* Consider re-sending our FC message, as clearly the
+ other peer's idea of the window is not up-to-date */
+ consider_sending_fc (vl);
+ }
+ /* FC window likely increased, check transmission possibilities! */
+ check_vl_transmission (vl);
+ finish_cmc_handling (cmc);
+}
+
+
/**
* Given an inbound message @a msg from a communicator @a cmc,
* demultiplex it based on the type calling the right handler.
GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_CHALLENGE,
struct TransportValidationChallengeMessage,
&cmc),
+ GNUNET_MQ_hd_fixed_size (flow_control,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL,
+ struct TransportFlowControlMessage,
+ &cmc),
GNUNET_MQ_hd_fixed_size (
validation_response,
GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_VALIDATION_RESPONSE,