X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Ftestbed_api_operations.c;h=47d135ffd47d09532d5c44a61bd788c6a88b5156;hb=a900b29ddaa9ea46c731b054b5e3ef3e725b95a8;hp=5a62cac4e742b2b91172dea4d04a583943293c2e;hpb=8c29ba8ee17d8c8d98548b8b64cd6a6605b7b295;p=oweals%2Fgnunet.git diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index 5a62cac4e..47d135ffd 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2008--2012 Christian Grothoff (and other contributing authors) + (C) 2008--2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -22,7 +22,9 @@ * @file testbed/testbed_api_operations.c * @brief functions to manage operation queues * @author Christian Grothoff + * @author Sree Harsha Totakura */ + #include "platform.h" #include "testbed_api_operations.h" @@ -60,16 +62,6 @@ struct QueueEntry */ struct OperationQueue { - /** - * The head of the operation queue - */ - struct QueueEntry *head; - - /** - * The tail of the operation queue - */ - struct QueueEntry *tail; - /** * DLL head for the wait queue. Operations which are waiting for this * operation queue are put here @@ -81,6 +73,40 @@ struct OperationQueue */ struct QueueEntry *wq_tail; + /** + * DLL head for the ready queue. Operations which are in this operation queue + * and are in ready state are put here + */ + struct QueueEntry *rq_head; + + /** + * DLL tail for the ready queue + */ + struct QueueEntry *rq_tail; + + /** + * DLL head for the active queue. Operations which are in this operation + * queue and are currently active are put here + */ + struct QueueEntry *aq_head; + + /** + * DLL tail for the active queue. + */ + struct QueueEntry *aq_tail; + + /** + * DLL head for the inactive queue. Operations which are inactive and can be + * evicted if the queues it holds are maxed out and another operation begins + * to wait on them. + */ + struct QueueEntry *nq_head; + + /** + * DLL tail for the inactive queue. + */ + struct QueueEntry *nq_tail; + /** * Number of operations that are currently active in this queue. */ @@ -115,9 +141,17 @@ enum OperationState OP_STATE_READY, /** - * The operation has started + * The operation has started and is active + */ + OP_STATE_ACTIVE, + + /** + * The operation is inactive. It still holds resources on the operation + * queues. However, this operation will be evicted when another operation + * requires resources from the maxed out queues this operation is holding + * resources from. */ - OP_STATE_STARTED + OP_STATE_INACTIVE }; @@ -215,6 +249,101 @@ struct ReadyQueueEntry *rq_tail; GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id; +/** + * Removes a queue entry of an operation from one of the operation queues' lists + * depending on the state of the operation + * + * @param op the operation whose entry has to be removed + * @param index the index of the entry in the operation's array of queue entries + */ +static void +remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) +{ + struct OperationQueue *opq; + struct QueueEntry *entry; + + opq = op->queues[index]; + entry = op->qentries[index]; + switch (op->state) + { + case OP_STATE_INIT: + GNUNET_assert (0); + break; + case OP_STATE_WAITING: + GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); + break; + case OP_STATE_READY: + GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry); + break; + case OP_STATE_ACTIVE: + GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); + break; + case OP_STATE_INACTIVE: + GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry); + break; + } +} + + +/** + * Changes the state of the operation while moving its associated queue entries + * in the operation's operation queues + * + * @param op the operation whose state has to be changed + * @param state the state the operation should have. It cannot be OP_STATE_INIT + */ +static void +change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) +{ + struct QueueEntry *entry; + struct OperationQueue *opq; + unsigned int cnt; + unsigned int s; + + GNUNET_assert (OP_STATE_INIT != state); + GNUNET_assert (NULL != op->queues); + GNUNET_assert (NULL != op->nres); + GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries)); + GNUNET_assert (op->state != state); + for (cnt = 0; cnt < op->nqueues; cnt++) + { + if (OP_STATE_INIT == op->state) + { + entry = GNUNET_malloc (sizeof (struct QueueEntry)); + entry->op = op; + entry->nres = op->nres[cnt]; + s = cnt; + GNUNET_array_append (op->qentries, s, entry); + } + else + { + entry = op->qentries[cnt]; + remove_queue_entry (op, cnt); + } + opq = op->queues[cnt]; + switch (state) + { + case OP_STATE_INIT: + GNUNET_assert (0); + break; + case OP_STATE_WAITING: + GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); + break; + case OP_STATE_READY: + GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry); + break; + case OP_STATE_ACTIVE: + GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); + break; + case OP_STATE_INACTIVE: + GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry); + break; + } + } + op->state = state; +} + + /** * Removes an operation from the ready queue. Also stops the 'process_rq_task' * if the given operation is the last one in the queue. @@ -256,7 +385,7 @@ process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) rq_remove (op); if (NULL != rq_head) process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); - op->state = OP_STATE_STARTED; + change_state (op, OP_STATE_ACTIVE); if (NULL != op->start) op->start (op->cb_cls); } @@ -282,46 +411,136 @@ rq_add (struct GNUNET_TESTBED_Operation *op) } -void -wq_add (struct GNUNET_TESTBED_Operation *op) +/** + * Checks if the given operation queue is empty or not + * + * @param opq the operation queue + * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO + * otherwise + */ +static int +is_queue_empty (struct OperationQueue *opq) { - struct QueueEntry *entry; - struct OperationQueue *opq; - unsigned int cnt; + if ( (NULL != opq->wq_head) + || (NULL != opq->rq_head) + || (NULL != opq->aq_head) + || (NULL != opq->nq_head) ) + return GNUNET_NO; + return GNUNET_YES; +} - GNUNET_assert (OP_STATE_WAITING == op->state); - GNUNET_assert (NULL == op->qentries); - for (cnt = 0; cnt < op->nqueues;) + +/** + * Checks if the given operation queue has enough resources to provide for the + * operation of the given queue entry. It also checks if any inactive + * operations are to be released in order to accommodate the needed resources + * and returns them as an array. + * + * @param opq the operation queue to check for resource accommodation + * @param entry the operation queue entry whose operation's resources are to be + * accommodated + * @param ops_ pointer to return the array of operations which are to be released + * in order to accommodate the new operation. Can be NULL + * @param n_ops_ the number of operations in ops_ + * @return GNUNET_YES if the given entry's operation can be accommodated in this + * queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will + * be set to NULL and 0 respectively. + */ +static int +decide_capacity (struct OperationQueue *opq, + struct QueueEntry *entry, + struct GNUNET_TESTBED_Operation ***ops_, + unsigned int *n_ops_) +{ + struct QueueEntry **evict_entries; + struct GNUNET_TESTBED_Operation **ops; + struct GNUNET_TESTBED_Operation *op; + unsigned int n_ops; + unsigned int n_evict_entries; + unsigned int need; + int deficit; + int rval; + + GNUNET_assert (NULL != (op = entry->op)); + GNUNET_assert (0 < (need = entry->nres)); + ops = NULL; + n_ops = 0; + evict_entries = NULL; + n_evict_entries = 0; + rval = GNUNET_YES; + if (opq->active > opq->max_active) { - opq = op->queues[cnt]; - entry = GNUNET_malloc (sizeof (struct QueueEntry)); - entry->op = op; - entry->nres = op->nres[cnt]; - GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); - GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */ + need += opq->active - opq->max_active; + rval = GNUNET_NO; + goto ret; } + if ((opq->active + need) <= opq->max_active) + goto ret; + deficit = need - (opq->max_active - opq->active); + for (entry = opq->nq_head; + (0 < deficit) && (NULL != entry); + entry = entry->next) + { + GNUNET_array_append (evict_entries, n_evict_entries, entry); + deficit -= entry->nres; + } + if (0 < deficit) + { + rval = GNUNET_NO; + goto ret; + } + for (n_ops = 0; n_ops < n_evict_entries;) + { + op = evict_entries[n_ops]->op; + GNUNET_array_append (ops, n_ops, op); /* increments n-ops */ + } + + ret: + GNUNET_free_non_null (evict_entries); + if (NULL != ops_) *ops_ = ops; + if (NULL != n_ops_) *n_ops_ = n_ops; + return rval; } -void -wq_remove (struct GNUNET_TESTBED_Operation *op) +/** + * Merges an array of operations into another, eliminating duplicates. No + * ordering is guaranteed. + * + * @param old the array into which the merging is done. + * @param n_old the number of operations in old array + * @param new the array from which operations are to be merged + * @param n_new the number of operations in new array + */ +static void +merge_ops (struct GNUNET_TESTBED_Operation ***old, + unsigned int *n_old, + struct GNUNET_TESTBED_Operation **new, + unsigned int n_new) { - struct QueueEntry *entry; - struct OperationQueue *opq; - unsigned int cnt; - - GNUNET_assert (OP_STATE_WAITING == op->state); - GNUNET_assert (NULL != op->qentries); - for (cnt = 0; cnt < op->nqueues; cnt ++) - { - opq = op->queues[cnt]; - entry = op->qentries[cnt]; - GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); - GNUNET_free (entry); + struct GNUNET_TESTBED_Operation **cur; + unsigned int i; + unsigned int j; + unsigned int n_cur; + + GNUNET_assert (NULL != old); + n_cur = *n_old; + cur = *old; + for (i = 0; i < n_new; i++) + { + for (j = 0; j < *n_old; j++) + { + if (new[i] == cur[j]) + break; + } + if (j < *n_old) + continue; + GNUNET_array_append (cur, n_cur, new[j]); } - GNUNET_free (op->qentries); - op->qentries = NULL; + *old = cur; + *n_old = n_cur; } + /** @@ -329,24 +548,49 @@ wq_remove (struct GNUNET_TESTBED_Operation *op) * * @param op the operation */ -static void +static int check_readiness (struct GNUNET_TESTBED_Operation *op) { + struct GNUNET_TESTBED_Operation **evict_ops; + struct GNUNET_TESTBED_Operation **ops; + unsigned int n_ops; + unsigned int n_evict_ops; unsigned int i; GNUNET_assert (NULL == op->rq_entry); GNUNET_assert (OP_STATE_WAITING == op->state); + evict_ops = NULL; + n_evict_ops = 0; for (i = 0; i < op->nqueues; i++) { - GNUNET_assert (0 < op->nres[i]); - if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active) - return; + ops = NULL; + n_ops = 0; + if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i], + &ops, &n_ops)) + { + GNUNET_free_non_null (evict_ops); + return GNUNET_NO; + } + if (NULL == ops) + continue; + merge_ops (&evict_ops, &n_evict_ops, ops, n_ops); + GNUNET_free (ops); + } + if (NULL != evict_ops) + { + for (i = 0; i < n_evict_ops; i++) + GNUNET_TESTBED_operation_release_ (evict_ops[i]); + GNUNET_free (evict_ops); + evict_ops = NULL; + /* Evicting the operations should schedule this operation */ + GNUNET_assert (OP_STATE_READY == op->state); + return GNUNET_YES; } - wq_remove (op); for (i = 0; i < op->nqueues; i++) op->queues[i]->active += op->nres[i]; - op->state = OP_STATE_READY; + change_state (op, OP_STATE_READY); rq_add (op); + return GNUNET_YES; } @@ -364,8 +608,7 @@ defer (struct GNUNET_TESTBED_Operation *op) rq_remove (op); for (i = 0; i < op->nqueues; i++) op->queues[i]->active--; - op->state = OP_STATE_WAITING; - wq_add (op); + change_state (op, OP_STATE_WAITING); } @@ -419,8 +662,7 @@ GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active) void GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) { - GNUNET_break (NULL == queue->head); - GNUNET_break (NULL == queue->tail); + GNUNET_break (GNUNET_YES == is_queue_empty (queue)); GNUNET_free (queue); } @@ -435,13 +677,36 @@ GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) int GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue) { - if (NULL != queue->head) + if (GNUNET_NO == is_queue_empty (queue)) return GNUNET_NO; GNUNET_TESTBED_operation_queue_destroy_ (queue); return GNUNET_YES; } +/** + * Rechecks if any of the operations in the given operation queue's waiting list + * can be made active + * + * @param opq the operation queue + */ +static void +recheck_waiting (struct OperationQueue *opq) +{ + struct QueueEntry *entry; + struct QueueEntry *entry2; + + entry = opq->wq_head; + while (NULL != entry) + { + entry2 = entry->next; + if (GNUNET_NO == check_readiness (entry->op)) + break; + entry = entry2; + } +} + + /** * Function to reset the maximum number of operations in the given queue. If * max_active is lesser than the number of currently active operations, the @@ -457,21 +722,10 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, struct QueueEntry *entry; queue->max_active = max_active; - entry = queue->head; - while ((queue->active > queue->max_active) && (NULL != entry)) - { - if (entry->op->state == OP_STATE_READY) - defer (entry->op); - entry = entry->next; - } - - entry = queue->head; - while ((NULL != entry) && (queue->active < queue->max_active)) - { - if (OP_STATE_WAITING == entry->op->state) - check_readiness (entry->op); - entry = entry->next; - } + while ( (queue->active > queue->max_active) + && (NULL != (entry = queue->rq_head)) ) + defer (entry->op); + recheck_waiting (queue); } @@ -491,14 +745,9 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue, struct GNUNET_TESTBED_Operation *op, unsigned int nres) { - struct QueueEntry *entry; unsigned int qsize; GNUNET_assert (0 < nres); - entry = GNUNET_malloc (sizeof (struct QueueEntry)); - entry->op = op; - entry->nres = nres; - GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry); qsize = op->nqueues; GNUNET_array_append (op->queues, op->nqueues, queue); GNUNET_array_append (op->nres, qsize, nres); @@ -538,51 +787,55 @@ void GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op) { GNUNET_assert (NULL == op->rq_entry); - op->state = OP_STATE_WAITING; - wq_add (op); - check_readiness (op); + change_state (op, OP_STATE_WAITING); + (void) check_readiness (op); } /** - * Remove an operation from a queue. This can be because the - * oeration was active and has completed (and the resources have - * been released), or because the operation was cancelled and - * thus scheduling the operation is no longer required. + * Marks an active operation as inactive - the operation will be kept in a + * ready-to-be-released state and continues to hold resources until another + * operation contents for them. * - * @param queue queue to add the operation to - * @param op operation to add to the queue + * @param op the operation to be marked as inactive. The operation start + * callback should have been called before for this operation to mark + * it as inactive. */ void -GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, - struct GNUNET_TESTBED_Operation - *op) +GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) { - struct QueueEntry *entry; + struct OperationQueue **queues; + size_t ms; + unsigned int nqueues; + unsigned int i; - for (entry = queue->head; NULL != entry; entry = entry->next) - if (entry->op == op) - break; - GNUNET_assert (NULL != entry); - GNUNET_assert (0 < entry->nres); - switch (op->state) - { - case OP_STATE_INIT: - case OP_STATE_WAITING: - break; - case OP_STATE_READY: - case OP_STATE_STARTED: - GNUNET_assert (0 != queue->active); - GNUNET_assert (queue->active >= entry->nres); - queue->active -= entry->nres; - break; - } - GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry); - GNUNET_free (entry); - entry = queue->wq_head; - if (NULL == entry) - return; - check_readiness (entry->op); + GNUNET_assert (OP_STATE_ACTIVE == op->state); + change_state (op, OP_STATE_INACTIVE); + nqueues = op->nqueues; + ms = sizeof (struct OperationQueue *) * nqueues; + queues = GNUNET_malloc (ms); + /* Cloning is needed as the operation be released by waiting operations and + hence its nqueues memory ptr will be freed */ + GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms))); + for (i = 0; i < nqueues; i++) + recheck_waiting (queues[i]); + GNUNET_free (queues); +} + + +/** + * Marks and inactive operation as active. This fuction should be called to + * ensure that the oprelease callback will not be called until it is either + * marked as inactive or released. + * + * @param op the operation to be marked as active + */ +void +GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) +{ + + GNUNET_assert (OP_STATE_INACTIVE == op->state); + change_state (op, OP_STATE_ACTIVE); } @@ -595,22 +848,45 @@ GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue, void GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) { + struct QueueEntry *entry; + struct OperationQueue *opq; unsigned int i; - switch (op->state) + if (OP_STATE_INIT == op->state) { - case OP_STATE_READY: - rq_remove (op); - break; - case OP_STATE_WAITING: - wq_remove (op); - break; - case OP_STATE_STARTED: - case OP_STATE_INIT: - break; + GNUNET_free (op); + return; } + if (OP_STATE_READY == op->state) + rq_remove (op); + if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */ + GNUNET_TESTBED_operation_activate_ (op); + GNUNET_assert (NULL != op->queues); + GNUNET_assert (NULL != op->qentries); for (i = 0; i < op->nqueues; i++) - GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op); + { + entry = op->qentries[i]; + remove_queue_entry (op, i); + opq = op->queues[i]; + switch (op->state) + { + case OP_STATE_INIT: + case OP_STATE_INACTIVE: + GNUNET_assert (0); + break; + case OP_STATE_WAITING: + break; + case OP_STATE_READY: + case OP_STATE_ACTIVE: + GNUNET_assert (0 != opq->active); + GNUNET_assert (opq->active >= entry->nres); + opq->active -= entry->nres; + recheck_waiting (opq); + break; + } + GNUNET_free (entry); + } + GNUNET_free_non_null (op->qentries); GNUNET_free (op->queues); GNUNET_free (op->nres); if (NULL != op->release)