* The operation this entry holds
*/
struct GNUNET_TESTBED_Operation *op;
+
+ /**
+ * How many units of resources does the operation need
+ */
+ unsigned int nres;
};
struct QueueEntry *tail;
/**
- * Number of operations that can be concurrently
- * active in this queue.
+ * Number of operations that are currently active in this queue.
+ */
+ unsigned int active;
+
+ /**
+ * Max number of operations which can be active at any time in this queue
*/
- unsigned int active;
+ unsigned int max_active;
+
};
* Operation state
*/
enum OperationState
- {
- /**
- * The operation is currently waiting for resources
- */
- OP_STATE_WAITING,
+{
+ /**
+ * The operation is just created and is in initial state
+ */
+ OP_STATE_INIT,
- /**
- * The operation has started
- */
- OP_STATE_STARTED,
- };
+ /**
+ * The operation is currently waiting for resources
+ */
+ OP_STATE_WAITING,
+
+ /**
+ * The operation is ready to be started
+ */
+ OP_STATE_READY,
+
+ /**
+ * The operation has started
+ */
+ OP_STATE_STARTED
+};
/**
* not have been started yet).
*/
OperationRelease release;
-
+
/**
* Closure for callbacks.
*/
*/
struct OperationQueue **queues;
+ /**
+ * Array of number resources an operation need from each queue. This numbers
+ * in this array should correspond to the queues array
+ */
+ unsigned int *nres;
+
/**
* The id of the task which calls OperationStart for this operation
*/
/**
* The state of the operation
*/
- enum OperationState state;
-
+ enum OperationState state;
+
};
*/
static void
call_start (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
+{
struct GNUNET_TESTBED_Operation *op = cls;
-
+
op->start_task_id = GNUNET_SCHEDULER_NO_TASK;
op->state = OP_STATE_STARTED;
if (NULL != op->start)
- {
op->start (op->cb_cls);
- }
}
*/
static void
check_readiness (struct GNUNET_TESTBED_Operation *op)
-{
+{
unsigned int i;
- if (GNUNET_SCHEDULER_NO_TASK != op->start_task_id)
- return;
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == op->start_task_id);
for (i = 0; i < op->nqueues; i++)
{
- if (0 == op->queues[i]->active)
+ GNUNET_assert (0 < op->nres[i]);
+ if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active)
return;
}
for (i = 0; i < op->nqueues; i++)
- {
- op->queues[i]->active--;
- }
+ op->queues[i]->active += op->nres[i];
+ op->state = OP_STATE_READY;
op->start_task_id = GNUNET_SCHEDULER_add_now (&call_start, op);
}
+/**
+ * Defers a ready to be executed operation back to waiting
+ *
+ * @param op the operation to defer
+ */
+static void
+defer (struct GNUNET_TESTBED_Operation *op)
+{
+ unsigned int i;
+
+ GNUNET_assert (OP_STATE_READY == op->state);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != op->start_task_id);
+ GNUNET_SCHEDULER_cancel (op->start_task_id);
+ op->start_task_id = GNUNET_SCHEDULER_NO_TASK;
+ for (i = 0; i < op->nqueues; i++)
+ op->queues[i]->active--;
+ op->state = OP_STATE_WAITING;
+}
+
+
/**
* Create an 'operation' to be performed.
*
* @return handle to the operation
*/
struct GNUNET_TESTBED_Operation *
-GNUNET_TESTBED_operation_create_ (void *cls,
- OperationStart start,
- OperationRelease release)
+GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
+ OperationRelease release)
{
struct GNUNET_TESTBED_Operation *op;
op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation));
op->start = start;
+ op->state = OP_STATE_INIT;
op->release = release;
op->cb_cls = cls;
op->start_task_id = GNUNET_SCHEDULER_NO_TASK;
struct OperationQueue *queue;
queue = GNUNET_malloc (sizeof (struct OperationQueue));
- queue->active = max_active;
+ queue->max_active = max_active;
return queue;
}
/**
- * Add an operation to a queue. An operation can be in multiple
- * queues at once. Once all queues permit the operation to become
- * active, the operation will be activated. The actual activation
- * will occur in a separate task (thus allowing multiple queue
- * insertions to be made without having the first one instantly
- * trigger the operation if the first queue has sufficient
- * resources).
+ * Function to reset the maximum number of operations in the given queue. If
+ * max_active is lesser than the number of currently active operations, the
+ * active operations are not stopped immediately.
+ *
+ * @param queue the operation queue which has to be modified
+ * @param max_active the new maximum number of active operations
+ */
+void
+GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
+ unsigned int max_active)
+{
+ struct QueueEntry *entry;
+
+ queue->max_active = max_active;
+ /* if (queue->active >= queue->max_active) */
+ /* return; */
+
+ entry = queue->head;
+ while ((queue->active > queue->max_active) && (NULL != entry))
+ {
+ if (entry->op->state == OP_STATE_READY)
+ defer (entry->op);
+ entry = entry->next;
+ }
+
+ entry = queue->head;
+ while ((NULL != entry) && (queue->active < queue->max_active))
+ {
+ if (OP_STATE_WAITING == entry->op->state)
+ check_readiness (entry->op);
+ entry = entry->next;
+ }
+}
+
+
+/**
+ * Add an operation to a queue. An operation can be in multiple queues at
+ * once. Once the operation is inserted into all the queues
+ * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
+ * waiting for the operation to become active.
*
* @param queue queue to add the operation to
* @param operation operation to add to the queue
+ * @param nres the number of units of the resources of queue needed by the
+ * operation. Should be greater than 0.
*/
void
-GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
- struct GNUNET_TESTBED_Operation *operation)
+GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
+ struct GNUNET_TESTBED_Operation
+ *operation, unsigned int nres)
{
struct QueueEntry *entry;
+ unsigned int qsize;
+ GNUNET_assert (0 < nres);
entry = GNUNET_malloc (sizeof (struct QueueEntry));
entry->op = operation;
+ entry->nres = nres;
GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry);
- operation->queues =
- GNUNET_realloc (operation->queues,
- sizeof (struct OperationQueue *) * (++operation->nqueues));
- operation->queues[operation->nqueues - 1] = queue;
+ qsize = operation->nqueues;
+ GNUNET_array_append (operation->queues, operation->nqueues, queue);
+ GNUNET_array_append (operation->nres, qsize, nres);
+ GNUNET_assert (qsize == operation->nqueues);
+}
+
+
+/**
+ * Add an operation to a queue. An operation can be in multiple queues at
+ * once. Once the operation is inserted into all the queues
+ * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
+ * waiting for the operation to become active. The operation is assumed to take
+ * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
+ * requires more than 1
+ *
+ * @param queue queue to add the operation to
+ * @param operation operation to add to the queue
+ */
+void
+GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
+ struct GNUNET_TESTBED_Operation
+ *operation)
+{
+ return GNUNET_TESTBED_operation_queue_insert2_ (queue, operation, 1);
+}
+
+
+/**
+ * Marks the given operation as waiting on the queues. Once all queues permit
+ * the operation to become active, the operation will be activated. The actual
+ * activation will occur in a separate task (thus allowing multiple queue
+ * insertions to be made without having the first one instantly trigger the
+ * operation if the first queue has sufficient resources).
+ *
+ * @param operation the operation to marks as waiting
+ */
+void
+GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation
+ *operation)
+{
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == operation->start_task_id);
+ operation->state = OP_STATE_WAITING;
check_readiness (operation);
}
*/
void
GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
- struct GNUNET_TESTBED_Operation *operation)
+ struct GNUNET_TESTBED_Operation
+ *operation)
{
struct QueueEntry *entry;
struct QueueEntry *entry2;
-
+
for (entry = queue->head; NULL != entry; entry = entry->next)
if (entry->op == operation)
break;
GNUNET_assert (NULL != entry);
- if (OP_STATE_STARTED == operation->state)
- queue->active++;
+ GNUNET_assert (0 < entry->nres);
+ switch (operation->state)
+ {
+ case OP_STATE_INIT:
+ case OP_STATE_WAITING:
+ break;
+ case OP_STATE_READY:
+ case OP_STATE_STARTED:
+ GNUNET_assert (0 != queue->active);
+ GNUNET_assert (queue->active >= entry->nres);
+ queue->active -= entry->nres;
+ break;
+ }
entry2 = entry->next;
GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry);
GNUNET_free (entry);
for (; NULL != entry2; entry2 = entry2->next)
- if (OP_STATE_STARTED != entry2->op->state)
+ if (OP_STATE_WAITING == entry2->op->state)
break;
if (NULL == entry2)
return;
GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *operation)
{
unsigned int i;
-
+
if (GNUNET_SCHEDULER_NO_TASK != operation->start_task_id)
{
GNUNET_SCHEDULER_cancel (operation->start_task_id);
for (i = 0; i < operation->nqueues; i++)
GNUNET_TESTBED_operation_queue_remove_ (operation->queues[i], operation);
GNUNET_free (operation->queues);
+ GNUNET_free (operation->nres);
if (NULL != operation->release)
operation->release (operation->cb_cls);
GNUNET_free (operation);