/**
* An entry in the operation queue
*/
-struct QueueEntry {
+struct QueueEntry
+{
/**
* The next DLL pointer
*/
/**
* A slot to record time taken by an operation
*/
-struct TimeSlot {
+struct TimeSlot
+{
/**
* DLL next pointer
*/
/**
* Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
*/
-struct FeedbackCtx {
+struct FeedbackCtx
+{
/**
* Handle for calculating standard deviation
*/
* Queue of operations where we can only support a certain
* number of concurrent operations of a particular type.
*/
-struct OperationQueue {
+struct OperationQueue
+{
/**
* DLL head for the wait queue. Operations which are waiting for this
* operation queue are put here
/**
* Operation state
*/
-enum OperationState {
+enum OperationState
+{
/**
* The operation is just created and is in initial state
*/
/**
* An entry in the ready queue (implemented as DLL)
*/
-struct ReadyQueueEntry {
+struct ReadyQueueEntry
+{
/**
* next ptr for DLL
*/
/**
* Opaque handle to an abstract operation to be executed by the testing framework.
*/
-struct GNUNET_TESTBED_Operation {
+struct GNUNET_TESTBED_Operation
+{
/**
* Function to call when we have the resources to begin the operation.
*/
* @return the timeslot
*/
static void
-assign_timeslot(struct GNUNET_TESTBED_Operation *op,
- struct OperationQueue *queue)
+assign_timeslot (struct GNUNET_TESTBED_Operation *op,
+ struct OperationQueue *queue)
{
struct FeedbackCtx *fctx = queue->fctx;
struct TimeSlot *tslot;
- GNUNET_assert(OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
+ GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
tslot = fctx->alloc_head;
- GNUNET_assert(NULL != tslot);
- GNUNET_CONTAINER_DLL_remove(fctx->alloc_head, fctx->alloc_tail, tslot);
- GNUNET_CONTAINER_DLL_insert_tail(op->tslots_head, op->tslots_tail, tslot);
+ GNUNET_assert (NULL != tslot);
+ GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
+ GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
tslot->op = op;
}
* @param index the index of the entry in the operation's array of queue entries
*/
static void
-remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index)
+remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
{
struct OperationQueue *opq;
struct QueueEntry *entry;
opq = op->queues[index];
entry = op->qentries[index];
switch (op->state)
- {
- case OP_STATE_INIT:
- GNUNET_assert(0);
- break;
-
- case OP_STATE_WAITING:
- GNUNET_CONTAINER_DLL_remove(opq->wq_head, opq->wq_tail, entry);
- break;
-
- case OP_STATE_READY:
- GNUNET_CONTAINER_DLL_remove(opq->rq_head, opq->rq_tail, entry);
- break;
-
- case OP_STATE_ACTIVE:
- GNUNET_CONTAINER_DLL_remove(opq->aq_head, opq->aq_tail, entry);
- break;
-
- case OP_STATE_INACTIVE:
- GNUNET_CONTAINER_DLL_remove(opq->nq_head, opq->nq_tail, entry);
- break;
- }
+ {
+ case OP_STATE_INIT:
+ GNUNET_assert (0);
+ break;
+
+ case OP_STATE_WAITING:
+ GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
+ break;
+
+ case OP_STATE_READY:
+ GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
+ break;
+
+ case OP_STATE_ACTIVE:
+ GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
+ break;
+
+ case OP_STATE_INACTIVE:
+ GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
+ break;
+ }
}
* @param state the state the operation should have. It cannot be OP_STATE_INIT
*/
static void
-change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state)
+change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
{
struct QueueEntry *entry;
struct OperationQueue *opq;
unsigned int cnt;
unsigned int s;
- GNUNET_assert(OP_STATE_INIT != state);
- GNUNET_assert(NULL != op->queues);
- GNUNET_assert(NULL != op->nres);
- GNUNET_assert((OP_STATE_INIT == op->state) || (NULL != op->qentries));
- GNUNET_assert(op->state != state);
+ GNUNET_assert (OP_STATE_INIT != state);
+ GNUNET_assert (NULL != op->queues);
+ GNUNET_assert (NULL != op->nres);
+ GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
+ GNUNET_assert (op->state != state);
for (cnt = 0; cnt < op->nqueues; cnt++)
+ {
+ if (OP_STATE_INIT == op->state)
{
- if (OP_STATE_INIT == op->state)
- {
- entry = GNUNET_new(struct QueueEntry);
- entry->op = op;
- entry->nres = op->nres[cnt];
- s = cnt;
- GNUNET_array_append(op->qentries, s, entry);
- }
- else
- {
- entry = op->qentries[cnt];
- remove_queue_entry(op, cnt);
- }
- opq = op->queues[cnt];
- switch (state)
- {
- case OP_STATE_INIT:
- GNUNET_assert(0);
- break;
+ entry = GNUNET_new (struct QueueEntry);
+ entry->op = op;
+ entry->nres = op->nres[cnt];
+ s = cnt;
+ GNUNET_array_append (op->qentries, s, entry);
+ }
+ else
+ {
+ entry = op->qentries[cnt];
+ remove_queue_entry (op, cnt);
+ }
+ opq = op->queues[cnt];
+ switch (state)
+ {
+ case OP_STATE_INIT:
+ GNUNET_assert (0);
+ break;
- case OP_STATE_WAITING:
- GNUNET_CONTAINER_DLL_insert_tail(opq->wq_head, opq->wq_tail, entry);
- break;
+ case OP_STATE_WAITING:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
+ break;
- case OP_STATE_READY:
- GNUNET_CONTAINER_DLL_insert_tail(opq->rq_head, opq->rq_tail, entry);
- break;
+ case OP_STATE_READY:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
+ break;
- case OP_STATE_ACTIVE:
- GNUNET_CONTAINER_DLL_insert_tail(opq->aq_head, opq->aq_tail, entry);
- break;
+ case OP_STATE_ACTIVE:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
+ break;
- case OP_STATE_INACTIVE:
- GNUNET_CONTAINER_DLL_insert_tail(opq->nq_head, opq->nq_tail, entry);
- break;
- }
+ case OP_STATE_INACTIVE:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
+ break;
}
+ }
op->state = state;
}
* @param op the operation to be removed
*/
static void
-rq_remove(struct GNUNET_TESTBED_Operation *op)
+rq_remove (struct GNUNET_TESTBED_Operation *op)
{
- GNUNET_assert(NULL != op->rq_entry);
- GNUNET_CONTAINER_DLL_remove(rq_head, rq_tail, op->rq_entry);
- GNUNET_free(op->rq_entry);
+ GNUNET_assert (NULL != op->rq_entry);
+ GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
+ GNUNET_free (op->rq_entry);
op->rq_entry = NULL;
if ((NULL == rq_head) && (NULL != process_rq_task_id))
- {
- GNUNET_SCHEDULER_cancel(process_rq_task_id);
- process_rq_task_id = NULL;
- }
+ {
+ GNUNET_SCHEDULER_cancel (process_rq_task_id);
+ process_rq_task_id = NULL;
+ }
}
* @param cls NULL
*/
static void
-process_rq_task(void *cls)
+process_rq_task (void *cls)
{
struct GNUNET_TESTBED_Operation *op;
struct OperationQueue *queue;
unsigned int cnt;
process_rq_task_id = NULL;
- GNUNET_assert(NULL != rq_head);
- GNUNET_assert(NULL != (op = rq_head->op));
- rq_remove(op);
+ GNUNET_assert (NULL != rq_head);
+ GNUNET_assert (NULL != (op = rq_head->op));
+ rq_remove (op);
if (NULL != rq_head)
- process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL);
- change_state(op, OP_STATE_ACTIVE);
+ process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
+ change_state (op, OP_STATE_ACTIVE);
for (cnt = 0; cnt < op->nqueues; cnt++)
- {
- queue = op->queues[cnt];
- if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
- assign_timeslot(op, queue);
- }
- op->tstart = GNUNET_TIME_absolute_get();
+ {
+ queue = op->queues[cnt];
+ if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
+ assign_timeslot (op, queue);
+ }
+ op->tstart = GNUNET_TIME_absolute_get ();
if (NULL != op->start)
- op->start(op->cb_cls);
+ op->start (op->cb_cls);
}
* @param op the operation to be queued
*/
static void
-rq_add(struct GNUNET_TESTBED_Operation *op)
+rq_add (struct GNUNET_TESTBED_Operation *op)
{
struct ReadyQueueEntry *rq_entry;
- GNUNET_assert(NULL == op->rq_entry);
- rq_entry = GNUNET_new(struct ReadyQueueEntry);
+ GNUNET_assert (NULL == op->rq_entry);
+ rq_entry = GNUNET_new (struct ReadyQueueEntry);
rq_entry->op = op;
- GNUNET_CONTAINER_DLL_insert_tail(rq_head, rq_tail, rq_entry);
+ GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
op->rq_entry = rq_entry;
if (NULL == process_rq_task_id)
- process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL);
+ process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
}
* otherwise
*/
static int
-is_queue_empty(struct OperationQueue *opq)
+is_queue_empty (struct OperationQueue *opq)
{
if ((NULL != opq->wq_head)
|| (NULL != opq->rq_head)
* be set to NULL and 0 respectively.
*/
static int
-decide_capacity(struct OperationQueue *opq,
- struct QueueEntry *entry,
- struct GNUNET_TESTBED_Operation ***ops_,
- unsigned int *n_ops_)
+decide_capacity (struct OperationQueue *opq,
+ struct QueueEntry *entry,
+ struct GNUNET_TESTBED_Operation ***ops_,
+ unsigned int *n_ops_)
{
struct QueueEntry **evict_entries;
struct GNUNET_TESTBED_Operation **ops;
int deficit;
int rval;
- GNUNET_assert(NULL != (op = entry->op));
- GNUNET_assert(0 < (need = entry->nres));
+ GNUNET_assert (NULL != (op = entry->op));
+ GNUNET_assert (0 < (need = entry->nres));
ops = NULL;
n_ops = 0;
evict_entries = NULL;
n_evict_entries = 0;
rval = GNUNET_YES;
if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
- {
- GNUNET_assert(NULL != opq->fctx);
- GNUNET_assert(opq->max_active >= opq->overload);
- max = opq->max_active - opq->overload;
- }
+ {
+ GNUNET_assert (NULL != opq->fctx);
+ GNUNET_assert (opq->max_active >= opq->overload);
+ max = opq->max_active - opq->overload;
+ }
else
max = opq->max_active;
if (opq->active > max)
- {
- rval = GNUNET_NO;
- goto ret;
- }
+ {
+ rval = GNUNET_NO;
+ goto ret;
+ }
if ((opq->active + need) <= max)
goto ret;
deficit = need - (max - opq->active);
for (entry = opq->nq_head;
(0 < deficit) && (NULL != entry);
entry = entry->next)
- {
- GNUNET_array_append(evict_entries, n_evict_entries, entry);
- deficit -= entry->nres;
- }
+ {
+ GNUNET_array_append (evict_entries, n_evict_entries, entry);
+ deficit -= entry->nres;
+ }
if (0 < deficit)
- {
- rval = GNUNET_NO;
- goto ret;
- }
+ {
+ rval = GNUNET_NO;
+ goto ret;
+ }
for (n_ops = 0; n_ops < n_evict_entries;)
- {
- op = evict_entries[n_ops]->op;
- GNUNET_array_append(ops, n_ops, op); /* increments n-ops */
- }
+ {
+ op = evict_entries[n_ops]->op;
+ GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
+ }
ret:
- GNUNET_free_non_null(evict_entries);
+ GNUNET_free_non_null (evict_entries);
if (NULL != ops_)
*ops_ = ops;
else
- GNUNET_free(ops);
+ GNUNET_free (ops);
if (NULL != n_ops_)
*n_ops_ = n_ops;
return rval;
* @param n_new the number of operations in new array
*/
static void
-merge_ops(struct GNUNET_TESTBED_Operation ***old,
- unsigned int *n_old,
- struct GNUNET_TESTBED_Operation **new,
- unsigned int n_new)
+merge_ops (struct GNUNET_TESTBED_Operation ***old,
+ unsigned int *n_old,
+ struct GNUNET_TESTBED_Operation **new,
+ unsigned int n_new)
{
struct GNUNET_TESTBED_Operation **cur;
unsigned int i;
unsigned int j;
unsigned int n_cur;
- GNUNET_assert(NULL != old);
+ GNUNET_assert (NULL != old);
n_cur = *n_old;
cur = *old;
for (i = 0; i < n_new; i++)
+ {
+ for (j = 0; j < *n_old; j++)
{
- for (j = 0; j < *n_old; j++)
- {
- if (new[i] == cur[j])
- break;
- }
- if (j < *n_old)
- continue;
- GNUNET_array_append(cur, n_cur, new[j]);
+ if (new[i] == cur[j])
+ break;
}
+ if (j < *n_old)
+ continue;
+ GNUNET_array_append (cur, n_cur, new[j]);
+ }
*old = cur;
*n_old = n_cur;
}
-
/**
* Checks for the readiness of an operation and schedules a operation start task
*
* @param op the operation
*/
static int
-check_readiness(struct GNUNET_TESTBED_Operation *op)
+check_readiness (struct GNUNET_TESTBED_Operation *op)
{
struct GNUNET_TESTBED_Operation **evict_ops;
struct GNUNET_TESTBED_Operation **ops;
unsigned int n_evict_ops;
unsigned int i;
- GNUNET_assert(NULL == op->rq_entry);
- GNUNET_assert(OP_STATE_WAITING == op->state);
+ GNUNET_assert (NULL == op->rq_entry);
+ GNUNET_assert (OP_STATE_WAITING == op->state);
evict_ops = NULL;
n_evict_ops = 0;
for (i = 0; i < op->nqueues; i++)
+ {
+ ops = NULL;
+ n_ops = 0;
+ if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
+ &ops, &n_ops))
{
- ops = NULL;
- n_ops = 0;
- if (GNUNET_NO == decide_capacity(op->queues[i], op->qentries[i],
- &ops, &n_ops))
- {
- GNUNET_free_non_null(evict_ops);
- return GNUNET_NO;
- }
- if (NULL == ops)
- continue;
- merge_ops(&evict_ops, &n_evict_ops, ops, n_ops);
- GNUNET_free(ops);
+ GNUNET_free_non_null (evict_ops);
+ return GNUNET_NO;
}
+ if (NULL == ops)
+ continue;
+ merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
+ GNUNET_free (ops);
+ }
if (NULL != evict_ops)
- {
- for (i = 0; i < n_evict_ops; i++)
- GNUNET_TESTBED_operation_release_(evict_ops[i]);
- GNUNET_free(evict_ops);
- evict_ops = NULL;
- /* Evicting the operations should schedule this operation */
- GNUNET_assert(OP_STATE_READY == op->state);
- return GNUNET_YES;
- }
+ {
+ for (i = 0; i < n_evict_ops; i++)
+ GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+ GNUNET_free (evict_ops);
+ evict_ops = NULL;
+ /* Evicting the operations should schedule this operation */
+ GNUNET_assert (OP_STATE_READY == op->state);
+ return GNUNET_YES;
+ }
for (i = 0; i < op->nqueues; i++)
op->queues[i]->active += op->nres[i];
- change_state(op, OP_STATE_READY);
- rq_add(op);
+ change_state (op, OP_STATE_READY);
+ rq_add (op);
return GNUNET_YES;
}
* @param op the operation to defer
*/
static void
-defer(struct GNUNET_TESTBED_Operation *op)
+defer (struct GNUNET_TESTBED_Operation *op)
{
unsigned int i;
- GNUNET_assert(OP_STATE_READY == op->state);
- rq_remove(op);
+ GNUNET_assert (OP_STATE_READY == op->state);
+ rq_remove (op);
for (i = 0; i < op->nqueues; i++)
- {
- GNUNET_assert(op->queues[i]->active >= op->nres[i]);
- op->queues[i]->active -= op->nres[i];
- }
- change_state(op, OP_STATE_WAITING);
+ {
+ GNUNET_assert (op->queues[i]->active >= op->nres[i]);
+ op->queues[i]->active -= op->nres[i];
+ }
+ change_state (op, OP_STATE_WAITING);
}
* @param queue the operation queue
*/
static void
-cleanup_tslots(struct OperationQueue *queue)
+cleanup_tslots (struct OperationQueue *queue)
{
struct FeedbackCtx *fctx = queue->fctx;
struct TimeSlot *tslot;
struct GNUNET_TESTBED_Operation *op;
unsigned int cnt;
- GNUNET_assert(NULL != fctx);
+ GNUNET_assert (NULL != fctx);
for (cnt = 0; cnt < queue->max_active; cnt++)
- {
- tslot = &fctx->tslots_freeptr[cnt];
- op = tslot->op;
- if (NULL == op)
- continue;
- GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot);
- }
- GNUNET_free_non_null(fctx->tslots_freeptr);
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ op = tslot->op;
+ if (NULL == op)
+ continue;
+ GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+ }
+ GNUNET_free_non_null (fctx->tslots_freeptr);
fctx->tslots_freeptr = NULL;
fctx->alloc_head = NULL;
fctx->alloc_tail = NULL;
* will be selected as n
*/
static void
-adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n)
+adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
{
struct FeedbackCtx *fctx = queue->fctx;
struct TimeSlot *tslot;
unsigned int cnt;
- cleanup_tslots(queue);
- n = GNUNET_MIN(n, fctx->max_active_bound);
- fctx->tslots_freeptr = GNUNET_malloc(n * sizeof(struct TimeSlot));
+ cleanup_tslots (queue);
+ n = GNUNET_MIN (n, fctx->max_active_bound);
+ fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot));
fctx->nfailed = 0;
for (cnt = 0; cnt < n; cnt++)
- {
- tslot = &fctx->tslots_freeptr[cnt];
- tslot->queue = queue;
- GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail, tslot);
- }
- GNUNET_TESTBED_operation_queue_reset_max_active_(queue, n);
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ tslot->queue = queue;
+ GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
+ tslot);
+ }
+ GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
}
* @param queue the queue
*/
static void
-adapt_parallelism(struct OperationQueue *queue)
+adapt_parallelism (struct OperationQueue *queue)
{
struct GNUNET_TIME_Relative avg;
struct FeedbackCtx *fctx;
nvals = 0;
fctx = queue->fctx;
for (cnt = 0; cnt < queue->max_active; cnt++)
- {
- tslot = &fctx->tslots_freeptr[cnt];
- avg = GNUNET_TIME_relative_add(avg, tslot->tsum);
- nvals += tslot->nvals;
- }
- GNUNET_assert(nvals >= queue->max_active);
- GNUNET_assert(fctx->nfailed <= nvals);
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
+ nvals += tslot->nvals;
+ }
+ GNUNET_assert (nvals >= queue->max_active);
+ GNUNET_assert (fctx->nfailed <= nvals);
nvals -= fctx->nfailed;
if (0 == nvals)
- {
- if (1 == queue->max_active)
- adaptive_queue_set_max_active(queue, 1);
- else
- adaptive_queue_set_max_active(queue, queue->max_active / 2);
- return;
- }
- avg = GNUNET_TIME_relative_divide(avg, nvals);
- GNUNET_TESTBED_SD_add_data_(fctx->sd, (unsigned int)avg.rel_value_us);
+ {
+ if (1 == queue->max_active)
+ adaptive_queue_set_max_active (queue, 1);
+ else
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+ return;
+ }
+ avg = GNUNET_TIME_relative_divide (avg, nvals);
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
if (GNUNET_SYSERR ==
- GNUNET_TESTBED_SD_deviation_factor_(fctx->sd,
- (unsigned int)avg.rel_value_us,
- &sd))
- {
- adaptive_queue_set_max_active(queue, queue->max_active); /* no change */
- return;
- }
+ GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
+ (unsigned int) avg.rel_value_us,
+ &sd))
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
+ return;
+ }
parallelism = 0;
if (-1 == sd)
parallelism = queue->max_active - 1;
if (2 <= sd)
parallelism = queue->max_active / 2;
- parallelism = GNUNET_MAX(parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
- adaptive_queue_set_max_active(queue, parallelism);
+ parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
+ adaptive_queue_set_max_active (queue, parallelism);
#if 0
/* old algorithm */
if (sd < 0)
sd = 0;
- GNUNET_assert(0 <= sd);
- //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ GNUNET_assert (0 <= sd);
+ // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
if (0 == sd)
- {
- adaptive_queue_set_max_active(queue, queue->max_active * 2);
- return;
- }
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active * 2);
+ return;
+ }
if (1 == sd)
- {
- adaptive_queue_set_max_active(queue, queue->max_active + 1);
- return;
- }
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active + 1);
+ return;
+ }
if (1 == queue->max_active)
- {
- adaptive_queue_set_max_active(queue, 1);
- return;
- }
+ {
+ adaptive_queue_set_max_active (queue, 1);
+ return;
+ }
if (2 == sd)
- {
- adaptive_queue_set_max_active(queue, queue->max_active - 1);
- return;
- }
- adaptive_queue_set_max_active(queue, queue->max_active / 2);
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active - 1);
+ return;
+ }
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
#endif
}
* @param op the operation
*/
static void
-update_tslots(struct GNUNET_TESTBED_Operation *op)
+update_tslots (struct GNUNET_TESTBED_Operation *op)
{
struct OperationQueue *queue;
struct GNUNET_TIME_Relative t;
struct FeedbackCtx *fctx;
unsigned int i;
- t = GNUNET_TIME_absolute_get_duration(op->tstart);
+ t = GNUNET_TIME_absolute_get_duration (op->tstart);
while (NULL != (tslot = op->tslots_head)) /* update time slots */
+ {
+ queue = tslot->queue;
+ fctx = queue->fctx;
+ GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+ tslot->op = NULL;
+ GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
+ tslot);
+ if (op->failed)
{
- queue = tslot->queue;
- fctx = queue->fctx;
- GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot);
- tslot->op = NULL;
- GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail,
- tslot);
- if (op->failed)
- {
- fctx->nfailed++;
- for (i = 0; i < op->nqueues; i++)
- if (queue == op->queues[i])
- break;
- GNUNET_assert(i != op->nqueues);
- op->queues[i]->overload += op->nres[i];
- }
- tslot->tsum = GNUNET_TIME_relative_add(tslot->tsum, t);
- if (0 != tslot->nvals++)
- continue;
- fctx->tslots_filled++;
- if (queue->max_active == fctx->tslots_filled)
- adapt_parallelism(queue);
+ fctx->nfailed++;
+ for (i = 0; i < op->nqueues; i++)
+ if (queue == op->queues[i])
+ break;
+ GNUNET_assert (i != op->nqueues);
+ op->queues[i]->overload += op->nres[i];
}
+ tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
+ if (0 != tslot->nvals++)
+ continue;
+ fctx->tslots_filled++;
+ if (queue->max_active == fctx->tslots_filled)
+ adapt_parallelism (queue);
+ }
}
* @return handle to the operation
*/
struct GNUNET_TESTBED_Operation *
-GNUNET_TESTBED_operation_create_(void *cls, OperationStart start,
- OperationRelease release)
+GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
+ OperationRelease release)
{
struct GNUNET_TESTBED_Operation *op;
- op = GNUNET_new(struct GNUNET_TESTBED_Operation);
+ op = GNUNET_new (struct GNUNET_TESTBED_Operation);
op->start = start;
op->state = OP_STATE_INIT;
op->release = release;
* @return handle to the queue
*/
struct OperationQueue *
-GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type,
- unsigned int max_active)
+GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
+ unsigned int max_active)
{
struct OperationQueue *queue;
struct FeedbackCtx *fctx;
- queue = GNUNET_new(struct OperationQueue);
+ queue = GNUNET_new (struct OperationQueue);
queue->type = type;
if (OPERATION_QUEUE_TYPE_FIXED == type)
- {
- queue->max_active = max_active;
- }
+ {
+ queue->max_active = max_active;
+ }
else
- {
- fctx = GNUNET_new(struct FeedbackCtx);
- fctx->max_active_bound = max_active;
- fctx->sd = GNUNET_TESTBED_SD_init_(ADAPTIVE_QUEUE_DEFAULT_HISTORY);
- queue->fctx = fctx;
- adaptive_queue_set_max_active(queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
- }
+ {
+ fctx = GNUNET_new (struct FeedbackCtx);
+ fctx->max_active_bound = max_active;
+ fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
+ queue->fctx = fctx;
+ adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
+ }
return queue;
}
* @param queue the operation queue to destroy
*/
static void
-queue_destroy(struct OperationQueue *queue)
+queue_destroy (struct OperationQueue *queue)
{
struct FeedbackCtx *fctx;
if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
- {
- cleanup_tslots(queue);
- fctx = queue->fctx;
- GNUNET_TESTBED_SD_destroy_(fctx->sd);
- GNUNET_free(fctx);
- }
- GNUNET_free(queue);
+ {
+ cleanup_tslots (queue);
+ fctx = queue->fctx;
+ GNUNET_TESTBED_SD_destroy_ (fctx->sd);
+ GNUNET_free (fctx);
+ }
+ GNUNET_free (queue);
}
* @param queue queue to destroy
*/
void
-GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue)
+GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
{
- if (GNUNET_YES != is_queue_empty(queue))
- {
- GNUNET_assert(0 == queue->expired); /* Are you calling twice on same queue? */
- queue->expired = 1;
- GNUNET_array_append(expired_opqs, n_expired_opqs, queue);
- return;
- }
- queue_destroy(queue);
+ if (GNUNET_YES != is_queue_empty (queue))
+ {
+ GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */
+ queue->expired = 1;
+ GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
+ return;
+ }
+ queue_destroy (queue);
}
* is not empty)
*/
int
-GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue)
+GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
{
- if (GNUNET_NO == is_queue_empty(queue))
+ if (GNUNET_NO == is_queue_empty (queue))
return GNUNET_NO;
- GNUNET_TESTBED_operation_queue_destroy_(queue);
+ GNUNET_TESTBED_operation_queue_destroy_ (queue);
return GNUNET_YES;
}
* @param opq the operation queue
*/
static void
-recheck_waiting(struct OperationQueue *opq)
+recheck_waiting (struct OperationQueue *opq)
{
struct QueueEntry *entry;
struct QueueEntry *entry2;
entry = opq->wq_head;
while (NULL != entry)
- {
- entry2 = entry->next;
- if (GNUNET_NO == check_readiness(entry->op))
- break;
- entry = entry2;
- }
+ {
+ entry2 = entry->next;
+ if (GNUNET_NO == check_readiness (entry->op))
+ break;
+ entry = entry2;
+ }
}
* @param max_active the new maximum number of active operations
*/
void
-GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue,
- unsigned int max_active)
+GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
+ unsigned int max_active)
{
struct QueueEntry *entry;
queue->overload = 0;
while ((queue->active > queue->max_active)
&& (NULL != (entry = queue->rq_head)))
- defer(entry->op);
- recheck_waiting(queue);
+ defer (entry->op);
+ recheck_waiting (queue);
}
* operation. Should be greater than 0.
*/
void
-GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue,
- struct GNUNET_TESTBED_Operation *op,
- unsigned int nres)
+GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
+ struct GNUNET_TESTBED_Operation *op,
+ unsigned int nres)
{
unsigned int qsize;
- GNUNET_assert(0 < nres);
+ GNUNET_assert (0 < nres);
qsize = op->nqueues;
- GNUNET_array_append(op->queues, op->nqueues, queue);
- GNUNET_array_append(op->nres, qsize, nres);
- GNUNET_assert(qsize == op->nqueues);
+ GNUNET_array_append (op->queues, op->nqueues, queue);
+ GNUNET_array_append (op->nres, qsize, nres);
+ GNUNET_assert (qsize == op->nqueues);
}
* @param op operation to add to the queue
*/
void
-GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue,
- struct GNUNET_TESTBED_Operation *op)
+GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
+ struct GNUNET_TESTBED_Operation *op)
{
- return GNUNET_TESTBED_operation_queue_insert2_(queue, op, 1);
+ return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
}
* @param op the operation to marks as waiting
*/
void
-GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op)
+GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
{
- GNUNET_assert(NULL == op->rq_entry);
- change_state(op, OP_STATE_WAITING);
- (void)check_readiness(op);
+ GNUNET_assert (NULL == op->rq_entry);
+ change_state (op, OP_STATE_WAITING);
+ (void) check_readiness (op);
}
* it as inactive.
*/
void
-GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op)
+GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
{
struct OperationQueue **queues;
size_t ms;
unsigned int nqueues;
unsigned int i;
- GNUNET_assert(OP_STATE_ACTIVE == op->state);
- change_state(op, OP_STATE_INACTIVE);
+ GNUNET_assert (OP_STATE_ACTIVE == op->state);
+ change_state (op, OP_STATE_INACTIVE);
nqueues = op->nqueues;
ms = sizeof(struct OperationQueue *) * nqueues;
- queues = GNUNET_malloc(ms);
+ queues = GNUNET_malloc (ms);
/* Cloning is needed as the operation be released by waiting operations and
hence its nqueues memory ptr will be freed */
- GNUNET_memcpy(queues, op->queues, ms);
+ GNUNET_memcpy (queues, op->queues, ms);
for (i = 0; i < nqueues; i++)
- recheck_waiting(queues[i]);
- GNUNET_free(queues);
+ recheck_waiting (queues[i]);
+ GNUNET_free (queues);
}
* @param op the operation to be marked as active
*/
void
-GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op)
+GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
{
- GNUNET_assert(OP_STATE_INACTIVE == op->state);
- change_state(op, OP_STATE_ACTIVE);
+ GNUNET_assert (OP_STATE_INACTIVE == op->state);
+ change_state (op, OP_STATE_ACTIVE);
}
* @param op operation that finished
*/
void
-GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op)
+GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
{
struct QueueEntry *entry;
struct OperationQueue *opq;
unsigned int i;
if (OP_STATE_INIT == op->state)
- {
- GNUNET_free(op);
- return;
- }
+ {
+ GNUNET_free (op);
+ return;
+ }
if (OP_STATE_READY == op->state)
- rq_remove(op);
+ rq_remove (op);
if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
- GNUNET_TESTBED_operation_activate_(op);
+ GNUNET_TESTBED_operation_activate_ (op);
if (OP_STATE_ACTIVE == op->state)
- update_tslots(op);
- GNUNET_assert(NULL != op->queues);
- GNUNET_assert(NULL != op->qentries);
+ update_tslots (op);
+ GNUNET_assert (NULL != op->queues);
+ GNUNET_assert (NULL != op->qentries);
for (i = 0; i < op->nqueues; i++)
+ {
+ entry = op->qentries[i];
+ remove_queue_entry (op, i);
+ opq = op->queues[i];
+ switch (op->state)
{
- entry = op->qentries[i];
- remove_queue_entry(op, i);
- opq = op->queues[i];
- switch (op->state)
- {
- case OP_STATE_INIT:
- case OP_STATE_INACTIVE:
- GNUNET_assert(0);
- break;
+ case OP_STATE_INIT:
+ case OP_STATE_INACTIVE:
+ GNUNET_assert (0);
+ break;
- case OP_STATE_WAITING:
- break;
+ case OP_STATE_WAITING:
+ break;
- case OP_STATE_ACTIVE:
- case OP_STATE_READY:
- GNUNET_assert(0 != opq->active);
- GNUNET_assert(opq->active >= entry->nres);
- opq->active -= entry->nres;
- recheck_waiting(opq);
- break;
- }
- GNUNET_free(entry);
+ case OP_STATE_ACTIVE:
+ case OP_STATE_READY:
+ GNUNET_assert (0 != opq->active);
+ GNUNET_assert (opq->active >= entry->nres);
+ opq->active -= entry->nres;
+ recheck_waiting (opq);
+ break;
}
- GNUNET_free_non_null(op->qentries);
- GNUNET_free(op->queues);
- GNUNET_free(op->nres);
+ GNUNET_free (entry);
+ }
+ GNUNET_free_non_null (op->qentries);
+ GNUNET_free (op->queues);
+ GNUNET_free (op->nres);
if (NULL != op->release)
- op->release(op->cb_cls);
- GNUNET_free(op);
+ op->release (op->cb_cls);
+ GNUNET_free (op);
}
* @param op the operation to be marked as failed
*/
void
-GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op)
+GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
{
op->failed = GNUNET_YES;
}
* operations which are not completed and warn about them.
*/
void __attribute__ ((destructor))
-GNUNET_TESTBED_operations_fini()
+GNUNET_TESTBED_operations_fini ()
{
struct OperationQueue *queue;
unsigned int i;
int warn = 0;
for (i = 0; i < n_expired_opqs; i++)
- {
- queue = expired_opqs[i];
- if (GNUNET_NO == is_queue_empty(queue))
- warn = 1;
- queue_destroy(queue);
- }
- GNUNET_free_non_null(expired_opqs);
+ {
+ queue = expired_opqs[i];
+ if (GNUNET_NO == is_queue_empty (queue))
+ warn = 1;
+ queue_destroy (queue);
+ }
+ GNUNET_free_non_null (expired_opqs);
n_expired_opqs = 0;
if (warn)
- GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
- "Be disciplined. Some operations were not marked as done.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Be disciplined. Some operations were not marked as done.\n");
}
+
+
/* end of testbed_api_operations.c */