*/
struct Queue;
+/**
+ * Message awaiting transmission. See detailed comments below.
+ */
+struct PendingMessage;
/**
* Entry identifying transmission in one of our `struct
*/
struct Queue *queue;
+ /**
+ * Pending message this entry is for, or NULL for none.
+ */
+ struct PendingMessage *pm;
+
/**
* Message ID used for this message with the queue used for transmission.
*/
*/
struct Neighbour *target;
+ /**
+ * Set to non-NULL value if this message is currently being given to a
+ * communicator and we are awaiting that communicator's acknowledgement.
+ * Note that we must not retransmit a pending message while we're still
+ * in the process of giving it to a communicator. If a pending message
+ * is free'd while this entry is non-NULL, the @e qe reference to us
+ * should simply be set to NULL.
+ */
+ struct QueueEntry *qe;
+
/**
* Client that issued the transmission request, if @e pmt is #PMT_CORE.
*/
* be called if the message queue is non-empty!
*
* @param queue the queue to do scheduling for
+ * @param inside_job set to #GNUNET_YES if called from
+ * #transmit_on_queue() itself and NOT setting
+ * the task means running immediately
*/
static void
-schedule_transmit_on_queue (struct Queue *queue)
+schedule_transmit_on_queue (struct Queue *queue, int inside_job)
{
struct Neighbour *n = queue->neighbour;
struct PendingMessage *pm = n->pending_msg_head;
out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (
pm->next_attempt),
out_delay);
- if (0 == out_delay.rel_value_us)
+ if ((GNUNET_YES == inside_job) && (0 == out_delay.rel_value_us))
return; /* we should run immediately! */
/* queue has changed since we were scheduled, reschedule again */
queue->transmit_task =
GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
queue->queue_length--;
tc->details.communicator.total_queue_length--;
+ if (NULL != qe->pm)
+ {
+ GNUNET_assert (qe == qe->pm->qe);
+ qe->pm->qe = NULL;
+ }
GNUNET_free (qe);
}
GNUNET_assert (0 == queue->queue_length);
GNUNET_NO);
for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
s = s->next_client)
- schedule_transmit_on_queue (s);
+ schedule_transmit_on_queue (s, GNUNET_NO);
}
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
target->pending_msg_tail,
pm);
free_fragment_tree (pm);
+ if (NULL != pm->qe)
+ {
+ GNUNET_assert (pm == pm->qe->pm);
+ pm->qe->pm = NULL;
+ }
GNUNET_free_non_null (pm->bpm);
GNUNET_free (pm);
}
qe = GNUNET_new (struct QueueEntry);
qe->mid = queue->mid_gen++;
qe->queue = queue;
- // qe->pm = pm; // FIXME: not so easy, reference management on 'free(s)'!
- // (also, note that pm may be NULL!)
+ if (NULL != pm)
+ {
+ qe->pm = pm;
+ GNUNET_assert (NULL == pm->qe);
+ pm->qe = qe;
+ }
GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
queue->queue_length++;
}
+/**
+ * Change the value of the `next_attempt` field of @a pm
+ * to @a next_attempt and re-order @a pm in the transmission
+ * list as required by the new timestmap.
+ *
+ * @param pm a pending message to update
+ * @param next_attempt timestamp to use
+ */
+static void
+update_pm_next_attempt (struct PendingMessage *pm,
+ struct GNUNET_TIME_Absolute next_attempt)
+{
+ struct Neighbour *neighbour = pm->target;
+
+ pm->next_attempt = next_attempt;
+ if (NULL == pm->frag_parent)
+ {
+ struct PendingMessage *pos;
+
+ /* re-insert sort in neighbour list */
+ GNUNET_CONTAINER_MDLL_remove (neighbour,
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pm);
+ pos = neighbour->pending_msg_tail;
+ while ((NULL != pos) &&
+ (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
+ pos = pos->prev_neighbour;
+ GNUNET_CONTAINER_MDLL_insert_after (neighbour,
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pos,
+ pm);
+ }
+ else
+ {
+ /* re-insert sort in fragment list */
+ struct PendingMessage *fp = pm->frag_parent;
+ struct PendingMessage *pos;
+
+ GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, pm);
+ pos = fp->tail_frag;
+ while ((NULL != pos) &&
+ (next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
+ pos = pos->prev_frag;
+ GNUNET_CONTAINER_MDLL_insert_after (frag,
+ fp->head_frag,
+ fp->tail_frag,
+ pos,
+ pm);
+ }
+}
+
+
/**
* We believe we are ready to transmit a message on a queue. Double-checks
* with the queue's "tracker_out" and then gives the message to the
/* no message pending, nothing to do here! */
return;
}
- schedule_transmit_on_queue (queue);
+ if (NULL != pm->qe)
+ {
+ /* message still pending with communciator!
+ LOGGING-FIXME: Use stats? logging? Should this not be rare? */
+ return;
+ }
+ schedule_transmit_on_queue (queue, GNUNET_YES);
if (NULL != queue->transmit_task)
return; /* do it later */
overhead = 0;
if (NULL == s)
{
/* Fragmentation failed, try next message... */
- schedule_transmit_on_queue (queue);
+ schedule_transmit_on_queue (queue, GNUNET_NO);
return;
}
if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
if (NULL == s)
{
/* Reliability boxing failed, try next message... */
- schedule_transmit_on_queue (queue);
+ schedule_transmit_on_queue (queue, GNUNET_NO);
return;
}
}
else
{
- /* message not finished, waiting for acknowledgement */
- struct Neighbour *neighbour = pm->target;
- /* Update time by which we might retransmit 's' based on queue
+ /* Message not finished, waiting for acknowledgement.
+ Update time by which we might retransmit 's' based on queue
characteristics (i.e. RTT); it takes one RTT for the message to
arrive and the ACK to come back in the best case; but the other
side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
retransmitting. Note that in the future this heuristic should
likely be improved further (measure RTT stability, consider
message urgency and size when delaying ACKs, etc.) */
- s->next_attempt = GNUNET_TIME_relative_to_absolute (
- GNUNET_TIME_relative_multiply (queue->rtt, 4));
- if (s == pm)
- {
- struct PendingMessage *pos;
-
- /* re-insert sort in neighbour list */
- GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->pending_msg_head,
- neighbour->pending_msg_tail,
- pm);
- pos = neighbour->pending_msg_tail;
- while ((NULL != pos) &&
- (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
- pos = pos->prev_neighbour;
- GNUNET_CONTAINER_MDLL_insert_after (neighbour,
- neighbour->pending_msg_head,
- neighbour->pending_msg_tail,
- pos,
- pm);
- }
- else
- {
- /* re-insert sort in fragment list */
- struct PendingMessage *fp = s->frag_parent;
- struct PendingMessage *pos;
-
- GNUNET_CONTAINER_MDLL_remove (frag, fp->head_frag, fp->tail_frag, s);
- pos = fp->tail_frag;
- while ((NULL != pos) &&
- (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us))
- pos = pos->prev_frag;
- GNUNET_CONTAINER_MDLL_insert_after (frag,
- fp->head_frag,
- fp->tail_frag,
- pos,
- s);
- }
+ update_pm_next_attempt (s,
+ GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_relative_multiply (queue->rtt, 4)));
}
/* finally, re-schedule queue transmission task itself */
- schedule_transmit_on_queue (queue);
+ schedule_transmit_on_queue (queue, GNUNET_NO);
}
}
GNUNET_SCHEDULER_cancel (queue->transmit_task);
queue->transmit_task = NULL;
- schedule_transmit_on_queue (queue);
+ schedule_transmit_on_queue (queue, GNUNET_NO);
}
{
struct TransportClient *tc = cls;
struct QueueEntry *qe;
+ struct PendingMessage *pm;
if (CT_COMMUNICATOR != tc->type)
{
if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 ==
tc->details.communicator.total_queue_length)
{
- /* Communicator dropped below threshold, resume all queues */
+ /* Communicator dropped below threshold, resume all queues
+ incident with this client! */
GNUNET_STATISTICS_update (
GST_stats,
"# Transmission throttled due to communicator queue limit",
for (struct Queue *queue = tc->details.communicator.queue_head;
NULL != queue;
queue = queue->next_client)
- schedule_transmit_on_queue (queue);
+ schedule_transmit_on_queue (queue, GNUNET_NO);
}
else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
{
"# Transmission throttled due to queue queue limit",
-1,
GNUNET_NO);
- schedule_transmit_on_queue (qe->queue);
+ schedule_transmit_on_queue (qe->queue, GNUNET_NO);
}
- /* TODO: we also should react on the status! */
- // FIXME: this probably requires queue->pm = s assignment!
- // FIXME: react to communicator status about transmission request. We got:
- sma->status; // OK success, SYSERR failure
+ if (NULL != (pm = qe->pm))
+ {
+ struct Neighbour *n;
+ GNUNET_assert (qe == pm->qe);
+ pm->qe = NULL;
+ /* If waiting for this communicator may have blocked transmission
+ of pm on other queues for this neighbour, force schedule
+ transmit on queue for queues of the neighbour */
+ n = pm->target;
+ if (n->pending_msg_head == pm)
+ {
+ for (struct Queue *queue = n->queue_head; NULL != queue;
+ queue = queue->next_neighbour)
+ schedule_transmit_on_queue (queue, GNUNET_NO);
+ }
+ if (GNUNET_OK != ntohl (sma->status))
+ {
+ GNUNET_log (
+ GNUNET_ERROR_TYPE_INFO,
+ "Queue failed in transmission, will try retransmission immediately\n");
+ update_pm_next_attempt (pm, GNUNET_TIME_UNIT_ZERO_ABS);
+ }
+ }
GNUNET_free (qe);
}