X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Ftestbed_api_operations.c;h=de24517d6a959c2eba983f2c749880707a21aa05;hb=f40acfa927bb605c81c99ed250277d51bf951e25;hp=fc5da29b59945151b76357dba61337623cf5fc0c;hpb=d65fc468de7a61f016e476cfe3fc471901f32e8a;p=oweals%2Fgnunet.git diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index fc5da29b5..de24517d6 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2008--2012 Christian Grothoff (and other contributing authors) + Copyright (C) 2008--2013 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,18 +14,32 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file testbed/testbed_api_operations.c * @brief functions to manage operation queues * @author Christian Grothoff + * @author Sree Harsha Totakura */ + #include "platform.h" #include "testbed_api_operations.h" +#include "testbed_api_sd.h" +/** + * The number of readings containing past operation's timing information that we + * keep track of for adaptive queues + */ +#define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40 + +/** + * The number of parallel opeartions we start with by default for adaptive + * queues + */ +#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4 /** * An entry in the operation queue @@ -54,6 +68,93 @@ struct QueueEntry }; +/** + * Queue of operations where we can only support a certain + * number of concurrent operations of a particular type. + */ +struct OperationQueue; + + +/** + * A slot to record time taken by an operation + */ +struct TimeSlot +{ + /** + * DLL next pointer + */ + struct TimeSlot *next; + + /** + * DLL prev pointer + */ + struct TimeSlot *prev; + + /** + * This operation queue to which this time slot belongs to + */ + struct OperationQueue *queue; + + /** + * The operation to which this timeslot is currently allocated to + */ + struct GNUNET_TESTBED_Operation *op; + + /** + * Accumulated time + */ + struct GNUNET_TIME_Relative tsum; + + /** + * Number of timing values accumulated + */ + unsigned int nvals; +}; + + +/** + * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE + */ +struct FeedbackCtx +{ + /** + * Handle for calculating standard deviation + */ + struct SDHandle *sd; + + /** + * Head for DLL of time slots which are free to be allocated to operations + */ + struct TimeSlot *alloc_head; + + /** + * Tail for DLL of time slots which are free to be allocated to operations + */ + struct TimeSlot *alloc_tail; + + /** + * Pointer to the chunk of time slots. Free all time slots at a time using + * this pointer. + */ + struct TimeSlot *tslots_freeptr; + + /** + * Number of time slots filled so far + */ + unsigned int tslots_filled; + + /** + * Bound on the maximum number of operations which can be active + */ + unsigned int max_active_bound; + + /** + * Number of operations that have failed + */ + unsigned int nfailed; +}; + + /** * Queue of operations where we can only support a certain * number of concurrent operations of a particular type. @@ -105,16 +206,41 @@ struct OperationQueue */ struct QueueEntry *nq_tail; + /** + * Feedback context; only relevant for adaptive operation queues. NULL for + * fixed operation queues + */ + struct FeedbackCtx *fctx; + + /** + * The type of this opeartion queue + */ + enum OperationQueueType type; + /** * Number of operations that are currently active in this queue. */ unsigned int active; /** - * Max number of operations which can be active at any time in this queue + * Max number of operations which can be active at any time in this queue. + * This value can be changed either by calling + * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive + * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE */ unsigned int max_active; + /** + * The number of resources occupied by failed operations in the current shot. + * This is only relavant if the operation queue is of type + * #OPERATION_QUEUE_TYPE_ADAPTIVE + */ + unsigned int overload; + + /** + * Is this queue marked for expiry? + */ + unsigned int expired; }; @@ -139,9 +265,9 @@ enum OperationState OP_STATE_READY, /** - * The operation has started + * The operation has started and is active */ - OP_STATE_STARTED, + OP_STATE_ACTIVE, /** * The operation is inactive. It still holds resources on the operation @@ -162,7 +288,7 @@ struct ReadyQueueEntry * next ptr for DLL */ struct ReadyQueueEntry *next; - + /** * prev ptr for DLL */ @@ -219,6 +345,21 @@ struct GNUNET_TESTBED_Operation */ struct ReadyQueueEntry *rq_entry; + /** + * Head pointer for DLL of tslots allocated to this operation + */ + struct TimeSlot *tslots_head; + + /** + * Tail pointer for DLL of tslots allocated to this operation + */ + struct TimeSlot *tslots_tail; + + /** + * The time at which the operation is started + */ + struct GNUNET_TIME_Absolute tstart; + /** * Number of queues in the operation queues array */ @@ -229,29 +370,75 @@ struct GNUNET_TESTBED_Operation */ enum OperationState state; + /** + * Is this a failed operation? + */ + int failed; + }; /** * DLL head for the ready queue */ -struct ReadyQueueEntry *rq_head; +static struct ReadyQueueEntry *rq_head; /** * DLL tail for the ready queue */ -struct ReadyQueueEntry *rq_tail; +static struct ReadyQueueEntry *rq_tail; + +/** + * Array of operation queues which are to be destroyed + */ +static struct OperationQueue **expired_opqs; + +/** + * Number of expired operation queues in the above array + */ +static unsigned int n_expired_opqs; /** * The id of the task to process the ready queue */ -GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id; +struct GNUNET_SCHEDULER_Task *process_rq_task_id; -void + +/** + * Assigns the given operation a time slot from the given operation queue + * + * @param op the operation + * @param queue the operation queue + * @return the timeslot + */ +static void +assign_timeslot (struct GNUNET_TESTBED_Operation *op, + struct OperationQueue *queue) +{ + struct FeedbackCtx *fctx = queue->fctx; + struct TimeSlot *tslot; + + GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type); + tslot = fctx->alloc_head; + GNUNET_assert (NULL != tslot); + GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot); + GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot); + tslot->op = op; +} + + +/** + * Removes a queue entry of an operation from one of the operation queues' lists + * depending on the state of the operation + * + * @param op the operation whose entry has to be removed + * @param index the index of the entry in the operation's array of queue entries + */ +static void remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) { struct OperationQueue *opq; struct QueueEntry *entry; - + opq = op->queues[index]; entry = op->qentries[index]; switch (op->state) @@ -265,7 +452,7 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) case OP_STATE_READY: GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry); break; - case OP_STATE_STARTED: + case OP_STATE_ACTIVE: GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); break; case OP_STATE_INACTIVE: @@ -274,14 +461,22 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) } } -void + +/** + * Changes the state of the operation while moving its associated queue entries + * in the operation's operation queues + * + * @param op the operation whose state has to be changed + * @param state the state the operation should have. It cannot be OP_STATE_INIT + */ +static void change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) { struct QueueEntry *entry; struct OperationQueue *opq; unsigned int cnt; unsigned int s; - + GNUNET_assert (OP_STATE_INIT != state); GNUNET_assert (NULL != op->queues); GNUNET_assert (NULL != op->nres); @@ -291,11 +486,11 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) { if (OP_STATE_INIT == op->state) { - entry = GNUNET_malloc (sizeof (struct QueueEntry)); + entry = GNUNET_new (struct QueueEntry); entry->op = op; entry->nres = op->nres[cnt]; s = cnt; - GNUNET_array_append (op->qentries, s, entry); + GNUNET_array_append (op->qentries, s, entry); } else { @@ -314,7 +509,7 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) case OP_STATE_READY: GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry); break; - case OP_STATE_STARTED: + case OP_STATE_ACTIVE: GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); break; case OP_STATE_INACTIVE: @@ -334,15 +529,15 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) */ static void rq_remove (struct GNUNET_TESTBED_Operation *op) -{ +{ GNUNET_assert (NULL != op->rq_entry); GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry); GNUNET_free (op->rq_entry); op->rq_entry = NULL; - if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) ) + if ( (NULL == rq_head) && (NULL != process_rq_task_id) ) { GNUNET_SCHEDULER_cancel (process_rq_task_id); - process_rq_task_id = GNUNET_SCHEDULER_NO_TASK; + process_rq_task_id = NULL; } } @@ -354,22 +549,30 @@ rq_remove (struct GNUNET_TESTBED_Operation *op) * the ready queue. * * @param cls NULL - * @param tc scheduler task context. Not used. */ static void -process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +process_rq_task (void *cls) { struct GNUNET_TESTBED_Operation *op; + struct OperationQueue *queue; + unsigned int cnt; - process_rq_task_id = GNUNET_SCHEDULER_NO_TASK; + process_rq_task_id = NULL; GNUNET_assert (NULL != rq_head); GNUNET_assert (NULL != (op = rq_head->op)); rq_remove (op); if (NULL != rq_head) process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); - change_state (op, OP_STATE_STARTED); + change_state (op, OP_STATE_ACTIVE); + for (cnt = 0; cnt < op->nqueues; cnt++) + { + queue = op->queues[cnt]; + if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) + assign_timeslot (op, queue); + } + op->tstart = GNUNET_TIME_absolute_get (); if (NULL != op->start) - op->start (op->cb_cls); + op->start (op->cb_cls); } @@ -384,15 +587,22 @@ rq_add (struct GNUNET_TESTBED_Operation *op) struct ReadyQueueEntry *rq_entry; GNUNET_assert (NULL == op->rq_entry); - rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry)); + rq_entry = GNUNET_new (struct ReadyQueueEntry); rq_entry->op = op; GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry); op->rq_entry = rq_entry; - if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id) + if (NULL == process_rq_task_id) process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); } +/** + * Checks if the given operation queue is empty or not + * + * @param opq the operation queue + * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO + * otherwise + */ static int is_queue_empty (struct OperationQueue *opq) { @@ -405,7 +615,23 @@ is_queue_empty (struct OperationQueue *opq) } -int +/** + * Checks if the given operation queue has enough resources to provide for the + * operation of the given queue entry. It also checks if any inactive + * operations are to be released in order to accommodate the needed resources + * and returns them as an array. + * + * @param opq the operation queue to check for resource accommodation + * @param entry the operation queue entry whose operation's resources are to be + * accommodated + * @param ops_ pointer to return the array of operations which are to be released + * in order to accommodate the new operation. Can be NULL + * @param n_ops_ the number of operations in ops_ + * @return GNUNET_YES if the given entry's operation can be accommodated in this + * queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will + * be set to NULL and 0 respectively. + */ +static int decide_capacity (struct OperationQueue *opq, struct QueueEntry *entry, struct GNUNET_TESTBED_Operation ***ops_, @@ -417,20 +643,33 @@ decide_capacity (struct OperationQueue *opq, unsigned int n_ops; unsigned int n_evict_entries; unsigned int need; + unsigned int max; int deficit; int rval; GNUNET_assert (NULL != (op = entry->op)); GNUNET_assert (0 < (need = entry->nres)); - GNUNET_assert (opq->active <= opq->max_active); ops = NULL; n_ops = 0; evict_entries = NULL; n_evict_entries = 0; - rval = GNUNET_OK; - if ((opq->active + need) <= opq->max_active) + rval = GNUNET_YES; + if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) + { + GNUNET_assert (NULL != opq->fctx); + GNUNET_assert (opq->max_active >= opq->overload); + max = opq->max_active - opq->overload; + } + else + max = opq->max_active; + if (opq->active > max) + { + rval = GNUNET_NO; + goto ret; + } + if ((opq->active + need) <= max) goto ret; - deficit = need - (opq->max_active - opq->active); + deficit = need - (max - opq->active); for (entry = opq->nq_head; (0 < deficit) && (NULL != entry); entry = entry->next) @@ -450,14 +689,27 @@ decide_capacity (struct OperationQueue *opq, } ret: - GNUNET_free_non_null (evict_entries); - if (NULL != ops_) *ops_ = ops; - if (NULL != n_ops_) *n_ops_ = n_ops; + GNUNET_free_non_null (evict_entries); + if (NULL != ops_) + *ops_ = ops; + else + GNUNET_free (ops); + if (NULL != n_ops_) + *n_ops_ = n_ops; return rval; } -/* FIXME: improve.. */ -void + +/** + * Merges an array of operations into another, eliminating duplicates. No + * ordering is guaranteed. + * + * @param old the array into which the merging is done. + * @param n_old the number of operations in old array + * @param new the array from which operations are to be merged + * @param n_new the number of operations in new array + */ +static void merge_ops (struct GNUNET_TESTBED_Operation ***old, unsigned int *n_old, struct GNUNET_TESTBED_Operation **new, @@ -467,12 +719,12 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old, unsigned int i; unsigned int j; unsigned int n_cur; - + GNUNET_assert (NULL != old); n_cur = *n_old; cur = *old; for (i = 0; i < n_new; i++) - { + { for (j = 0; j < *n_old; j++) { if (new[i] == cur[j]) @@ -485,7 +737,7 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old, *old = cur; *n_old = n_cur; } - + /** @@ -493,7 +745,7 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old, * * @param op the operation */ -static void +static int check_readiness (struct GNUNET_TESTBED_Operation *op) { struct GNUNET_TESTBED_Operation **evict_ops; @@ -514,12 +766,12 @@ check_readiness (struct GNUNET_TESTBED_Operation *op) &ops, &n_ops)) { GNUNET_free_non_null (evict_ops); - return; + return GNUNET_NO; } if (NULL == ops) continue; merge_ops (&evict_ops, &n_evict_ops, ops, n_ops); - GNUNET_free (ops); + GNUNET_free (ops); } if (NULL != evict_ops) { @@ -529,12 +781,13 @@ check_readiness (struct GNUNET_TESTBED_Operation *op) evict_ops = NULL; /* Evicting the operations should schedule this operation */ GNUNET_assert (OP_STATE_READY == op->state); - return; + return GNUNET_YES; } for (i = 0; i < op->nqueues; i++) op->queues[i]->active += op->nres[i]; change_state (op, OP_STATE_READY); rq_add (op); + return GNUNET_YES; } @@ -551,11 +804,211 @@ defer (struct GNUNET_TESTBED_Operation *op) GNUNET_assert (OP_STATE_READY == op->state); rq_remove (op); for (i = 0; i < op->nqueues; i++) - op->queues[i]->active--; + { + GNUNET_assert (op->queues[i]->active >= op->nres[i]); + op->queues[i]->active -= op->nres[i]; + } change_state (op, OP_STATE_WAITING); } +/** + * Cleanups the array of timeslots of an operation queue. For each time slot in + * the array, if it is allocated to an operation, it will be deallocated from + * the operation + * + * @param queue the operation queue + */ +static void +cleanup_tslots (struct OperationQueue *queue) +{ + struct FeedbackCtx *fctx = queue->fctx; + struct TimeSlot *tslot; + struct GNUNET_TESTBED_Operation *op; + unsigned int cnt; + + GNUNET_assert (NULL != fctx); + for (cnt = 0; cnt < queue->max_active; cnt++) + { + tslot = &fctx->tslots_freeptr[cnt]; + op = tslot->op; + if (NULL == op) + continue; + GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); + } + GNUNET_free_non_null (fctx->tslots_freeptr); + fctx->tslots_freeptr = NULL; + fctx->alloc_head = NULL; + fctx->alloc_tail = NULL; + fctx->tslots_filled = 0; +} + + +/** + * Cleansup the existing timing slots and sets new timing slots in the given + * queue to accommodate given number of max active operations. + * + * @param queue the queue + * @param n the number of maximum active operations. If n is greater than the + * maximum limit set while creating the queue, then the minimum of these two + * will be selected as n + */ +static void +adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n) +{ + struct FeedbackCtx *fctx = queue->fctx; + struct TimeSlot *tslot; + unsigned int cnt; + + cleanup_tslots (queue); + n = GNUNET_MIN (n ,fctx->max_active_bound); + fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot)); + fctx->nfailed = 0; + for (cnt = 0; cnt < n; cnt++) + { + tslot = &fctx->tslots_freeptr[cnt]; + tslot->queue = queue; + GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot); + } + GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n); +} + + +/** + * Adapts parallelism in an adaptive queue by using the statistical data from + * the feedback context. + * + * @param queue the queue + */ +static void +adapt_parallelism (struct OperationQueue *queue) +{ + struct GNUNET_TIME_Relative avg; + struct FeedbackCtx *fctx; + struct TimeSlot *tslot; + int sd; + unsigned int nvals; + unsigned int cnt; + unsigned int parallelism; + + avg = GNUNET_TIME_UNIT_ZERO; + nvals = 0; + fctx = queue->fctx; + for (cnt = 0; cnt < queue->max_active; cnt++) + { + tslot = &fctx->tslots_freeptr[cnt]; + avg = GNUNET_TIME_relative_add (avg, tslot->tsum); + nvals += tslot->nvals; + } + GNUNET_assert (nvals >= queue->max_active); + GNUNET_assert (fctx->nfailed <= nvals); + nvals -= fctx->nfailed; + if (0 == nvals) + { + if (1 == queue->max_active) + adaptive_queue_set_max_active (queue, 1); + else + adaptive_queue_set_max_active (queue, queue->max_active / 2); + return; + } + avg = GNUNET_TIME_relative_divide (avg, nvals); + GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); + if (GNUNET_SYSERR == + GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, + (unsigned int) avg.rel_value_us, + &sd)) + { + adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ + return; + } + + parallelism = 0; + if (-1 == sd) + parallelism = queue->max_active + 1; + if (sd <= -2) + parallelism = queue->max_active * 2; + if (1 == sd) + parallelism = queue->max_active - 1; + if (2 <= sd) + parallelism = queue->max_active / 2; + parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); + adaptive_queue_set_max_active (queue, parallelism); + +#if 0 + /* old algorithm */ + if (sd < 0) + sd = 0; + GNUNET_assert (0 <= sd); + //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); + if (0 == sd) + { + adaptive_queue_set_max_active (queue, queue->max_active * 2); + return; + } + if (1 == sd) + { + adaptive_queue_set_max_active (queue, queue->max_active + 1); + return; + } + if (1 == queue->max_active) + { + adaptive_queue_set_max_active (queue, 1); + return; + } + if (2 == sd) + { + adaptive_queue_set_max_active (queue, queue->max_active - 1); + return; + } + adaptive_queue_set_max_active (queue, queue->max_active / 2); +#endif +} + + +/** + * update tslots with the operation's completion time. Additionally, if + * updating a timeslot makes all timeslots filled in an adaptive operation + * queue, call adapt_parallelism() for that queue. + * + * @param op the operation + */ +static void +update_tslots (struct GNUNET_TESTBED_Operation *op) +{ + struct OperationQueue *queue; + struct GNUNET_TIME_Relative t; + struct TimeSlot *tslot; + struct FeedbackCtx *fctx; + unsigned int i; + + t = GNUNET_TIME_absolute_get_duration (op->tstart); + while (NULL != (tslot = op->tslots_head)) /* update time slots */ + { + queue = tslot->queue; + fctx = queue->fctx; + GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); + tslot->op = NULL; + GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, + tslot); + if (op->failed) + { + fctx->nfailed++; + for (i = 0; i < op->nqueues; i++) + if (queue == op->queues[i]) + break; + GNUNET_assert (i != op->nqueues); + op->queues[i]->overload += op->nres[i]; + } + tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); + if (0 != tslot->nvals++) + continue; + fctx->tslots_filled++; + if (queue->max_active == fctx->tslots_filled) + adapt_parallelism (queue); + } +} + + /** * Create an 'operation' to be performed. * @@ -570,7 +1023,7 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start, { struct GNUNET_TESTBED_Operation *op; - op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation)); + op = GNUNET_new (struct GNUNET_TESTBED_Operation); op->start = start; op->state = OP_STATE_INIT; op->release = release; @@ -582,32 +1035,75 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start, /** * Create an operation queue. * + * @param type the type of operation queue * @param max_active maximum number of operations in this * queue that can be active in parallel at the same time * @return handle to the queue */ struct OperationQueue * -GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active) +GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, + unsigned int max_active) { struct OperationQueue *queue; + struct FeedbackCtx *fctx; - queue = GNUNET_malloc (sizeof (struct OperationQueue)); - queue->max_active = max_active; + queue = GNUNET_new (struct OperationQueue); + queue->type = type; + if (OPERATION_QUEUE_TYPE_FIXED == type) + { + queue->max_active = max_active; + } + else + { + fctx = GNUNET_new (struct FeedbackCtx); + fctx->max_active_bound = max_active; + fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY); + queue->fctx = fctx; + adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); + } return queue; } /** - * Destroy an operation queue. The queue MUST be empty - * at this time. + * Cleanup the given operation queue. + * + * @param queue the operation queue to destroy + */ +static void +queue_destroy (struct OperationQueue *queue) +{ + struct FeedbackCtx *fctx; + + if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) + { + cleanup_tslots (queue); + fctx = queue->fctx; + GNUNET_TESTBED_SD_destroy_ (fctx->sd); + GNUNET_free (fctx); + } + GNUNET_free (queue); +} + + +/** + * Destroys an operation queue. If the queue is still in use by operations it + * is marked as expired and its resources are released in the destructor + * GNUNET_TESTBED_operations_fini(). * * @param queue queue to destroy */ void GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) { - GNUNET_break (GNUNET_YES == is_queue_empty (queue)); - GNUNET_free (queue); + if (GNUNET_YES != is_queue_empty (queue)) + { + GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */ + queue->expired = 1; + GNUNET_array_append (expired_opqs, n_expired_opqs, queue); + return; + } + queue_destroy (queue); } @@ -628,17 +1124,24 @@ GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) } -void +/** + * Rechecks if any of the operations in the given operation queue's waiting list + * can be made active + * + * @param opq the operation queue + */ +static void recheck_waiting (struct OperationQueue *opq) { struct QueueEntry *entry; struct QueueEntry *entry2; entry = opq->wq_head; - while ( (NULL != entry) && (opq->active < opq->max_active) ) + while (NULL != entry) { entry2 = entry->next; - check_readiness (entry->op); + if (GNUNET_NO == check_readiness (entry->op)) + break; entry = entry2; } } @@ -659,6 +1162,7 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, struct QueueEntry *entry; queue->max_active = max_active; + queue->overload = 0; while ( (queue->active > queue->max_active) && (NULL != (entry = queue->rq_head)) ) defer (entry->op); @@ -725,7 +1229,7 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) { GNUNET_assert (NULL == op->rq_entry); change_state (op, OP_STATE_WAITING); - check_readiness (op); + (void) check_readiness (op); } @@ -741,8 +1245,22 @@ GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) void GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) { - GNUNET_assert (OP_STATE_STARTED == op->state); + struct OperationQueue **queues; + size_t ms; + unsigned int nqueues; + unsigned int i; + + GNUNET_assert (OP_STATE_ACTIVE == op->state); change_state (op, OP_STATE_INACTIVE); + nqueues = op->nqueues; + ms = sizeof (struct OperationQueue *) * nqueues; + queues = GNUNET_malloc (ms); + /* Cloning is needed as the operation be released by waiting operations and + hence its nqueues memory ptr will be freed */ + GNUNET_memcpy (queues, op->queues, ms); + for (i = 0; i < nqueues; i++) + recheck_waiting (queues[i]); + GNUNET_free (queues); } @@ -756,8 +1274,9 @@ GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) void GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) { + GNUNET_assert (OP_STATE_INACTIVE == op->state); - change_state (op, OP_STATE_STARTED); + change_state (op, OP_STATE_ACTIVE); } @@ -770,7 +1289,7 @@ GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) void GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) { - struct QueueEntry *entry; + struct QueueEntry *entry; struct OperationQueue *opq; unsigned int i; @@ -783,6 +1302,8 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) rq_remove (op); if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ GNUNET_TESTBED_operation_activate_ (op); + if (OP_STATE_ACTIVE == op->state) + update_tslots (op); GNUNET_assert (NULL != op->queues); GNUNET_assert (NULL != op->qentries); for (i = 0; i < op->nqueues; i++) @@ -791,21 +1312,21 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) remove_queue_entry (op, i); opq = op->queues[i]; switch (op->state) - { + { case OP_STATE_INIT: case OP_STATE_INACTIVE: GNUNET_assert (0); break; - case OP_STATE_WAITING: + case OP_STATE_WAITING: break; + case OP_STATE_ACTIVE: case OP_STATE_READY: - case OP_STATE_STARTED: GNUNET_assert (0 != opq->active); GNUNET_assert (opq->active >= entry->nres); opq->active -= entry->nres; recheck_waiting (opq); break; - } + } GNUNET_free (entry); } GNUNET_free_non_null (op->qentries); @@ -817,4 +1338,41 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) } +/** + * Marks an operation as failed + * + * @param op the operation to be marked as failed + */ +void +GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op) +{ + op->failed = GNUNET_YES; +} + + +/** + * Cleanup expired operation queues. While doing so, also check for any + * operations which are not completed and warn about them. + */ +void __attribute__ ((destructor)) +GNUNET_TESTBED_operations_fini () +{ + struct OperationQueue *queue; + unsigned int i; + int warn = 0; + + for (i=0; i < n_expired_opqs; i++) + { + queue = expired_opqs[i]; + if (GNUNET_NO == is_queue_empty (queue)) + warn = 1; + queue_destroy (queue); + } + GNUNET_free_non_null (expired_opqs); + n_expired_opqs = 0; + if (warn) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Be disciplined. Some operations were not marked as done.\n"); + +} /* end of testbed_api_operations.c */