/*
This file is part of GNUnet
- (C) 2008--2012 Christian Grothoff (and other contributing authors)
+ (C) 2008--2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* @file testbed/testbed_api_operations.c
* @brief functions to manage operation queues
* @author Christian Grothoff
+ * @author Sree Harsha Totakura
*/
+
#include "platform.h"
#include "testbed_api_operations.h"
*/
struct OperationQueue
{
- /**
- * The head of the operation queue
- */
- struct QueueEntry *head;
-
- /**
- * The tail of the operation queue
- */
- struct QueueEntry *tail;
-
/**
* DLL head for the wait queue. Operations which are waiting for this
* operation queue are put here
*/
struct QueueEntry *wq_tail;
+ /**
+ * DLL head for the ready queue. Operations which are in this operation queue
+ * and are in ready state are put here
+ */
+ struct QueueEntry *rq_head;
+
+ /**
+ * DLL tail for the ready queue
+ */
+ struct QueueEntry *rq_tail;
+
+ /**
+ * DLL head for the active queue. Operations which are in this operation
+ * queue and are currently active are put here
+ */
+ struct QueueEntry *aq_head;
+
+ /**
+ * DLL tail for the active queue.
+ */
+ struct QueueEntry *aq_tail;
+
+ /**
+ * DLL head for the inactive queue. Operations which are inactive and can be
+ * evicted if the queues it holds are maxed out and another operation begins
+ * to wait on them.
+ */
+ struct QueueEntry *nq_head;
+
+ /**
+ * DLL tail for the inactive queue.
+ */
+ struct QueueEntry *nq_tail;
+
/**
* Number of operations that are currently active in this queue.
*/
OP_STATE_READY,
/**
- * The operation has started
+ * The operation has started and is active
+ */
+ OP_STATE_ACTIVE,
+
+ /**
+ * The operation is inactive. It still holds resources on the operation
+ * queues. However, this operation will be evicted when another operation
+ * requires resources from the maxed out queues this operation is holding
+ * resources from.
*/
- OP_STATE_STARTED
+ OP_STATE_INACTIVE
};
GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
+/**
+ * Removes a queue entry of an operation from one of the operation queues' lists
+ * depending on the state of the operation
+ *
+ * @param op the operation whose entry has to be removed
+ * @param index the index of the entry in the operation's array of queue entries
+ */
+static void
+remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
+{
+ struct OperationQueue *opq;
+ struct QueueEntry *entry;
+
+ opq = op->queues[index];
+ entry = op->qentries[index];
+ switch (op->state)
+ {
+ case OP_STATE_INIT:
+ GNUNET_assert (0);
+ break;
+ case OP_STATE_WAITING:
+ GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
+ break;
+ case OP_STATE_READY:
+ GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
+ break;
+ case OP_STATE_ACTIVE:
+ GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
+ break;
+ case OP_STATE_INACTIVE:
+ GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
+ break;
+ }
+}
+
+
+/**
+ * Changes the state of the operation while moving its associated queue entries
+ * in the operation's operation queues
+ *
+ * @param op the operation whose state has to be changed
+ * @param state the state the operation should have. It cannot be OP_STATE_INIT
+ */
+static void
+change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
+{
+ struct QueueEntry *entry;
+ struct OperationQueue *opq;
+ unsigned int cnt;
+ unsigned int s;
+
+ GNUNET_assert (OP_STATE_INIT != state);
+ GNUNET_assert (NULL != op->queues);
+ GNUNET_assert (NULL != op->nres);
+ GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
+ GNUNET_assert (op->state != state);
+ for (cnt = 0; cnt < op->nqueues; cnt++)
+ {
+ if (OP_STATE_INIT == op->state)
+ {
+ entry = GNUNET_malloc (sizeof (struct QueueEntry));
+ entry->op = op;
+ entry->nres = op->nres[cnt];
+ s = cnt;
+ GNUNET_array_append (op->qentries, s, entry);
+ }
+ else
+ {
+ entry = op->qentries[cnt];
+ remove_queue_entry (op, cnt);
+ }
+ opq = op->queues[cnt];
+ switch (state)
+ {
+ case OP_STATE_INIT:
+ GNUNET_assert (0);
+ break;
+ case OP_STATE_WAITING:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
+ break;
+ case OP_STATE_READY:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
+ break;
+ case OP_STATE_ACTIVE:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
+ break;
+ case OP_STATE_INACTIVE:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
+ break;
+ }
+ }
+ op->state = state;
+}
+
+
/**
* Removes an operation from the ready queue. Also stops the 'process_rq_task'
* if the given operation is the last one in the queue.
rq_remove (op);
if (NULL != rq_head)
process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
- op->state = OP_STATE_STARTED;
+ change_state (op, OP_STATE_ACTIVE);
if (NULL != op->start)
op->start (op->cb_cls);
}
}
-void
-wq_add (struct GNUNET_TESTBED_Operation *op)
+/**
+ * Checks if the given operation queue is empty or not
+ *
+ * @param opq the operation queue
+ * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
+ * otherwise
+ */
+static int
+is_queue_empty (struct OperationQueue *opq)
{
- struct QueueEntry *entry;
- struct OperationQueue *opq;
- unsigned int cnt;
+ if ( (NULL != opq->wq_head)
+ || (NULL != opq->rq_head)
+ || (NULL != opq->aq_head)
+ || (NULL != opq->nq_head) )
+ return GNUNET_NO;
+ return GNUNET_YES;
+}
- GNUNET_assert (OP_STATE_WAITING == op->state);
- GNUNET_assert (NULL == op->qentries);
- for (cnt = 0; cnt < op->nqueues;)
+
+/**
+ * Checks if the given operation queue has enough resources to provide for the
+ * operation of the given queue entry. It also checks if any inactive
+ * operations are to be released in order to accommodate the needed resources
+ * and returns them as an array.
+ *
+ * @param opq the operation queue to check for resource accommodation
+ * @param entry the operation queue entry whose operation's resources are to be
+ * accommodated
+ * @param ops_ pointer to return the array of operations which are to be released
+ * in order to accommodate the new operation. Can be NULL
+ * @param n_ops_ the number of operations in ops_
+ * @return GNUNET_YES if the given entry's operation can be accommodated in this
+ * queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
+ * be set to NULL and 0 respectively.
+ */
+static int
+decide_capacity (struct OperationQueue *opq,
+ struct QueueEntry *entry,
+ struct GNUNET_TESTBED_Operation ***ops_,
+ unsigned int *n_ops_)
+{
+ struct QueueEntry **evict_entries;
+ struct GNUNET_TESTBED_Operation **ops;
+ struct GNUNET_TESTBED_Operation *op;
+ unsigned int n_ops;
+ unsigned int n_evict_entries;
+ unsigned int need;
+ int deficit;
+ int rval;
+
+ GNUNET_assert (NULL != (op = entry->op));
+ GNUNET_assert (0 < (need = entry->nres));
+ ops = NULL;
+ n_ops = 0;
+ evict_entries = NULL;
+ n_evict_entries = 0;
+ rval = GNUNET_YES;
+ if (opq->active > opq->max_active)
{
- opq = op->queues[cnt];
- entry = GNUNET_malloc (sizeof (struct QueueEntry));
- entry->op = op;
- entry->nres = op->nres[cnt];
- GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
- GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */
+ rval = GNUNET_NO;
+ goto ret;
}
+ if ((opq->active + need) <= opq->max_active)
+ goto ret;
+ deficit = need - (opq->max_active - opq->active);
+ for (entry = opq->nq_head;
+ (0 < deficit) && (NULL != entry);
+ entry = entry->next)
+ {
+ GNUNET_array_append (evict_entries, n_evict_entries, entry);
+ deficit -= entry->nres;
+ }
+ if (0 < deficit)
+ {
+ rval = GNUNET_NO;
+ goto ret;
+ }
+ for (n_ops = 0; n_ops < n_evict_entries;)
+ {
+ op = evict_entries[n_ops]->op;
+ GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
+ }
+
+ ret:
+ GNUNET_free_non_null (evict_entries);
+ if (NULL != ops_)
+ *ops_ = ops;
+ else
+ GNUNET_free (ops);
+ if (NULL != n_ops_)
+ *n_ops_ = n_ops;
+ return rval;
}
-void
-wq_remove (struct GNUNET_TESTBED_Operation *op)
+/**
+ * Merges an array of operations into another, eliminating duplicates. No
+ * ordering is guaranteed.
+ *
+ * @param old the array into which the merging is done.
+ * @param n_old the number of operations in old array
+ * @param new the array from which operations are to be merged
+ * @param n_new the number of operations in new array
+ */
+static void
+merge_ops (struct GNUNET_TESTBED_Operation ***old,
+ unsigned int *n_old,
+ struct GNUNET_TESTBED_Operation **new,
+ unsigned int n_new)
{
- struct QueueEntry *entry;
- struct OperationQueue *opq;
- unsigned int cnt;
-
- GNUNET_assert (OP_STATE_WAITING == op->state);
- GNUNET_assert (NULL != op->qentries);
- for (cnt = 0; cnt < op->nqueues; cnt ++)
- {
- opq = op->queues[cnt];
- entry = op->qentries[cnt];
- GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
- GNUNET_free (entry);
+ struct GNUNET_TESTBED_Operation **cur;
+ unsigned int i;
+ unsigned int j;
+ unsigned int n_cur;
+
+ GNUNET_assert (NULL != old);
+ n_cur = *n_old;
+ cur = *old;
+ for (i = 0; i < n_new; i++)
+ {
+ for (j = 0; j < *n_old; j++)
+ {
+ if (new[i] == cur[j])
+ break;
+ }
+ if (j < *n_old)
+ continue;
+ GNUNET_array_append (cur, n_cur, new[j]);
}
- GNUNET_free (op->qentries);
- op->qentries = NULL;
+ *old = cur;
+ *n_old = n_cur;
}
+
/**
*
* @param op the operation
*/
-static void
+static int
check_readiness (struct GNUNET_TESTBED_Operation *op)
{
+ struct GNUNET_TESTBED_Operation **evict_ops;
+ struct GNUNET_TESTBED_Operation **ops;
+ unsigned int n_ops;
+ unsigned int n_evict_ops;
unsigned int i;
GNUNET_assert (NULL == op->rq_entry);
GNUNET_assert (OP_STATE_WAITING == op->state);
+ evict_ops = NULL;
+ n_evict_ops = 0;
for (i = 0; i < op->nqueues; i++)
{
- GNUNET_assert (0 < op->nres[i]);
- if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active)
- return;
+ ops = NULL;
+ n_ops = 0;
+ if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
+ &ops, &n_ops))
+ {
+ GNUNET_free_non_null (evict_ops);
+ return GNUNET_NO;
+ }
+ if (NULL == ops)
+ continue;
+ merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
+ GNUNET_free (ops);
+ }
+ if (NULL != evict_ops)
+ {
+ for (i = 0; i < n_evict_ops; i++)
+ GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+ GNUNET_free (evict_ops);
+ evict_ops = NULL;
+ /* Evicting the operations should schedule this operation */
+ GNUNET_assert (OP_STATE_READY == op->state);
+ return GNUNET_YES;
}
- wq_remove (op);
for (i = 0; i < op->nqueues; i++)
op->queues[i]->active += op->nres[i];
- op->state = OP_STATE_READY;
+ change_state (op, OP_STATE_READY);
rq_add (op);
+ return GNUNET_YES;
}
rq_remove (op);
for (i = 0; i < op->nqueues; i++)
op->queues[i]->active--;
- op->state = OP_STATE_WAITING;
- wq_add (op);
+ change_state (op, OP_STATE_WAITING);
}
void
GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
{
- GNUNET_break (NULL == queue->head);
- GNUNET_break (NULL == queue->tail);
+ GNUNET_break (GNUNET_YES == is_queue_empty (queue));
GNUNET_free (queue);
}
int
GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
{
- if (NULL != queue->head)
+ if (GNUNET_NO == is_queue_empty (queue))
return GNUNET_NO;
GNUNET_TESTBED_operation_queue_destroy_ (queue);
return GNUNET_YES;
}
+/**
+ * Rechecks if any of the operations in the given operation queue's waiting list
+ * can be made active
+ *
+ * @param opq the operation queue
+ */
+static void
+recheck_waiting (struct OperationQueue *opq)
+{
+ struct QueueEntry *entry;
+ struct QueueEntry *entry2;
+
+ entry = opq->wq_head;
+ while (NULL != entry)
+ {
+ entry2 = entry->next;
+ if (GNUNET_NO == check_readiness (entry->op))
+ break;
+ entry = entry2;
+ }
+}
+
+
/**
* Function to reset the maximum number of operations in the given queue. If
* max_active is lesser than the number of currently active operations, the
struct QueueEntry *entry;
queue->max_active = max_active;
- entry = queue->head;
- while ((queue->active > queue->max_active) && (NULL != entry))
- {
- if (entry->op->state == OP_STATE_READY)
- defer (entry->op);
- entry = entry->next;
- }
-
- entry = queue->head;
- while ((NULL != entry) && (queue->active < queue->max_active))
- {
- if (OP_STATE_WAITING == entry->op->state)
- check_readiness (entry->op);
- entry = entry->next;
- }
+ while ( (queue->active > queue->max_active)
+ && (NULL != (entry = queue->rq_head)) )
+ defer (entry->op);
+ recheck_waiting (queue);
}
struct GNUNET_TESTBED_Operation *op,
unsigned int nres)
{
- struct QueueEntry *entry;
unsigned int qsize;
GNUNET_assert (0 < nres);
- entry = GNUNET_malloc (sizeof (struct QueueEntry));
- entry->op = op;
- entry->nres = nres;
- GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry);
qsize = op->nqueues;
GNUNET_array_append (op->queues, op->nqueues, queue);
GNUNET_array_append (op->nres, qsize, nres);
GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
{
GNUNET_assert (NULL == op->rq_entry);
- op->state = OP_STATE_WAITING;
- wq_add (op);
- check_readiness (op);
+ change_state (op, OP_STATE_WAITING);
+ (void) check_readiness (op);
}
/**
- * Remove an operation from a queue. This can be because the
- * oeration was active and has completed (and the resources have
- * been released), or because the operation was cancelled and
- * thus scheduling the operation is no longer required.
+ * Marks an active operation as inactive - the operation will be kept in a
+ * ready-to-be-released state and continues to hold resources until another
+ * operation contents for them.
*
- * @param queue queue to add the operation to
- * @param op operation to add to the queue
+ * @param op the operation to be marked as inactive. The operation start
+ * callback should have been called before for this operation to mark
+ * it as inactive.
*/
void
-GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
- struct GNUNET_TESTBED_Operation
- *op)
+GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
{
- struct QueueEntry *entry;
+ struct OperationQueue **queues;
+ size_t ms;
+ unsigned int nqueues;
+ unsigned int i;
- for (entry = queue->head; NULL != entry; entry = entry->next)
- if (entry->op == op)
- break;
- GNUNET_assert (NULL != entry);
- GNUNET_assert (0 < entry->nres);
- switch (op->state)
- {
- case OP_STATE_INIT:
- case OP_STATE_WAITING:
- break;
- case OP_STATE_READY:
- case OP_STATE_STARTED:
- GNUNET_assert (0 != queue->active);
- GNUNET_assert (queue->active >= entry->nres);
- queue->active -= entry->nres;
- break;
- }
- GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry);
- GNUNET_free (entry);
- entry = queue->wq_head;
- if (NULL == entry)
- return;
- check_readiness (entry->op);
+ GNUNET_assert (OP_STATE_ACTIVE == op->state);
+ change_state (op, OP_STATE_INACTIVE);
+ nqueues = op->nqueues;
+ ms = sizeof (struct OperationQueue *) * nqueues;
+ queues = GNUNET_malloc (ms);
+ /* Cloning is needed as the operation be released by waiting operations and
+ hence its nqueues memory ptr will be freed */
+ GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
+ for (i = 0; i < nqueues; i++)
+ recheck_waiting (queues[i]);
+ GNUNET_free (queues);
+}
+
+
+/**
+ * Marks and inactive operation as active. This fuction should be called to
+ * ensure that the oprelease callback will not be called until it is either
+ * marked as inactive or released.
+ *
+ * @param op the operation to be marked as active
+ */
+void
+GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
+{
+
+ GNUNET_assert (OP_STATE_INACTIVE == op->state);
+ change_state (op, OP_STATE_ACTIVE);
}
void
GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
{
+ struct QueueEntry *entry;
+ struct OperationQueue *opq;
unsigned int i;
- switch (op->state)
+ if (OP_STATE_INIT == op->state)
{
- case OP_STATE_READY:
- rq_remove (op);
- break;
- case OP_STATE_WAITING:
- wq_remove (op);
- break;
- case OP_STATE_STARTED:
- case OP_STATE_INIT:
- break;
+ GNUNET_free (op);
+ return;
}
+ if (OP_STATE_READY == op->state)
+ rq_remove (op);
+ if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
+ GNUNET_TESTBED_operation_activate_ (op);
+ GNUNET_assert (NULL != op->queues);
+ GNUNET_assert (NULL != op->qentries);
for (i = 0; i < op->nqueues; i++)
- GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op);
+ {
+ entry = op->qentries[i];
+ remove_queue_entry (op, i);
+ opq = op->queues[i];
+ switch (op->state)
+ {
+ case OP_STATE_INIT:
+ case OP_STATE_INACTIVE:
+ GNUNET_assert (0);
+ break;
+ case OP_STATE_WAITING:
+ break;
+ case OP_STATE_READY:
+ case OP_STATE_ACTIVE:
+ GNUNET_assert (0 != opq->active);
+ GNUNET_assert (opq->active >= entry->nres);
+ opq->active -= entry->nres;
+ recheck_waiting (opq);
+ break;
+ }
+ GNUNET_free (entry);
+ }
+ GNUNET_free_non_null (op->qentries);
GNUNET_free (op->queues);
GNUNET_free (op->nres);
if (NULL != op->release)