#include "platform.h"
#include "testbed_api_operations.h"
+#include "testbed_api_sd.h"
/**
};
+/**
+ * Queue of operations where we can only support a certain
+ * number of concurrent operations of a particular type.
+ */
+struct OperationQueue;
+
+
+/**
+ * A slot to record time taken by an operation
+ */
+struct TimeSlot
+{
+ /**
+ * DLL next pointer
+ */
+ struct TimeSlot *next;
+
+ /**
+ * DLL prev pointer
+ */
+ struct TimeSlot *prev;
+
+ /**
+ * This operation queue to which this time slot belongs to
+ */
+ struct OperationQueue *queue;
+
+ /**
+ * The operation to which this timeslot is currently allocated to
+ */
+ struct GNUNET_TESTBED_Operation *op;
+
+ /**
+ * Accumulated time
+ */
+ struct GNUNET_TIME_Relative tsum;
+
+ /**
+ * Number of timing values accumulated
+ */
+ unsigned int nvals;
+};
+
+
+/**
+ * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
+ */
+struct FeedbackCtx
+{
+ /**
+ * Handle for calculating standard deviation
+ */
+ struct SDHandle *sd;
+
+ /**
+ * Head for DLL of time slots which are free to be allocated to operations
+ */
+ struct TimeSlot *alloc_head;
+
+ /**
+ * Tail for DLL of time slots which are free to be allocated to operations
+ */
+ struct TimeSlot *alloc_tail;
+
+ /**
+ * Pointer to the chunk of time slots. Free all time slots at a time using
+ * this pointer.
+ */
+ struct TimeSlot *tslots_freeptr;
+
+ /**
+ * Number of time slots filled so far
+ */
+ unsigned int tslots_filled;
+
+ /**
+ * Bound on the maximum number of operations which can be active
+ */
+ unsigned int max_active_bound;
+
+ /**
+ * Number of operations that have failed
+ */
+ unsigned int nfailed;
+
+};
+
+
/**
* Queue of operations where we can only support a certain
* number of concurrent operations of a particular type.
*/
struct QueueEntry *nq_tail;
+ /**
+ * Feedback context; only relevant for adaptive operation queues. NULL for
+ * fixed operation queues
+ */
+ struct FeedbackCtx *fctx;
+
+ /**
+ * The type of this opeartion queue
+ */
+ enum OperationQueueType type;
+
/**
* Number of operations that are currently active in this queue.
*/
unsigned int active;
/**
- * Max number of operations which can be active at any time in this queue
+ * Max number of operations which can be active at any time in this queue.
+ * This value can be changed either by calling
+ * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
+ * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
*/
unsigned int max_active;
* next ptr for DLL
*/
struct ReadyQueueEntry *next;
-
+
/**
* prev ptr for DLL
*/
*/
struct ReadyQueueEntry *rq_entry;
+ /**
+ * Head pointer for DLL of tslots allocated to this operation
+ */
+ struct TimeSlot *tslots_head;
+
+ /**
+ * Tail pointer for DLL of tslots allocated to this operation
+ */
+ struct TimeSlot *tslots_tail;
+
+ /**
+ * The time at which the operation is started
+ */
+ struct GNUNET_TIME_Absolute tstart;
+
/**
* Number of queues in the operation queues array
*/
*/
enum OperationState state;
+ /**
+ * Is this a failed operation?
+ */
+ int failed;
+
};
/**
GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
+/**
+ * Assigns the given operation a time slot from the given operation queue
+ *
+ * @param op the operation
+ * @param queue the operation queue
+ * @return the timeslot
+ */
+static void
+assign_timeslot (struct GNUNET_TESTBED_Operation *op,
+ struct OperationQueue *queue)
+{
+ struct FeedbackCtx *fctx = queue->fctx;
+ struct TimeSlot *tslot;
+
+ GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
+ tslot = fctx->alloc_head;
+ GNUNET_assert (NULL != tslot);
+ GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
+ GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
+ tslot->op = op;
+}
+
+
/**
* Removes a queue entry of an operation from one of the operation queues' lists
* depending on the state of the operation
{
struct OperationQueue *opq;
struct QueueEntry *entry;
-
+
opq = op->queues[index];
entry = op->qentries[index];
switch (op->state)
struct OperationQueue *opq;
unsigned int cnt;
unsigned int s;
-
+
GNUNET_assert (OP_STATE_INIT != state);
GNUNET_assert (NULL != op->queues);
GNUNET_assert (NULL != op->nres);
entry->op = op;
entry->nres = op->nres[cnt];
s = cnt;
- GNUNET_array_append (op->qentries, s, entry);
+ GNUNET_array_append (op->qentries, s, entry);
}
else
{
*/
static void
rq_remove (struct GNUNET_TESTBED_Operation *op)
-{
+{
GNUNET_assert (NULL != op->rq_entry);
GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
GNUNET_free (op->rq_entry);
process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_TESTBED_Operation *op;
+ struct OperationQueue *queue;
+ unsigned int cnt;
process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (NULL != rq_head);
if (NULL != rq_head)
process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
change_state (op, OP_STATE_ACTIVE);
+ for (cnt = 0; cnt < op->nqueues; cnt++)
+ {
+ queue = op->queues[cnt];
+ if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
+ assign_timeslot (op, queue);
+ }
+ op->tstart = GNUNET_TIME_absolute_get ();
if (NULL != op->start)
- op->start (op->cb_cls);
+ op->start (op->cb_cls);
}
rval = GNUNET_YES;
if (opq->active > opq->max_active)
{
- need += opq->active - opq->max_active;
rval = GNUNET_NO;
goto ret;
}
}
ret:
- GNUNET_free_non_null (evict_entries);
- if (NULL != ops_) *ops_ = ops;
- if (NULL != n_ops_) *n_ops_ = n_ops;
+ GNUNET_free_non_null (evict_entries);
+ if (NULL != ops_)
+ *ops_ = ops;
+ else
+ GNUNET_free (ops);
+ if (NULL != n_ops_)
+ *n_ops_ = n_ops;
return rval;
}
unsigned int i;
unsigned int j;
unsigned int n_cur;
-
+
GNUNET_assert (NULL != old);
n_cur = *n_old;
cur = *old;
for (i = 0; i < n_new; i++)
- {
+ {
for (j = 0; j < *n_old; j++)
{
if (new[i] == cur[j])
*old = cur;
*n_old = n_cur;
}
-
+
/**
* Checks for the readiness of an operation and schedules a operation start task
*
* @param op the operation
- * @param GNUNET_NO if the given operation cannot be made active; GNUNET_YES if
- * it can be activated (has enough resources) and is kept in ready
- * queue
*/
static int
check_readiness (struct GNUNET_TESTBED_Operation *op)
if (NULL == ops)
continue;
merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
- GNUNET_free (ops);
+ GNUNET_free (ops);
}
if (NULL != evict_ops)
{
GNUNET_assert (OP_STATE_READY == op->state);
rq_remove (op);
for (i = 0; i < op->nqueues; i++)
- op->queues[i]->active--;
+ {
+ GNUNET_assert (op->queues[i]->active >= op->nres[i]);
+ op->queues[i]->active -= op->nres[i];
+ }
change_state (op, OP_STATE_WAITING);
}
+/**
+ * Cleanups the array of timeslots of an operation queue. For each time slot in
+ * the array, if it is allocated to an operation, it will be deallocated from
+ * the operation
+ *
+ * @param queue the operation queue
+ */
+static void
+cleanup_tslots (struct OperationQueue *queue)
+{
+ struct FeedbackCtx *fctx = queue->fctx;
+ struct TimeSlot *tslot;
+ struct GNUNET_TESTBED_Operation *op;
+ unsigned int cnt;
+
+ GNUNET_assert (NULL != fctx);
+ for (cnt = 0; cnt < queue->max_active; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ op = tslot->op;
+ if (NULL == op)
+ continue;
+ GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+ }
+ GNUNET_free_non_null (fctx->tslots_freeptr);
+ fctx->tslots_freeptr = NULL;
+ fctx->alloc_head = NULL;
+ fctx->alloc_tail = NULL;
+ fctx->tslots_filled = 0;
+}
+
+
+/**
+ * Cleansup the existing timing slots and sets new timing slots in the given
+ * queue to accommodate given number of max active operations.
+ *
+ * @param queue the queue
+ * @param n the number of maximum active operations. If n is greater than the
+ * maximum limit set while creating the queue, then the minimum of these two
+ * will be selected as n
+ */
+static void
+adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
+{
+ struct FeedbackCtx *fctx = queue->fctx;
+ struct TimeSlot *tslot;
+ unsigned int cnt;
+
+ cleanup_tslots (queue);
+ n = GNUNET_MIN (n ,fctx->max_active_bound);
+ fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
+ fctx->nfailed = 0;
+ for (cnt = 0; cnt < n; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ tslot->queue = queue;
+ GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot);
+ }
+ GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
+}
+
+
+/**
+ * Adapts parallelism in an adaptive queue by using the statistical data from
+ * the feedback context.
+ *
+ * @param queue the queue
+ */
+static void
+adapt_parallelism (struct OperationQueue *queue)
+{
+ struct GNUNET_TIME_Relative avg;
+ struct FeedbackCtx *fctx;
+ struct TimeSlot *tslot;
+ int sd;
+ unsigned int nvals;
+ unsigned int cnt;
+
+ avg = GNUNET_TIME_UNIT_ZERO;
+ nvals = 0;
+ fctx = queue->fctx;
+ for (cnt = 0; cnt < queue->max_active; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
+ nvals += tslot->nvals;
+ }
+ GNUNET_assert (nvals >= queue->max_active);
+ GNUNET_assert (fctx->nfailed <= nvals);
+ nvals -= fctx->nfailed;
+ if (0 == nvals)
+ {
+ if (1 == queue->max_active)
+ adaptive_queue_set_max_active (queue, 1);
+ else
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+ return;
+ }
+ avg = GNUNET_TIME_relative_divide (avg, nvals);
+ if (GNUNET_SYSERR ==
+ GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
+ (unsigned int) avg.rel_value_us,
+ &sd))
+ {
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
+ return;
+ }
+ if (sd < 0)
+ sd = 0;
+ GNUNET_assert (0 <= sd);
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ if (0 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active * 2);
+ return;
+ }
+ if (1 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active + 1);
+ return;
+ }
+ if (1 == queue->max_active)
+ {
+ adaptive_queue_set_max_active (queue, 1);
+ return;
+ }
+ if (2 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active - 1);
+ return;
+ }
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+}
+
+
+/**
+ * update tslots with the operation's completion time. Additionally, if
+ * updating a timeslot makes all timeslots filled in an adaptive operation
+ * queue, call adapt_parallelism() for that queue.
+ *
+ * @param op the operation
+ */
+static void
+update_tslots (struct GNUNET_TESTBED_Operation *op)
+{
+ struct OperationQueue *queue;
+ struct GNUNET_TIME_Relative t;
+ struct TimeSlot *tslot;
+ struct FeedbackCtx *fctx;
+
+ t = GNUNET_TIME_absolute_get_duration (op->tstart);
+ while (NULL != (tslot = op->tslots_head)) /* update time slots */
+ {
+ queue = tslot->queue;
+ fctx = queue->fctx;
+ GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+ tslot->op = NULL;
+ GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
+ tslot);
+ if (op->failed)
+ fctx->nfailed++;
+ tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
+ if (0 != tslot->nvals++)
+ continue;
+ fctx->tslots_filled++;
+ if (queue->max_active == fctx->tslots_filled)
+ adapt_parallelism (queue);
+ }
+}
+
+
/**
* Create an 'operation' to be performed.
*
/**
* Create an operation queue.
*
+ * @param type the type of operation queue
* @param max_active maximum number of operations in this
* queue that can be active in parallel at the same time
* @return handle to the queue
*/
struct OperationQueue *
-GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
+GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
+ unsigned int max_active)
{
struct OperationQueue *queue;
+ struct FeedbackCtx *fctx;
queue = GNUNET_malloc (sizeof (struct OperationQueue));
- queue->max_active = max_active;
+ queue->type = type;
+ if (OPERATION_QUEUE_TYPE_FIXED == type)
+ {
+ queue->max_active = max_active;
+ }
+ else
+ {
+ fctx = GNUNET_malloc (sizeof (struct FeedbackCtx));
+ fctx->max_active_bound = max_active;
+ fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
+ queue->fctx = fctx;
+ adaptive_queue_set_max_active (queue, 4); /* start with 4 */
+ }
return queue;
}
void
GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
{
+ struct FeedbackCtx *fctx;
+
GNUNET_break (GNUNET_YES == is_queue_empty (queue));
+ if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
+ {
+ cleanup_tslots (queue);
+ fctx = queue->fctx;
+ GNUNET_TESTBED_SD_destroy_ (fctx->sd);
+ GNUNET_free (fctx);
+ }
GNUNET_free (queue);
}
void
GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
{
- struct QueueEntry *entry;
+ struct QueueEntry *entry;
struct OperationQueue *opq;
unsigned int i;
rq_remove (op);
if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
GNUNET_TESTBED_operation_activate_ (op);
+ if (OP_STATE_ACTIVE == op->state)
+ update_tslots (op);
GNUNET_assert (NULL != op->queues);
GNUNET_assert (NULL != op->qentries);
for (i = 0; i < op->nqueues; i++)
remove_queue_entry (op, i);
opq = op->queues[i];
switch (op->state)
- {
+ {
case OP_STATE_INIT:
case OP_STATE_INACTIVE:
GNUNET_assert (0);
break;
- case OP_STATE_WAITING:
+ case OP_STATE_WAITING:
break;
- case OP_STATE_READY:
case OP_STATE_ACTIVE:
+ case OP_STATE_READY:
GNUNET_assert (0 != opq->active);
GNUNET_assert (opq->active >= entry->nres);
opq->active -= entry->nres;
recheck_waiting (opq);
break;
- }
+ }
GNUNET_free (entry);
}
GNUNET_free_non_null (op->qentries);
}
+/**
+ * Marks an operation as failed
+ *
+ * @param op the operation to be marked as failed
+ */
+void
+GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
+{
+ op->failed = GNUNET_YES;
+}
+
+
/* end of testbed_api_operations.c */