/*
This file is part of GNUnet
- (C) 2008--2012 Christian Grothoff (and other contributing authors)
+ (C) 2008--2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* @file testbed/testbed_api_operations.c
* @brief functions to manage operation queues
* @author Christian Grothoff
+ * @author Sree Harsha Totakura
*/
+
#include "platform.h"
#include "testbed_api_operations.h"
OP_STATE_READY,
/**
- * The operation has started
+ * The operation has started and is active
*/
- OP_STATE_STARTED,
+ OP_STATE_ACTIVE,
/**
* The operation is inactive. It still holds resources on the operation
*/
GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
-void
+
+/**
+ * Removes a queue entry of an operation from one of the operation queues' lists
+ * depending on the state of the operation
+ *
+ * @param op the operation whose entry has to be removed
+ * @param index the index of the entry in the operation's array of queue entries
+ */
+static void
remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
{
struct OperationQueue *opq;
case OP_STATE_READY:
GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
break;
- case OP_STATE_STARTED:
+ case OP_STATE_ACTIVE:
GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
break;
case OP_STATE_INACTIVE:
}
}
-void
+
+/**
+ * Changes the state of the operation while moving its associated queue entries
+ * in the operation's operation queues
+ *
+ * @param op the operation whose state has to be changed
+ * @param state the state the operation should have. It cannot be OP_STATE_INIT
+ */
+static void
change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
{
struct QueueEntry *entry;
case OP_STATE_READY:
GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
break;
- case OP_STATE_STARTED:
+ case OP_STATE_ACTIVE:
GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
break;
case OP_STATE_INACTIVE:
rq_remove (op);
if (NULL != rq_head)
process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
- change_state (op, OP_STATE_STARTED);
+ change_state (op, OP_STATE_ACTIVE);
if (NULL != op->start)
op->start (op->cb_cls);
}
}
+/**
+ * Checks if the given operation queue is empty or not
+ *
+ * @param opq the operation queue
+ * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
+ * otherwise
+ */
static int
is_queue_empty (struct OperationQueue *opq)
{
}
-int
+/**
+ * Checks if the given operation queue has enough resources to provide for the
+ * operation of the given queue entry. It also checks if any inactive
+ * operations are to be released in order to accommodate the needed resources
+ * and returns them as an array.
+ *
+ * @param opq the operation queue to check for resource accommodation
+ * @param entry the operation queue entry whose operation's resources are to be
+ * accommodated
+ * @param ops_ pointer to return the array of operations which are to be released
+ * in order to accommodate the new operation. Can be NULL
+ * @param n_ops_ the number of operations in ops_
+ * @return GNUNET_YES if the given entry's operation can be accommodated in this
+ * queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
+ * be set to NULL and 0 respectively.
+ */
+static int
decide_capacity (struct OperationQueue *opq,
struct QueueEntry *entry,
struct GNUNET_TESTBED_Operation ***ops_,
GNUNET_assert (NULL != (op = entry->op));
GNUNET_assert (0 < (need = entry->nres));
- GNUNET_assert (opq->active <= opq->max_active);
ops = NULL;
n_ops = 0;
evict_entries = NULL;
n_evict_entries = 0;
- rval = GNUNET_OK;
+ rval = GNUNET_YES;
+ if (opq->active > opq->max_active)
+ {
+ rval = GNUNET_NO;
+ goto ret;
+ }
if ((opq->active + need) <= opq->max_active)
goto ret;
deficit = need - (opq->max_active - opq->active);
ret:
GNUNET_free_non_null (evict_entries);
- if (NULL != ops_) *ops_ = ops;
- if (NULL != n_ops_) *n_ops_ = n_ops;
+ if (NULL != ops_)
+ *ops_ = ops;
+ else
+ GNUNET_free (ops);
+ if (NULL != n_ops_)
+ *n_ops_ = n_ops;
return rval;
}
-/* FIXME: improve.. */
-void
+
+/**
+ * Merges an array of operations into another, eliminating duplicates. No
+ * ordering is guaranteed.
+ *
+ * @param old the array into which the merging is done.
+ * @param n_old the number of operations in old array
+ * @param new the array from which operations are to be merged
+ * @param n_new the number of operations in new array
+ */
+static void
merge_ops (struct GNUNET_TESTBED_Operation ***old,
unsigned int *n_old,
struct GNUNET_TESTBED_Operation **new,
*
* @param op the operation
*/
-static void
+static int
check_readiness (struct GNUNET_TESTBED_Operation *op)
{
struct GNUNET_TESTBED_Operation **evict_ops;
&ops, &n_ops))
{
GNUNET_free_non_null (evict_ops);
- return;
+ return GNUNET_NO;
}
if (NULL == ops)
continue;
evict_ops = NULL;
/* Evicting the operations should schedule this operation */
GNUNET_assert (OP_STATE_READY == op->state);
- return;
+ return GNUNET_YES;
}
for (i = 0; i < op->nqueues; i++)
op->queues[i]->active += op->nres[i];
change_state (op, OP_STATE_READY);
rq_add (op);
+ return GNUNET_YES;
}
}
-void
+/**
+ * Rechecks if any of the operations in the given operation queue's waiting list
+ * can be made active
+ *
+ * @param opq the operation queue
+ */
+static void
recheck_waiting (struct OperationQueue *opq)
{
struct QueueEntry *entry;
struct QueueEntry *entry2;
entry = opq->wq_head;
- while ( (NULL != entry) && (opq->active < opq->max_active) )
+ while (NULL != entry)
{
entry2 = entry->next;
- check_readiness (entry->op);
+ if (GNUNET_NO == check_readiness (entry->op))
+ break;
entry = entry2;
}
}
{
GNUNET_assert (NULL == op->rq_entry);
change_state (op, OP_STATE_WAITING);
- check_readiness (op);
+ (void) check_readiness (op);
}
void
GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
{
- GNUNET_assert (OP_STATE_STARTED == op->state);
+ struct OperationQueue **queues;
+ size_t ms;
+ unsigned int nqueues;
+ unsigned int i;
+
+ GNUNET_assert (OP_STATE_ACTIVE == op->state);
change_state (op, OP_STATE_INACTIVE);
+ nqueues = op->nqueues;
+ ms = sizeof (struct OperationQueue *) * nqueues;
+ queues = GNUNET_malloc (ms);
+ /* Cloning is needed as the operation be released by waiting operations and
+ hence its nqueues memory ptr will be freed */
+ GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
+ for (i = 0; i < nqueues; i++)
+ recheck_waiting (queues[i]);
+ GNUNET_free (queues);
}
void
GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
{
+
GNUNET_assert (OP_STATE_INACTIVE == op->state);
- change_state (op, OP_STATE_STARTED);
+ change_state (op, OP_STATE_ACTIVE);
}
case OP_STATE_WAITING:
break;
case OP_STATE_READY:
- case OP_STATE_STARTED:
+ case OP_STATE_ACTIVE:
GNUNET_assert (0 != opq->active);
GNUNET_assert (opq->active >= entry->nres);
opq->active -= entry->nres;