*/
struct OperationQueue
{
- /**
- * The head of the operation queue
- */
- struct QueueEntry *head;
-
- /**
- * The tail of the operation queue
- */
- struct QueueEntry *tail;
-
/**
* DLL head for the wait queue. Operations which are waiting for this
* operation queue are put here
*/
struct QueueEntry *wq_tail;
+ /**
+ * DLL head for the ready queue. Operations which are in this operation queue
+ * and are in ready state are put here
+ */
+ struct QueueEntry *rq_head;
+
+ /**
+ * DLL tail for the ready queue
+ */
+ struct QueueEntry *rq_tail;
+
+ /**
+ * DLL head for the active queue. Operations which are in this operation
+ * queue and are currently active are put here
+ */
+ struct QueueEntry *aq_head;
+
+ /**
+ * DLL tail for the active queue.
+ */
+ struct QueueEntry *aq_tail;
+
/**
* Number of operations that are currently active in this queue.
*/
*/
GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
+void
+remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
+{
+ struct OperationQueue *opq;
+ struct QueueEntry *entry;
+
+ opq = op->queues[index];
+ entry = op->qentries[index];
+ switch (op->state)
+ {
+ case OP_STATE_INIT:
+ GNUNET_assert (0);
+ break;
+ case OP_STATE_WAITING:
+ GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
+ break;
+ case OP_STATE_READY:
+ GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
+ break;
+ case OP_STATE_STARTED:
+ GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
+ break;
+ }
+}
+
+void
+change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
+{
+ struct QueueEntry *entry;
+ struct OperationQueue *opq;
+ unsigned int cnt;
+ unsigned int s;
+
+ GNUNET_assert (OP_STATE_INIT != state);
+ GNUNET_assert (NULL != op->queues);
+ GNUNET_assert (NULL != op->nres);
+ GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
+ GNUNET_assert (op->state != state);
+ for (cnt = 0; cnt < op->nqueues; cnt++)
+ {
+ if (OP_STATE_INIT == op->state)
+ {
+ entry = GNUNET_malloc (sizeof (struct QueueEntry));
+ entry->op = op;
+ entry->nres = op->nres[cnt];
+ s = cnt;
+ GNUNET_array_append (op->qentries, s, entry);
+ }
+ else
+ {
+ entry = op->qentries[cnt];
+ remove_queue_entry (op, cnt);
+ }
+ opq = op->queues[cnt];
+ switch (state)
+ {
+ case OP_STATE_INIT:
+ GNUNET_assert (0);
+ break;
+ case OP_STATE_WAITING:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
+ break;
+ case OP_STATE_READY:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
+ break;
+ case OP_STATE_STARTED:
+ GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
+ break;
+ }
+ }
+ op->state = state;
+}
+
/**
* Removes an operation from the ready queue. Also stops the 'process_rq_task'
rq_remove (op);
if (NULL != rq_head)
process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
- op->state = OP_STATE_STARTED;
+ change_state (op, OP_STATE_STARTED);
if (NULL != op->start)
op->start (op->cb_cls);
}
}
-void
-wq_add (struct GNUNET_TESTBED_Operation *op)
+static int
+is_queue_empty (struct OperationQueue *opq)
{
- struct QueueEntry *entry;
- struct OperationQueue *opq;
- unsigned int cnt;
-
- GNUNET_assert (OP_STATE_WAITING == op->state);
- GNUNET_assert (NULL == op->qentries);
- for (cnt = 0; cnt < op->nqueues;)
- {
- opq = op->queues[cnt];
- entry = GNUNET_malloc (sizeof (struct QueueEntry));
- entry->op = op;
- entry->nres = op->nres[cnt];
- GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
- GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */
- }
-}
-
-
-void
-wq_remove (struct GNUNET_TESTBED_Operation *op)
-{
- struct QueueEntry *entry;
- struct OperationQueue *opq;
- unsigned int cnt;
-
- GNUNET_assert (OP_STATE_WAITING == op->state);
- GNUNET_assert (NULL != op->qentries);
- for (cnt = 0; cnt < op->nqueues; cnt ++)
- {
- opq = op->queues[cnt];
- entry = op->qentries[cnt];
- GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
- GNUNET_free (entry);
- }
- GNUNET_free (op->qentries);
- op->qentries = NULL;
+ if ( (NULL != opq->wq_head)
+ || (NULL != opq->rq_head)
+ || (NULL != opq->aq_head) )
+ return GNUNET_NO;
+ return GNUNET_YES;
}
-
/**
* Checks for the readiness of an operation and schedules a operation start task
*
if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active)
return;
}
- wq_remove (op);
for (i = 0; i < op->nqueues; i++)
op->queues[i]->active += op->nres[i];
- op->state = OP_STATE_READY;
+ change_state (op, OP_STATE_READY);
rq_add (op);
}
rq_remove (op);
for (i = 0; i < op->nqueues; i++)
op->queues[i]->active--;
- op->state = OP_STATE_WAITING;
- wq_add (op);
+ change_state (op, OP_STATE_WAITING);
}
void
GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
{
- GNUNET_break (NULL == queue->head);
- GNUNET_break (NULL == queue->tail);
+ GNUNET_break (GNUNET_YES == is_queue_empty (queue));
GNUNET_free (queue);
}
int
GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
{
- if (NULL != queue->head)
+ if (GNUNET_NO == is_queue_empty (queue))
return GNUNET_NO;
GNUNET_TESTBED_operation_queue_destroy_ (queue);
return GNUNET_YES;
}
+void
+recheck_waiting (struct OperationQueue *opq)
+{
+ struct QueueEntry *entry;
+ struct QueueEntry *entry2;
+
+ entry = opq->wq_head;
+ while ( (NULL != entry) && (opq->active < opq->max_active) )
+ {
+ entry2 = entry->next;
+ check_readiness (entry->op);
+ entry = entry2;
+ }
+}
+
+
/**
* Function to reset the maximum number of operations in the given queue. If
* max_active is lesser than the number of currently active operations, the
struct QueueEntry *entry;
queue->max_active = max_active;
- entry = queue->head;
- while ((queue->active > queue->max_active) && (NULL != entry))
- {
- if (entry->op->state == OP_STATE_READY)
- defer (entry->op);
- entry = entry->next;
- }
-
- entry = queue->head;
- while ((NULL != entry) && (queue->active < queue->max_active))
- {
- if (OP_STATE_WAITING == entry->op->state)
- check_readiness (entry->op);
- entry = entry->next;
- }
+ while ( (queue->active > queue->max_active)
+ && (NULL != (entry = queue->rq_head)) )
+ defer (entry->op);
+ recheck_waiting (queue);
}
struct GNUNET_TESTBED_Operation *op,
unsigned int nres)
{
- struct QueueEntry *entry;
unsigned int qsize;
GNUNET_assert (0 < nres);
- entry = GNUNET_malloc (sizeof (struct QueueEntry));
- entry->op = op;
- entry->nres = nres;
- GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry);
qsize = op->nqueues;
GNUNET_array_append (op->queues, op->nqueues, queue);
GNUNET_array_append (op->nres, qsize, nres);
GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
{
GNUNET_assert (NULL == op->rq_entry);
- op->state = OP_STATE_WAITING;
- wq_add (op);
+ change_state (op, OP_STATE_WAITING);
check_readiness (op);
}
-/**
- * Remove an operation from a queue. This can be because the
- * oeration was active and has completed (and the resources have
- * been released), or because the operation was cancelled and
- * thus scheduling the operation is no longer required.
- *
- * @param queue queue to add the operation to
- * @param op operation to add to the queue
- */
-void
-GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
- struct GNUNET_TESTBED_Operation
- *op)
-{
- struct QueueEntry *entry;
-
- for (entry = queue->head; NULL != entry; entry = entry->next)
- if (entry->op == op)
- break;
- GNUNET_assert (NULL != entry);
- GNUNET_assert (0 < entry->nres);
- switch (op->state)
- {
- case OP_STATE_INIT:
- case OP_STATE_WAITING:
- break;
- case OP_STATE_READY:
- case OP_STATE_STARTED:
- GNUNET_assert (0 != queue->active);
- GNUNET_assert (queue->active >= entry->nres);
- queue->active -= entry->nres;
- break;
- }
- GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry);
- GNUNET_free (entry);
- entry = queue->wq_head;
- if (NULL == entry)
- return;
- check_readiness (entry->op);
-}
-
-
/**
* An operation is 'done' (was cancelled or finished); remove
* it from the queues and release associated resources.
void
GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
{
+ struct QueueEntry *entry;
+ struct OperationQueue *opq;
unsigned int i;
- switch (op->state)
+ if (OP_STATE_INIT == op->state)
{
- case OP_STATE_READY:
- rq_remove (op);
- break;
- case OP_STATE_WAITING:
- wq_remove (op);
- break;
- case OP_STATE_STARTED:
- case OP_STATE_INIT:
- break;
+ GNUNET_free (op);
+ return;
}
+ if (OP_STATE_READY == op->state)
+ rq_remove (op);
+ GNUNET_assert (NULL != op->queues);
+ GNUNET_assert (NULL != op->qentries);
for (i = 0; i < op->nqueues; i++)
- GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op);
+ {
+ entry = op->qentries[i];
+ remove_queue_entry (op, i);
+ opq = op->queues[i];
+ switch (op->state)
+ {
+ case OP_STATE_INIT:
+ case OP_STATE_WAITING:
+ break;
+ case OP_STATE_READY:
+ case OP_STATE_STARTED:
+ GNUNET_assert (0 != opq->active);
+ GNUNET_assert (opq->active >= entry->nres);
+ opq->active -= entry->nres;
+ recheck_waiting (opq);
+ break;
+ }
+ GNUNET_free (entry);
+ }
+ GNUNET_free_non_null (op->qentries);
GNUNET_free (op->queues);
GNUNET_free (op->nres);
if (NULL != op->release)