X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Ftestbed_api_operations.c;h=a406c0e0b834b01344cec29ac3034173eb85d9b1;hb=d17a17ea785f91c18b5694eab3372c4e4564d95e;hp=194dc2655c63e4bd82035794a693955cdaaf30e1;hpb=d41ed82a4ea0cc8e1674b6d5d2c49fd6462610bb;p=oweals%2Fgnunet.git diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index 194dc2655..a406c0e0b 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c @@ -44,7 +44,8 @@ /** * An entry in the operation queue */ -struct QueueEntry { +struct QueueEntry +{ /** * The next DLL pointer */ @@ -77,7 +78,8 @@ struct OperationQueue; /** * A slot to record time taken by an operation */ -struct TimeSlot { +struct TimeSlot +{ /** * DLL next pointer */ @@ -113,7 +115,8 @@ struct TimeSlot { /** * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE */ -struct FeedbackCtx { +struct FeedbackCtx +{ /** * Handle for calculating standard deviation */ @@ -156,7 +159,8 @@ struct FeedbackCtx { * Queue of operations where we can only support a certain * number of concurrent operations of a particular type. */ -struct OperationQueue { +struct OperationQueue +{ /** * DLL head for the wait queue. Operations which are waiting for this * operation queue are put here @@ -243,7 +247,8 @@ struct OperationQueue { /** * Operation state */ -enum OperationState { +enum OperationState +{ /** * The operation is just created and is in initial state */ @@ -277,7 +282,8 @@ enum OperationState { /** * An entry in the ready queue (implemented as DLL) */ -struct ReadyQueueEntry { +struct ReadyQueueEntry +{ /** * next ptr for DLL */ @@ -298,7 +304,8 @@ struct ReadyQueueEntry { /** * Opaque handle to an abstract operation to be executed by the testing framework. */ -struct GNUNET_TESTBED_Operation { +struct GNUNET_TESTBED_Operation +{ /** * Function to call when we have the resources to begin the operation. */ @@ -403,17 +410,17 @@ struct GNUNET_SCHEDULER_Task *process_rq_task_id; * @return the timeslot */ static void -assign_timeslot(struct GNUNET_TESTBED_Operation *op, - struct OperationQueue *queue) +assign_timeslot (struct GNUNET_TESTBED_Operation *op, + struct OperationQueue *queue) { struct FeedbackCtx *fctx = queue->fctx; struct TimeSlot *tslot; - GNUNET_assert(OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type); + GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type); tslot = fctx->alloc_head; - GNUNET_assert(NULL != tslot); - GNUNET_CONTAINER_DLL_remove(fctx->alloc_head, fctx->alloc_tail, tslot); - GNUNET_CONTAINER_DLL_insert_tail(op->tslots_head, op->tslots_tail, tslot); + GNUNET_assert (NULL != tslot); + GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot); + GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot); tslot->op = op; } @@ -426,7 +433,7 @@ assign_timeslot(struct GNUNET_TESTBED_Operation *op, * @param index the index of the entry in the operation's array of queue entries */ static void -remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index) +remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) { struct OperationQueue *opq; struct QueueEntry *entry; @@ -434,27 +441,27 @@ remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index) opq = op->queues[index]; entry = op->qentries[index]; switch (op->state) - { - case OP_STATE_INIT: - GNUNET_assert(0); - break; - - case OP_STATE_WAITING: - GNUNET_CONTAINER_DLL_remove(opq->wq_head, opq->wq_tail, entry); - break; - - case OP_STATE_READY: - GNUNET_CONTAINER_DLL_remove(opq->rq_head, opq->rq_tail, entry); - break; - - case OP_STATE_ACTIVE: - GNUNET_CONTAINER_DLL_remove(opq->aq_head, opq->aq_tail, entry); - break; - - case OP_STATE_INACTIVE: - GNUNET_CONTAINER_DLL_remove(opq->nq_head, opq->nq_tail, entry); - break; - } + { + case OP_STATE_INIT: + GNUNET_assert (0); + break; + + case OP_STATE_WAITING: + GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); + break; + + case OP_STATE_READY: + GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry); + break; + + case OP_STATE_ACTIVE: + GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); + break; + + case OP_STATE_INACTIVE: + GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry); + break; + } } @@ -466,57 +473,57 @@ remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index) * @param state the state the operation should have. It cannot be OP_STATE_INIT */ static void -change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state) +change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) { struct QueueEntry *entry; struct OperationQueue *opq; unsigned int cnt; unsigned int s; - GNUNET_assert(OP_STATE_INIT != state); - GNUNET_assert(NULL != op->queues); - GNUNET_assert(NULL != op->nres); - GNUNET_assert((OP_STATE_INIT == op->state) || (NULL != op->qentries)); - GNUNET_assert(op->state != state); + GNUNET_assert (OP_STATE_INIT != state); + GNUNET_assert (NULL != op->queues); + GNUNET_assert (NULL != op->nres); + GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries)); + GNUNET_assert (op->state != state); for (cnt = 0; cnt < op->nqueues; cnt++) + { + if (OP_STATE_INIT == op->state) { - if (OP_STATE_INIT == op->state) - { - entry = GNUNET_new(struct QueueEntry); - entry->op = op; - entry->nres = op->nres[cnt]; - s = cnt; - GNUNET_array_append(op->qentries, s, entry); - } - else - { - entry = op->qentries[cnt]; - remove_queue_entry(op, cnt); - } - opq = op->queues[cnt]; - switch (state) - { - case OP_STATE_INIT: - GNUNET_assert(0); - break; + entry = GNUNET_new (struct QueueEntry); + entry->op = op; + entry->nres = op->nres[cnt]; + s = cnt; + GNUNET_array_append (op->qentries, s, entry); + } + else + { + entry = op->qentries[cnt]; + remove_queue_entry (op, cnt); + } + opq = op->queues[cnt]; + switch (state) + { + case OP_STATE_INIT: + GNUNET_assert (0); + break; - case OP_STATE_WAITING: - GNUNET_CONTAINER_DLL_insert_tail(opq->wq_head, opq->wq_tail, entry); - break; + case OP_STATE_WAITING: + GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); + break; - case OP_STATE_READY: - GNUNET_CONTAINER_DLL_insert_tail(opq->rq_head, opq->rq_tail, entry); - break; + case OP_STATE_READY: + GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry); + break; - case OP_STATE_ACTIVE: - GNUNET_CONTAINER_DLL_insert_tail(opq->aq_head, opq->aq_tail, entry); - break; + case OP_STATE_ACTIVE: + GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); + break; - case OP_STATE_INACTIVE: - GNUNET_CONTAINER_DLL_insert_tail(opq->nq_head, opq->nq_tail, entry); - break; - } + case OP_STATE_INACTIVE: + GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry); + break; } + } op->state = state; } @@ -528,17 +535,17 @@ change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state) * @param op the operation to be removed */ static void -rq_remove(struct GNUNET_TESTBED_Operation *op) +rq_remove (struct GNUNET_TESTBED_Operation *op) { - GNUNET_assert(NULL != op->rq_entry); - GNUNET_CONTAINER_DLL_remove(rq_head, rq_tail, op->rq_entry); - GNUNET_free(op->rq_entry); + GNUNET_assert (NULL != op->rq_entry); + GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry); + GNUNET_free (op->rq_entry); op->rq_entry = NULL; if ((NULL == rq_head) && (NULL != process_rq_task_id)) - { - GNUNET_SCHEDULER_cancel(process_rq_task_id); - process_rq_task_id = NULL; - } + { + GNUNET_SCHEDULER_cancel (process_rq_task_id); + process_rq_task_id = NULL; + } } @@ -551,28 +558,28 @@ rq_remove(struct GNUNET_TESTBED_Operation *op) * @param cls NULL */ static void -process_rq_task(void *cls) +process_rq_task (void *cls) { struct GNUNET_TESTBED_Operation *op; struct OperationQueue *queue; unsigned int cnt; process_rq_task_id = NULL; - GNUNET_assert(NULL != rq_head); - GNUNET_assert(NULL != (op = rq_head->op)); - rq_remove(op); + GNUNET_assert (NULL != rq_head); + GNUNET_assert (NULL != (op = rq_head->op)); + rq_remove (op); if (NULL != rq_head) - process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL); - change_state(op, OP_STATE_ACTIVE); + process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); + change_state (op, OP_STATE_ACTIVE); for (cnt = 0; cnt < op->nqueues; cnt++) - { - queue = op->queues[cnt]; - if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) - assign_timeslot(op, queue); - } - op->tstart = GNUNET_TIME_absolute_get(); + { + queue = op->queues[cnt]; + if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) + assign_timeslot (op, queue); + } + op->tstart = GNUNET_TIME_absolute_get (); if (NULL != op->start) - op->start(op->cb_cls); + op->start (op->cb_cls); } @@ -582,17 +589,17 @@ process_rq_task(void *cls) * @param op the operation to be queued */ static void -rq_add(struct GNUNET_TESTBED_Operation *op) +rq_add (struct GNUNET_TESTBED_Operation *op) { struct ReadyQueueEntry *rq_entry; - GNUNET_assert(NULL == op->rq_entry); - rq_entry = GNUNET_new(struct ReadyQueueEntry); + GNUNET_assert (NULL == op->rq_entry); + rq_entry = GNUNET_new (struct ReadyQueueEntry); rq_entry->op = op; - GNUNET_CONTAINER_DLL_insert_tail(rq_head, rq_tail, rq_entry); + GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry); op->rq_entry = rq_entry; if (NULL == process_rq_task_id) - process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL); + process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); } @@ -604,7 +611,7 @@ rq_add(struct GNUNET_TESTBED_Operation *op) * otherwise */ static int -is_queue_empty(struct OperationQueue *opq) +is_queue_empty (struct OperationQueue *opq) { if ((NULL != opq->wq_head) || (NULL != opq->rq_head) @@ -632,10 +639,10 @@ is_queue_empty(struct OperationQueue *opq) * be set to NULL and 0 respectively. */ static int -decide_capacity(struct OperationQueue *opq, - struct QueueEntry *entry, - struct GNUNET_TESTBED_Operation ***ops_, - unsigned int *n_ops_) +decide_capacity (struct OperationQueue *opq, + struct QueueEntry *entry, + struct GNUNET_TESTBED_Operation ***ops_, + unsigned int *n_ops_) { struct QueueEntry **evict_entries; struct GNUNET_TESTBED_Operation **ops; @@ -647,53 +654,53 @@ decide_capacity(struct OperationQueue *opq, int deficit; int rval; - GNUNET_assert(NULL != (op = entry->op)); - GNUNET_assert(0 < (need = entry->nres)); + GNUNET_assert (NULL != (op = entry->op)); + GNUNET_assert (0 < (need = entry->nres)); ops = NULL; n_ops = 0; evict_entries = NULL; n_evict_entries = 0; rval = GNUNET_YES; if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) - { - GNUNET_assert(NULL != opq->fctx); - GNUNET_assert(opq->max_active >= opq->overload); - max = opq->max_active - opq->overload; - } + { + GNUNET_assert (NULL != opq->fctx); + GNUNET_assert (opq->max_active >= opq->overload); + max = opq->max_active - opq->overload; + } else max = opq->max_active; if (opq->active > max) - { - rval = GNUNET_NO; - goto ret; - } + { + rval = GNUNET_NO; + goto ret; + } if ((opq->active + need) <= max) goto ret; deficit = need - (max - opq->active); for (entry = opq->nq_head; (0 < deficit) && (NULL != entry); entry = entry->next) - { - GNUNET_array_append(evict_entries, n_evict_entries, entry); - deficit -= entry->nres; - } + { + GNUNET_array_append (evict_entries, n_evict_entries, entry); + deficit -= entry->nres; + } if (0 < deficit) - { - rval = GNUNET_NO; - goto ret; - } + { + rval = GNUNET_NO; + goto ret; + } for (n_ops = 0; n_ops < n_evict_entries;) - { - op = evict_entries[n_ops]->op; - GNUNET_array_append(ops, n_ops, op); /* increments n-ops */ - } + { + op = evict_entries[n_ops]->op; + GNUNET_array_append (ops, n_ops, op); /* increments n-ops */ + } ret: - GNUNET_free_non_null(evict_entries); + GNUNET_free_non_null (evict_entries); if (NULL != ops_) *ops_ = ops; else - GNUNET_free(ops); + GNUNET_free (ops); if (NULL != n_ops_) *n_ops_ = n_ops; return rval; @@ -710,43 +717,42 @@ ret: * @param n_new the number of operations in new array */ static void -merge_ops(struct GNUNET_TESTBED_Operation ***old, - unsigned int *n_old, - struct GNUNET_TESTBED_Operation **new, - unsigned int n_new) +merge_ops (struct GNUNET_TESTBED_Operation ***old, + unsigned int *n_old, + struct GNUNET_TESTBED_Operation **new, + unsigned int n_new) { struct GNUNET_TESTBED_Operation **cur; unsigned int i; unsigned int j; unsigned int n_cur; - GNUNET_assert(NULL != old); + GNUNET_assert (NULL != old); n_cur = *n_old; cur = *old; for (i = 0; i < n_new; i++) + { + for (j = 0; j < *n_old; j++) { - for (j = 0; j < *n_old; j++) - { - if (new[i] == cur[j]) - break; - } - if (j < *n_old) - continue; - GNUNET_array_append(cur, n_cur, new[j]); + if (new[i] == cur[j]) + break; } + if (j < *n_old) + continue; + GNUNET_array_append (cur, n_cur, new[j]); + } *old = cur; *n_old = n_cur; } - /** * Checks for the readiness of an operation and schedules a operation start task * * @param op the operation */ static int -check_readiness(struct GNUNET_TESTBED_Operation *op) +check_readiness (struct GNUNET_TESTBED_Operation *op) { struct GNUNET_TESTBED_Operation **evict_ops; struct GNUNET_TESTBED_Operation **ops; @@ -754,39 +760,39 @@ check_readiness(struct GNUNET_TESTBED_Operation *op) unsigned int n_evict_ops; unsigned int i; - GNUNET_assert(NULL == op->rq_entry); - GNUNET_assert(OP_STATE_WAITING == op->state); + GNUNET_assert (NULL == op->rq_entry); + GNUNET_assert (OP_STATE_WAITING == op->state); evict_ops = NULL; n_evict_ops = 0; for (i = 0; i < op->nqueues; i++) + { + ops = NULL; + n_ops = 0; + if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i], + &ops, &n_ops)) { - ops = NULL; - n_ops = 0; - if (GNUNET_NO == decide_capacity(op->queues[i], op->qentries[i], - &ops, &n_ops)) - { - GNUNET_free_non_null(evict_ops); - return GNUNET_NO; - } - if (NULL == ops) - continue; - merge_ops(&evict_ops, &n_evict_ops, ops, n_ops); - GNUNET_free(ops); + GNUNET_free_non_null (evict_ops); + return GNUNET_NO; } + if (NULL == ops) + continue; + merge_ops (&evict_ops, &n_evict_ops, ops, n_ops); + GNUNET_free (ops); + } if (NULL != evict_ops) - { - for (i = 0; i < n_evict_ops; i++) - GNUNET_TESTBED_operation_release_(evict_ops[i]); - GNUNET_free(evict_ops); - evict_ops = NULL; - /* Evicting the operations should schedule this operation */ - GNUNET_assert(OP_STATE_READY == op->state); - return GNUNET_YES; - } + { + for (i = 0; i < n_evict_ops; i++) + GNUNET_TESTBED_operation_release_ (evict_ops[i]); + GNUNET_free (evict_ops); + evict_ops = NULL; + /* Evicting the operations should schedule this operation */ + GNUNET_assert (OP_STATE_READY == op->state); + return GNUNET_YES; + } for (i = 0; i < op->nqueues; i++) op->queues[i]->active += op->nres[i]; - change_state(op, OP_STATE_READY); - rq_add(op); + change_state (op, OP_STATE_READY); + rq_add (op); return GNUNET_YES; } @@ -797,18 +803,18 @@ check_readiness(struct GNUNET_TESTBED_Operation *op) * @param op the operation to defer */ static void -defer(struct GNUNET_TESTBED_Operation *op) +defer (struct GNUNET_TESTBED_Operation *op) { unsigned int i; - GNUNET_assert(OP_STATE_READY == op->state); - rq_remove(op); + GNUNET_assert (OP_STATE_READY == op->state); + rq_remove (op); for (i = 0; i < op->nqueues; i++) - { - GNUNET_assert(op->queues[i]->active >= op->nres[i]); - op->queues[i]->active -= op->nres[i]; - } - change_state(op, OP_STATE_WAITING); + { + GNUNET_assert (op->queues[i]->active >= op->nres[i]); + op->queues[i]->active -= op->nres[i]; + } + change_state (op, OP_STATE_WAITING); } @@ -820,23 +826,23 @@ defer(struct GNUNET_TESTBED_Operation *op) * @param queue the operation queue */ static void -cleanup_tslots(struct OperationQueue *queue) +cleanup_tslots (struct OperationQueue *queue) { struct FeedbackCtx *fctx = queue->fctx; struct TimeSlot *tslot; struct GNUNET_TESTBED_Operation *op; unsigned int cnt; - GNUNET_assert(NULL != fctx); + GNUNET_assert (NULL != fctx); for (cnt = 0; cnt < queue->max_active; cnt++) - { - tslot = &fctx->tslots_freeptr[cnt]; - op = tslot->op; - if (NULL == op) - continue; - GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot); - } - GNUNET_free_non_null(fctx->tslots_freeptr); + { + tslot = &fctx->tslots_freeptr[cnt]; + op = tslot->op; + if (NULL == op) + continue; + GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); + } + GNUNET_free_non_null (fctx->tslots_freeptr); fctx->tslots_freeptr = NULL; fctx->alloc_head = NULL; fctx->alloc_tail = NULL; @@ -854,23 +860,24 @@ cleanup_tslots(struct OperationQueue *queue) * will be selected as n */ static void -adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n) +adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n) { struct FeedbackCtx *fctx = queue->fctx; struct TimeSlot *tslot; unsigned int cnt; - cleanup_tslots(queue); - n = GNUNET_MIN(n, fctx->max_active_bound); - fctx->tslots_freeptr = GNUNET_malloc(n * sizeof(struct TimeSlot)); + cleanup_tslots (queue); + n = GNUNET_MIN (n, fctx->max_active_bound); + fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot)); fctx->nfailed = 0; for (cnt = 0; cnt < n; cnt++) - { - tslot = &fctx->tslots_freeptr[cnt]; - tslot->queue = queue; - GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail, tslot); - } - GNUNET_TESTBED_operation_queue_reset_max_active_(queue, n); + { + tslot = &fctx->tslots_freeptr[cnt]; + tslot->queue = queue; + GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, + tslot); + } + GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n); } @@ -881,7 +888,7 @@ adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n) * @param queue the queue */ static void -adapt_parallelism(struct OperationQueue *queue) +adapt_parallelism (struct OperationQueue *queue) { struct GNUNET_TIME_Relative avg; struct FeedbackCtx *fctx; @@ -895,32 +902,32 @@ adapt_parallelism(struct OperationQueue *queue) nvals = 0; fctx = queue->fctx; for (cnt = 0; cnt < queue->max_active; cnt++) - { - tslot = &fctx->tslots_freeptr[cnt]; - avg = GNUNET_TIME_relative_add(avg, tslot->tsum); - nvals += tslot->nvals; - } - GNUNET_assert(nvals >= queue->max_active); - GNUNET_assert(fctx->nfailed <= nvals); + { + tslot = &fctx->tslots_freeptr[cnt]; + avg = GNUNET_TIME_relative_add (avg, tslot->tsum); + nvals += tslot->nvals; + } + GNUNET_assert (nvals >= queue->max_active); + GNUNET_assert (fctx->nfailed <= nvals); nvals -= fctx->nfailed; if (0 == nvals) - { - if (1 == queue->max_active) - adaptive_queue_set_max_active(queue, 1); - else - adaptive_queue_set_max_active(queue, queue->max_active / 2); - return; - } - avg = GNUNET_TIME_relative_divide(avg, nvals); - GNUNET_TESTBED_SD_add_data_(fctx->sd, (unsigned int)avg.rel_value_us); + { + if (1 == queue->max_active) + adaptive_queue_set_max_active (queue, 1); + else + adaptive_queue_set_max_active (queue, queue->max_active / 2); + return; + } + avg = GNUNET_TIME_relative_divide (avg, nvals); + GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); if (GNUNET_SYSERR == - GNUNET_TESTBED_SD_deviation_factor_(fctx->sd, - (unsigned int)avg.rel_value_us, - &sd)) - { - adaptive_queue_set_max_active(queue, queue->max_active); /* no change */ - return; - } + GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, + (unsigned int) avg.rel_value_us, + &sd)) + { + adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ + return; + } parallelism = 0; if (-1 == sd) @@ -931,36 +938,36 @@ adapt_parallelism(struct OperationQueue *queue) parallelism = queue->max_active - 1; if (2 <= sd) parallelism = queue->max_active / 2; - parallelism = GNUNET_MAX(parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); - adaptive_queue_set_max_active(queue, parallelism); + parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); + adaptive_queue_set_max_active (queue, parallelism); #if 0 /* old algorithm */ if (sd < 0) sd = 0; - GNUNET_assert(0 <= sd); - //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); + GNUNET_assert (0 <= sd); + // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); if (0 == sd) - { - adaptive_queue_set_max_active(queue, queue->max_active * 2); - return; - } + { + adaptive_queue_set_max_active (queue, queue->max_active * 2); + return; + } if (1 == sd) - { - adaptive_queue_set_max_active(queue, queue->max_active + 1); - return; - } + { + adaptive_queue_set_max_active (queue, queue->max_active + 1); + return; + } if (1 == queue->max_active) - { - adaptive_queue_set_max_active(queue, 1); - return; - } + { + adaptive_queue_set_max_active (queue, 1); + return; + } if (2 == sd) - { - adaptive_queue_set_max_active(queue, queue->max_active - 1); - return; - } - adaptive_queue_set_max_active(queue, queue->max_active / 2); + { + adaptive_queue_set_max_active (queue, queue->max_active - 1); + return; + } + adaptive_queue_set_max_active (queue, queue->max_active / 2); #endif } @@ -973,7 +980,7 @@ adapt_parallelism(struct OperationQueue *queue) * @param op the operation */ static void -update_tslots(struct GNUNET_TESTBED_Operation *op) +update_tslots (struct GNUNET_TESTBED_Operation *op) { struct OperationQueue *queue; struct GNUNET_TIME_Relative t; @@ -981,31 +988,31 @@ update_tslots(struct GNUNET_TESTBED_Operation *op) struct FeedbackCtx *fctx; unsigned int i; - t = GNUNET_TIME_absolute_get_duration(op->tstart); + t = GNUNET_TIME_absolute_get_duration (op->tstart); while (NULL != (tslot = op->tslots_head)) /* update time slots */ + { + queue = tslot->queue; + fctx = queue->fctx; + GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); + tslot->op = NULL; + GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, + tslot); + if (op->failed) { - queue = tslot->queue; - fctx = queue->fctx; - GNUNET_CONTAINER_DLL_remove(op->tslots_head, op->tslots_tail, tslot); - tslot->op = NULL; - GNUNET_CONTAINER_DLL_insert_tail(fctx->alloc_head, fctx->alloc_tail, - tslot); - if (op->failed) - { - fctx->nfailed++; - for (i = 0; i < op->nqueues; i++) - if (queue == op->queues[i]) - break; - GNUNET_assert(i != op->nqueues); - op->queues[i]->overload += op->nres[i]; - } - tslot->tsum = GNUNET_TIME_relative_add(tslot->tsum, t); - if (0 != tslot->nvals++) - continue; - fctx->tslots_filled++; - if (queue->max_active == fctx->tslots_filled) - adapt_parallelism(queue); + fctx->nfailed++; + for (i = 0; i < op->nqueues; i++) + if (queue == op->queues[i]) + break; + GNUNET_assert (i != op->nqueues); + op->queues[i]->overload += op->nres[i]; } + tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); + if (0 != tslot->nvals++) + continue; + fctx->tslots_filled++; + if (queue->max_active == fctx->tslots_filled) + adapt_parallelism (queue); + } } @@ -1018,12 +1025,12 @@ update_tslots(struct GNUNET_TESTBED_Operation *op) * @return handle to the operation */ struct GNUNET_TESTBED_Operation * -GNUNET_TESTBED_operation_create_(void *cls, OperationStart start, - OperationRelease release) +GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start, + OperationRelease release) { struct GNUNET_TESTBED_Operation *op; - op = GNUNET_new(struct GNUNET_TESTBED_Operation); + op = GNUNET_new (struct GNUNET_TESTBED_Operation); op->start = start; op->state = OP_STATE_INIT; op->release = release; @@ -1041,26 +1048,26 @@ GNUNET_TESTBED_operation_create_(void *cls, OperationStart start, * @return handle to the queue */ struct OperationQueue * -GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type, - unsigned int max_active) +GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, + unsigned int max_active) { struct OperationQueue *queue; struct FeedbackCtx *fctx; - queue = GNUNET_new(struct OperationQueue); + queue = GNUNET_new (struct OperationQueue); queue->type = type; if (OPERATION_QUEUE_TYPE_FIXED == type) - { - queue->max_active = max_active; - } + { + queue->max_active = max_active; + } else - { - fctx = GNUNET_new(struct FeedbackCtx); - fctx->max_active_bound = max_active; - fctx->sd = GNUNET_TESTBED_SD_init_(ADAPTIVE_QUEUE_DEFAULT_HISTORY); - queue->fctx = fctx; - adaptive_queue_set_max_active(queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); - } + { + fctx = GNUNET_new (struct FeedbackCtx); + fctx->max_active_bound = max_active; + fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY); + queue->fctx = fctx; + adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); + } return queue; } @@ -1071,18 +1078,18 @@ GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type, * @param queue the operation queue to destroy */ static void -queue_destroy(struct OperationQueue *queue) +queue_destroy (struct OperationQueue *queue) { struct FeedbackCtx *fctx; if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) - { - cleanup_tslots(queue); - fctx = queue->fctx; - GNUNET_TESTBED_SD_destroy_(fctx->sd); - GNUNET_free(fctx); - } - GNUNET_free(queue); + { + cleanup_tslots (queue); + fctx = queue->fctx; + GNUNET_TESTBED_SD_destroy_ (fctx->sd); + GNUNET_free (fctx); + } + GNUNET_free (queue); } @@ -1094,16 +1101,16 @@ queue_destroy(struct OperationQueue *queue) * @param queue queue to destroy */ void -GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue) +GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) { - if (GNUNET_YES != is_queue_empty(queue)) - { - GNUNET_assert(0 == queue->expired); /* Are you calling twice on same queue? */ - queue->expired = 1; - GNUNET_array_append(expired_opqs, n_expired_opqs, queue); - return; - } - queue_destroy(queue); + if (GNUNET_YES != is_queue_empty (queue)) + { + GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */ + queue->expired = 1; + GNUNET_array_append (expired_opqs, n_expired_opqs, queue); + return; + } + queue_destroy (queue); } @@ -1115,11 +1122,11 @@ GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue) * is not empty) */ int -GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue) +GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) { - if (GNUNET_NO == is_queue_empty(queue)) + if (GNUNET_NO == is_queue_empty (queue)) return GNUNET_NO; - GNUNET_TESTBED_operation_queue_destroy_(queue); + GNUNET_TESTBED_operation_queue_destroy_ (queue); return GNUNET_YES; } @@ -1131,19 +1138,19 @@ GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue) * @param opq the operation queue */ static void -recheck_waiting(struct OperationQueue *opq) +recheck_waiting (struct OperationQueue *opq) { struct QueueEntry *entry; struct QueueEntry *entry2; entry = opq->wq_head; while (NULL != entry) - { - entry2 = entry->next; - if (GNUNET_NO == check_readiness(entry->op)) - break; - entry = entry2; - } + { + entry2 = entry->next; + if (GNUNET_NO == check_readiness (entry->op)) + break; + entry = entry2; + } } @@ -1156,8 +1163,8 @@ recheck_waiting(struct OperationQueue *opq) * @param max_active the new maximum number of active operations */ void -GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, - unsigned int max_active) +GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, + unsigned int max_active) { struct QueueEntry *entry; @@ -1165,8 +1172,8 @@ GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, queue->overload = 0; while ((queue->active > queue->max_active) && (NULL != (entry = queue->rq_head))) - defer(entry->op); - recheck_waiting(queue); + defer (entry->op); + recheck_waiting (queue); } @@ -1182,17 +1189,17 @@ GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, * operation. Should be greater than 0. */ void -GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue, - struct GNUNET_TESTBED_Operation *op, - unsigned int nres) +GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, + struct GNUNET_TESTBED_Operation *op, + unsigned int nres) { unsigned int qsize; - GNUNET_assert(0 < nres); + GNUNET_assert (0 < nres); qsize = op->nqueues; - GNUNET_array_append(op->queues, op->nqueues, queue); - GNUNET_array_append(op->nres, qsize, nres); - GNUNET_assert(qsize == op->nqueues); + GNUNET_array_append (op->queues, op->nqueues, queue); + GNUNET_array_append (op->nres, qsize, nres); + GNUNET_assert (qsize == op->nqueues); } @@ -1208,10 +1215,10 @@ GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue, * @param op operation to add to the queue */ void -GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue, - struct GNUNET_TESTBED_Operation *op) +GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue, + struct GNUNET_TESTBED_Operation *op) { - return GNUNET_TESTBED_operation_queue_insert2_(queue, op, 1); + return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1); } @@ -1225,11 +1232,11 @@ GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue, * @param op the operation to marks as waiting */ void -GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op) +GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) { - GNUNET_assert(NULL == op->rq_entry); - change_state(op, OP_STATE_WAITING); - (void)check_readiness(op); + GNUNET_assert (NULL == op->rq_entry); + change_state (op, OP_STATE_WAITING); + (void) check_readiness (op); } @@ -1243,24 +1250,24 @@ GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op) * it as inactive. */ void -GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op) +GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) { struct OperationQueue **queues; size_t ms; unsigned int nqueues; unsigned int i; - GNUNET_assert(OP_STATE_ACTIVE == op->state); - change_state(op, OP_STATE_INACTIVE); + GNUNET_assert (OP_STATE_ACTIVE == op->state); + change_state (op, OP_STATE_INACTIVE); nqueues = op->nqueues; ms = sizeof(struct OperationQueue *) * nqueues; - queues = GNUNET_malloc(ms); + queues = GNUNET_malloc (ms); /* Cloning is needed as the operation be released by waiting operations and hence its nqueues memory ptr will be freed */ - GNUNET_memcpy(queues, op->queues, ms); + GNUNET_memcpy (queues, op->queues, ms); for (i = 0; i < nqueues; i++) - recheck_waiting(queues[i]); - GNUNET_free(queues); + recheck_waiting (queues[i]); + GNUNET_free (queues); } @@ -1272,10 +1279,10 @@ GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op) * @param op the operation to be marked as active */ void -GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op) +GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) { - GNUNET_assert(OP_STATE_INACTIVE == op->state); - change_state(op, OP_STATE_ACTIVE); + GNUNET_assert (OP_STATE_INACTIVE == op->state); + change_state (op, OP_STATE_ACTIVE); } @@ -1286,56 +1293,56 @@ GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op) * @param op operation that finished */ void -GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op) +GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) { struct QueueEntry *entry; struct OperationQueue *opq; unsigned int i; if (OP_STATE_INIT == op->state) - { - GNUNET_free(op); - return; - } + { + GNUNET_free (op); + return; + } if (OP_STATE_READY == op->state) - rq_remove(op); + rq_remove (op); if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ - GNUNET_TESTBED_operation_activate_(op); + GNUNET_TESTBED_operation_activate_ (op); if (OP_STATE_ACTIVE == op->state) - update_tslots(op); - GNUNET_assert(NULL != op->queues); - GNUNET_assert(NULL != op->qentries); + update_tslots (op); + GNUNET_assert (NULL != op->queues); + GNUNET_assert (NULL != op->qentries); for (i = 0; i < op->nqueues; i++) + { + entry = op->qentries[i]; + remove_queue_entry (op, i); + opq = op->queues[i]; + switch (op->state) { - entry = op->qentries[i]; - remove_queue_entry(op, i); - opq = op->queues[i]; - switch (op->state) - { - case OP_STATE_INIT: - case OP_STATE_INACTIVE: - GNUNET_assert(0); - break; + case OP_STATE_INIT: + case OP_STATE_INACTIVE: + GNUNET_assert (0); + break; - case OP_STATE_WAITING: - break; + case OP_STATE_WAITING: + break; - case OP_STATE_ACTIVE: - case OP_STATE_READY: - GNUNET_assert(0 != opq->active); - GNUNET_assert(opq->active >= entry->nres); - opq->active -= entry->nres; - recheck_waiting(opq); - break; - } - GNUNET_free(entry); + case OP_STATE_ACTIVE: + case OP_STATE_READY: + GNUNET_assert (0 != opq->active); + GNUNET_assert (opq->active >= entry->nres); + opq->active -= entry->nres; + recheck_waiting (opq); + break; } - GNUNET_free_non_null(op->qentries); - GNUNET_free(op->queues); - GNUNET_free(op->nres); + GNUNET_free (entry); + } + GNUNET_free_non_null (op->qentries); + GNUNET_free (op->queues); + GNUNET_free (op->nres); if (NULL != op->release) - op->release(op->cb_cls); - GNUNET_free(op); + op->release (op->cb_cls); + GNUNET_free (op); } @@ -1345,7 +1352,7 @@ GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op) * @param op the operation to be marked as failed */ void -GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op) +GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op) { op->failed = GNUNET_YES; } @@ -1356,23 +1363,25 @@ GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op) * operations which are not completed and warn about them. */ void __attribute__ ((destructor)) -GNUNET_TESTBED_operations_fini() +GNUNET_TESTBED_operations_fini () { struct OperationQueue *queue; unsigned int i; int warn = 0; for (i = 0; i < n_expired_opqs; i++) - { - queue = expired_opqs[i]; - if (GNUNET_NO == is_queue_empty(queue)) - warn = 1; - queue_destroy(queue); - } - GNUNET_free_non_null(expired_opqs); + { + queue = expired_opqs[i]; + if (GNUNET_NO == is_queue_empty (queue)) + warn = 1; + queue_destroy (queue); + } + GNUNET_free_non_null (expired_opqs); n_expired_opqs = 0; if (warn) - GNUNET_log(GNUNET_ERROR_TYPE_WARNING, - "Be disciplined. Some operations were not marked as done.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Be disciplined. Some operations were not marked as done.\n"); } + + /* end of testbed_api_operations.c */