* Head for DLL of time slots which are free to be allocated to operations
*/
struct TimeSlot *alloc_head;
-
+
/**
* Tail for DLL of time slots which are free to be allocated to operations
*/
* Number of time slots filled so far
*/
unsigned int tslots_filled;
-
+
/**
* Bound on the maximum number of operations which can be active
*/
unsigned int max_active_bound;
+ /**
+ * Number of operations that have failed
+ */
+ unsigned int nfailed;
+
};
* next ptr for DLL
*/
struct ReadyQueueEntry *next;
-
+
/**
* prev ptr for DLL
*/
{
struct OperationQueue *opq;
struct QueueEntry *entry;
-
+
opq = op->queues[index];
entry = op->qentries[index];
switch (op->state)
struct OperationQueue *opq;
unsigned int cnt;
unsigned int s;
-
+
GNUNET_assert (OP_STATE_INIT != state);
GNUNET_assert (NULL != op->queues);
GNUNET_assert (NULL != op->nres);
entry->op = op;
entry->nres = op->nres[cnt];
s = cnt;
- GNUNET_array_append (op->qentries, s, entry);
+ GNUNET_array_append (op->qentries, s, entry);
}
else
{
*/
static void
rq_remove (struct GNUNET_TESTBED_Operation *op)
-{
+{
GNUNET_assert (NULL != op->rq_entry);
GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
GNUNET_free (op->rq_entry);
}
ret:
- GNUNET_free_non_null (evict_entries);
+ GNUNET_free_non_null (evict_entries);
if (NULL != ops_)
*ops_ = ops;
else
unsigned int i;
unsigned int j;
unsigned int n_cur;
-
+
GNUNET_assert (NULL != old);
n_cur = *n_old;
cur = *old;
for (i = 0; i < n_new; i++)
- {
+ {
for (j = 0; j < *n_old; j++)
{
if (new[i] == cur[j])
*old = cur;
*n_old = n_cur;
}
-
+
/**
if (NULL == ops)
continue;
merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
- GNUNET_free (ops);
+ GNUNET_free (ops);
}
if (NULL != evict_ops)
{
for (i = 0; i < n_evict_ops; i++)
- GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+ GNUNET_TESTBED_operation_release_ (evict_ops[i]);
GNUNET_free (evict_ops);
evict_ops = NULL;
/* Evicting the operations should schedule this operation */
for (i = 0; i < op->nqueues; i++)
{
GNUNET_assert (op->queues[i]->active >= op->nres[i]);
- op->queues[i]->active -= op->nres[i];
+ op->queues[i]->active -= op->nres[i];
}
change_state (op, OP_STATE_WAITING);
}
/**
- * Initializes the operation queue for parallel overlay connects
+ * Cleansup the existing timing slots and sets new timing slots in the given
+ * queue to accommodate given number of max active operations.
*
- * @param h the host handle
- * @param npoc the number of parallel overlay connects - the queue size
+ * @param queue the queue
+ * @param n the number of maximum active operations. If n is greater than the
+ * maximum limit set while creating the queue, then the minimum of these two
+ * will be selected as n
*/
static void
adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
struct FeedbackCtx *fctx = queue->fctx;
struct TimeSlot *tslot;
unsigned int cnt;
-
+
cleanup_tslots (queue);
n = GNUNET_MIN (n ,fctx->max_active_bound);
fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
+ fctx->nfailed = 0;
for (cnt = 0; cnt < n; cnt++)
{
tslot = &fctx->tslots_freeptr[cnt];
* the feedback context.
*
* @param queue the queue
- * @param fail GNUNET_YES if the last operation failed; GNUNET_NO if not;
*/
static void
-adapt_parallelism (struct OperationQueue *queue, int fail)
+adapt_parallelism (struct OperationQueue *queue)
{
struct GNUNET_TIME_Relative avg;
struct FeedbackCtx *fctx;
nvals += tslot->nvals;
}
GNUNET_assert (nvals >= queue->max_active);
+ GNUNET_assert (fctx->nfailed <= nvals);
+ nvals -= fctx->nfailed;
+ if (0 == nvals)
+ {
+ if (1 == queue->max_active)
+ adaptive_queue_set_max_active (queue, 1);
+ else
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+ return;
+ }
avg = GNUNET_TIME_relative_divide (avg, nvals);
- sd = GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, (unsigned int)
- avg.rel_value_us);
- if ( (sd <= 5) ||
- (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- queue->max_active)) )
- GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
- if (GNUNET_SYSERR == sd)
+ if (GNUNET_SYSERR ==
+ GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
+ (unsigned int) avg.rel_value_us,
+ &sd))
{
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
return;
}
+ if (sd < 0)
+ sd = 0;
GNUNET_assert (0 <= sd);
- if ((0 == sd) && (! fail))
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ if (0 == sd)
{
adaptive_queue_set_max_active (queue, queue->max_active * 2);
return;
}
- if ((1 == sd) && (! fail))
+ if (1 == sd)
{
adaptive_queue_set_max_active (queue, queue->max_active + 1);
return;
adaptive_queue_set_max_active (queue, 1);
return;
}
- if (((sd < 2) && (fail)) || (2 == sd))
+ if (2 == sd)
{
adaptive_queue_set_max_active (queue, queue->max_active - 1);
return;
struct GNUNET_TIME_Relative t;
struct TimeSlot *tslot;
struct FeedbackCtx *fctx;
-
+
t = GNUNET_TIME_absolute_get_duration (op->tstart);
while (NULL != (tslot = op->tslots_head)) /* update time slots */
{
queue = tslot->queue;
fctx = queue->fctx;
- tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
- tslot->op = NULL;
+ tslot->op = NULL;
GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
- tslot);
+ tslot);
+ if (op->failed)
+ fctx->nfailed++;
+ tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
if (0 != tslot->nvals++)
continue;
fctx->tslots_filled++;
if (queue->max_active == fctx->tslots_filled)
- adapt_parallelism (queue, op->failed);
+ adapt_parallelism (queue);
}
}
fctx->max_active_bound = max_active;
fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
queue->fctx = fctx;
- adaptive_queue_set_max_active (queue, 1); /* start with 1 */
+ adaptive_queue_set_max_active (queue, 4); /* start with 4 */
}
return queue;
}
GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
{
struct FeedbackCtx *fctx;
-
+
GNUNET_break (GNUNET_YES == is_queue_empty (queue));
if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
{
void
GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
{
- struct QueueEntry *entry;
+ struct QueueEntry *entry;
struct OperationQueue *opq;
unsigned int i;
if (OP_STATE_ACTIVE == op->state)
update_tslots (op);
GNUNET_assert (NULL != op->queues);
- GNUNET_assert (NULL != op->qentries);
+ GNUNET_assert (NULL != op->qentries);
for (i = 0; i < op->nqueues; i++)
{
entry = op->qentries[i];
remove_queue_entry (op, i);
opq = op->queues[i];
switch (op->state)
- {
+ {
case OP_STATE_INIT:
case OP_STATE_INACTIVE:
GNUNET_assert (0);
break;
- case OP_STATE_WAITING:
+ case OP_STATE_WAITING:
break;
case OP_STATE_ACTIVE:
case OP_STATE_READY:
opq->active -= entry->nres;
recheck_waiting (opq);
break;
- }
+ }
GNUNET_free (entry);
}
GNUNET_free_non_null (op->qentries);