- opaque mq structs
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
index 5a62cac4e742b2b91172dea4d04a583943293c2e..47d135ffd47d09532d5c44a61bd788c6a88b5156 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      (C) 2008--2012 Christian Grothoff (and other contributing authors)
+      (C) 2008--2013 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -22,7 +22,9 @@
  * @file testbed/testbed_api_operations.c
  * @brief functions to manage operation queues
  * @author Christian Grothoff
+ * @author Sree Harsha Totakura
  */
+
 #include "platform.h"
 #include "testbed_api_operations.h"
 
@@ -60,16 +62,6 @@ struct QueueEntry
  */
 struct OperationQueue
 {
- /**
-   * The head of the operation queue
-   */
-  struct QueueEntry *head;
-
-  /**
-   * The tail of the operation queue
-   */
-  struct QueueEntry *tail;
-
   /**
    * DLL head for the wait queue.  Operations which are waiting for this
    * operation queue are put here
@@ -81,6 +73,40 @@ struct OperationQueue
    */
   struct QueueEntry *wq_tail;
 
+  /**
+   * DLL head for the ready queue.  Operations which are in this operation queue
+   * and are in ready state are put here
+   */
+  struct QueueEntry *rq_head;
+
+  /**
+   * DLL tail for the ready queue
+   */
+  struct QueueEntry *rq_tail;
+
+  /**
+   * DLL head for the active queue.  Operations which are in this operation
+   * queue and are currently active are put here
+   */
+  struct QueueEntry *aq_head;
+
+  /**
+   * DLL tail for the active queue.
+   */
+  struct QueueEntry *aq_tail;
+
+  /**
+   * DLL head for the inactive queue.  Operations which are inactive and can be
+   * evicted if the queues it holds are maxed out and another operation begins
+   * to wait on them.
+   */
+  struct QueueEntry *nq_head;
+
+  /**
+   * DLL tail for the inactive queue.
+   */
+  struct QueueEntry *nq_tail;
+
   /**
    * Number of operations that are currently active in this queue.
    */
@@ -115,9 +141,17 @@ enum OperationState
   OP_STATE_READY,
 
   /**
-   * The operation has started
+   * The operation has started and is active
+   */
+  OP_STATE_ACTIVE,
+
+  /**
+   * The operation is inactive.  It still holds resources on the operation
+   * queues.  However, this operation will be evicted when another operation
+   * requires resources from the maxed out queues this operation is holding
+   * resources from.
    */
-  OP_STATE_STARTED
+  OP_STATE_INACTIVE
 };
 
 
@@ -215,6 +249,101 @@ struct ReadyQueueEntry *rq_tail;
 GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
 
 
+/**
+ * Removes a queue entry of an operation from one of the operation queues' lists
+ * depending on the state of the operation
+ *
+ * @param op the operation whose entry has to be removed
+ * @param index the index of the entry in the operation's array of queue entries
+ */
+static void
+remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
+{
+  struct OperationQueue *opq;
+  struct QueueEntry *entry;
+  
+  opq = op->queues[index];
+  entry = op->qentries[index];
+  switch (op->state)
+  {
+  case OP_STATE_INIT:
+    GNUNET_assert (0);
+    break;
+  case OP_STATE_WAITING:
+    GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
+    break;
+  case OP_STATE_READY:
+    GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
+    break;
+  case OP_STATE_ACTIVE:
+    GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
+    break;
+  case OP_STATE_INACTIVE:
+    GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
+    break;
+  }
+}
+
+
+/**
+ * Changes the state of the operation while moving its associated queue entries
+ * in the operation's operation queues
+ *
+ * @param op the operation whose state has to be changed
+ * @param state the state the operation should have.  It cannot be OP_STATE_INIT
+ */
+static void
+change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
+{
+  struct QueueEntry *entry;
+  struct OperationQueue *opq;
+  unsigned int cnt;
+  unsigned int s;
+  
+  GNUNET_assert (OP_STATE_INIT != state);
+  GNUNET_assert (NULL != op->queues);
+  GNUNET_assert (NULL != op->nres);
+  GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
+  GNUNET_assert (op->state != state);
+  for (cnt = 0; cnt < op->nqueues; cnt++)
+  {
+    if (OP_STATE_INIT == op->state)
+    {
+      entry = GNUNET_malloc (sizeof (struct QueueEntry));
+      entry->op = op;
+      entry->nres = op->nres[cnt];
+      s = cnt;
+      GNUNET_array_append (op->qentries, s, entry);      
+    }
+    else
+    {
+      entry = op->qentries[cnt];
+      remove_queue_entry (op, cnt);
+    }
+    opq = op->queues[cnt];
+    switch (state)
+    {
+    case OP_STATE_INIT:
+      GNUNET_assert (0);
+      break;
+    case OP_STATE_WAITING:
+      GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
+      break;
+    case OP_STATE_READY:
+      GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
+      break;
+    case OP_STATE_ACTIVE:
+      GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
+      break;
+    case OP_STATE_INACTIVE:
+      GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
+      break;
+    }
+  }
+  op->state = state;
+}
+
+
 /**
  * Removes an operation from the ready queue.  Also stops the 'process_rq_task'
  * if the given operation is the last one in the queue.
@@ -256,7 +385,7 @@ process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   rq_remove (op);
   if (NULL != rq_head)
     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
-  op->state = OP_STATE_STARTED;
+  change_state (op, OP_STATE_ACTIVE);
   if (NULL != op->start)
     op->start (op->cb_cls);  
 }
@@ -282,46 +411,136 @@ rq_add (struct GNUNET_TESTBED_Operation *op)
 }
 
 
-void
-wq_add (struct GNUNET_TESTBED_Operation *op)
+/**
+ * Checks if the given operation queue is empty or not
+ *
+ * @param opq the operation queue
+ * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
+ *           otherwise
+ */
+static int
+is_queue_empty (struct OperationQueue *opq)
 {
-  struct QueueEntry *entry;
-  struct OperationQueue *opq;
-  unsigned int cnt;
+  if ( (NULL != opq->wq_head)
+       || (NULL != opq->rq_head)
+       || (NULL != opq->aq_head)
+       || (NULL != opq->nq_head) )
+    return GNUNET_NO;
+  return GNUNET_YES;
+}
 
-  GNUNET_assert (OP_STATE_WAITING == op->state);
-  GNUNET_assert (NULL == op->qentries);
-  for (cnt = 0; cnt < op->nqueues;)
+
+/**
+ * Checks if the given operation queue has enough resources to provide for the
+ * operation of the given queue entry.  It also checks if any inactive
+ * operations are to be released in order to accommodate the needed resources
+ * and returns them as an array.
+ *
+ * @param opq the operation queue to check for resource accommodation
+ * @param entry the operation queue entry whose operation's resources are to be
+ *          accommodated
+ * @param ops_ pointer to return the array of operations which are to be released
+ *          in order to accommodate the new operation.  Can be NULL
+ * @param n_ops_ the number of operations in ops_
+ * @return GNUNET_YES if the given entry's operation can be accommodated in this
+ *           queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
+ *           be set to NULL and 0 respectively.
+ */
+static int
+decide_capacity (struct OperationQueue *opq,
+                 struct QueueEntry *entry,
+                 struct GNUNET_TESTBED_Operation ***ops_,
+                 unsigned int *n_ops_)
+{
+  struct QueueEntry **evict_entries;
+  struct GNUNET_TESTBED_Operation **ops;
+  struct GNUNET_TESTBED_Operation *op;
+  unsigned int n_ops;
+  unsigned int n_evict_entries;
+  unsigned int need;
+  int deficit;
+  int rval;
+
+  GNUNET_assert (NULL != (op = entry->op));
+  GNUNET_assert (0 < (need = entry->nres));
+  ops = NULL;
+  n_ops = 0;
+  evict_entries = NULL;
+  n_evict_entries = 0;
+  rval = GNUNET_YES;
+  if (opq->active > opq->max_active)
   {
-    opq = op->queues[cnt];
-    entry = GNUNET_malloc (sizeof (struct QueueEntry));
-    entry->op = op;
-    entry->nres = op->nres[cnt];
-    GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
-    GNUNET_array_append (op->qentries, cnt, entry); /* increments cnt */
+    need += opq->active - opq->max_active;
+    rval = GNUNET_NO;
+    goto ret;
   }
+  if ((opq->active + need) <= opq->max_active)
+    goto ret;
+  deficit = need - (opq->max_active - opq->active);
+  for (entry = opq->nq_head;
+       (0 < deficit) && (NULL != entry);
+       entry = entry->next)
+  {
+    GNUNET_array_append (evict_entries, n_evict_entries, entry);
+    deficit -= entry->nres;
+  }
+  if (0 < deficit)
+  {
+    rval = GNUNET_NO;
+    goto ret;
+  }
+  for (n_ops = 0; n_ops < n_evict_entries;)
+  {
+    op = evict_entries[n_ops]->op;
+    GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
+  }
+
+ ret:
+  GNUNET_free_non_null (evict_entries);  
+  if (NULL != ops_) *ops_ = ops;
+  if (NULL != n_ops_) *n_ops_ = n_ops;
+  return rval;
 }
 
 
-void
-wq_remove (struct GNUNET_TESTBED_Operation *op)
+/**
+ * Merges an array of operations into another, eliminating duplicates.  No
+ * ordering is guaranteed.
+ *
+ * @param old the array into which the merging is done.
+ * @param n_old the number of operations in old array
+ * @param new the array from which operations are to be merged
+ * @param n_new the number of operations in new array
+ */
+static void
+merge_ops (struct GNUNET_TESTBED_Operation ***old,
+           unsigned int *n_old,
+           struct GNUNET_TESTBED_Operation **new,
+           unsigned int n_new)
 {
-  struct QueueEntry *entry;
-  struct OperationQueue *opq;
-  unsigned int cnt;
-
-  GNUNET_assert (OP_STATE_WAITING == op->state);
-  GNUNET_assert (NULL != op->qentries);
-  for (cnt = 0; cnt < op->nqueues; cnt ++)
-  {
-    opq = op->queues[cnt];
-    entry = op->qentries[cnt];
-    GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
-    GNUNET_free (entry);
+  struct GNUNET_TESTBED_Operation **cur;
+  unsigned int i;
+  unsigned int j;
+  unsigned int n_cur;
+  GNUNET_assert (NULL != old);
+  n_cur = *n_old;
+  cur = *old;
+  for (i = 0; i < n_new; i++)
+  {    
+    for (j = 0; j < *n_old; j++)
+    {
+      if (new[i] == cur[j])
+        break;
+    }
+    if (j < *n_old)
+      continue;
+    GNUNET_array_append (cur, n_cur, new[j]);
   }
-  GNUNET_free (op->qentries);
-  op->qentries = NULL;
+  *old = cur;
+  *n_old = n_cur;
 }
+           
 
 
 /**
@@ -329,24 +548,49 @@ wq_remove (struct GNUNET_TESTBED_Operation *op)
  *
  * @param op the operation
  */
-static void
+static int
 check_readiness (struct GNUNET_TESTBED_Operation *op)
 {
+  struct GNUNET_TESTBED_Operation **evict_ops;
+  struct GNUNET_TESTBED_Operation **ops;
+  unsigned int n_ops;
+  unsigned int n_evict_ops;
   unsigned int i;
 
   GNUNET_assert (NULL == op->rq_entry);
   GNUNET_assert (OP_STATE_WAITING == op->state);
+  evict_ops = NULL;
+  n_evict_ops = 0;
   for (i = 0; i < op->nqueues; i++)
   {
-    GNUNET_assert (0 < op->nres[i]);
-    if ((op->queues[i]->active + op->nres[i]) > op->queues[i]->max_active)
-      return;
+    ops = NULL;
+    n_ops = 0;
+    if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
+                                      &ops, &n_ops))
+    {
+      GNUNET_free_non_null (evict_ops);
+      return GNUNET_NO;
+    }
+    if (NULL == ops)
+      continue;
+    merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
+    GNUNET_free (ops);    
+  }
+  if (NULL != evict_ops)
+  {
+    for (i = 0; i < n_evict_ops; i++)
+      GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+    GNUNET_free (evict_ops);
+    evict_ops = NULL;
+    /* Evicting the operations should schedule this operation */
+    GNUNET_assert (OP_STATE_READY == op->state);
+    return GNUNET_YES;
   }
-  wq_remove (op);
   for (i = 0; i < op->nqueues; i++)
     op->queues[i]->active += op->nres[i];
-  op->state = OP_STATE_READY;  
+  change_state (op, OP_STATE_READY);
   rq_add (op);
+  return GNUNET_YES;
 }
 
 
@@ -364,8 +608,7 @@ defer (struct GNUNET_TESTBED_Operation *op)
   rq_remove (op);
   for (i = 0; i < op->nqueues; i++)
     op->queues[i]->active--;
-  op->state = OP_STATE_WAITING;
-  wq_add (op);
+  change_state (op, OP_STATE_WAITING);
 }
 
 
@@ -419,8 +662,7 @@ GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
 void
 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
 {
-  GNUNET_break (NULL == queue->head);
-  GNUNET_break (NULL == queue->tail);
+  GNUNET_break (GNUNET_YES == is_queue_empty (queue));
   GNUNET_free (queue);
 }
 
@@ -435,13 +677,36 @@ GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
 int
 GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
 {
-  if (NULL != queue->head)
+  if (GNUNET_NO == is_queue_empty (queue))
     return GNUNET_NO;
   GNUNET_TESTBED_operation_queue_destroy_ (queue);
   return GNUNET_YES;
 }
 
 
+/**
+ * Rechecks if any of the operations in the given operation queue's waiting list
+ * can be made active
+ *
+ * @param opq the operation queue
+ */
+static void
+recheck_waiting (struct OperationQueue *opq)
+{
+  struct QueueEntry *entry;
+  struct QueueEntry *entry2;
+
+  entry = opq->wq_head;
+  while (NULL != entry)
+  {
+    entry2 = entry->next;
+    if (GNUNET_NO == check_readiness (entry->op))
+      break;
+    entry = entry2;
+  }
+}
+
+
 /**
  * Function to reset the maximum number of operations in the given queue. If
  * max_active is lesser than the number of currently active operations, the
@@ -457,21 +722,10 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
   struct QueueEntry *entry;
 
   queue->max_active = max_active;
-  entry = queue->head;
-  while ((queue->active > queue->max_active) && (NULL != entry))
-  {
-    if (entry->op->state == OP_STATE_READY)
-      defer (entry->op);
-    entry = entry->next;
-  }
-
-  entry = queue->head;
-  while ((NULL != entry) && (queue->active < queue->max_active))
-  {
-    if (OP_STATE_WAITING == entry->op->state)
-      check_readiness (entry->op);
-    entry = entry->next;
-  }
+  while ( (queue->active > queue->max_active)
+          && (NULL != (entry = queue->rq_head)) )
+    defer (entry->op);
+  recheck_waiting (queue);
 }
 
 
@@ -491,14 +745,9 @@ GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
                                          struct GNUNET_TESTBED_Operation *op,
                                          unsigned int nres)
 {
-  struct QueueEntry *entry;
   unsigned int qsize;
 
   GNUNET_assert (0 < nres);
-  entry = GNUNET_malloc (sizeof (struct QueueEntry));
-  entry->op = op;
-  entry->nres = nres;
-  GNUNET_CONTAINER_DLL_insert_tail (queue->head, queue->tail, entry);
   qsize = op->nqueues;
   GNUNET_array_append (op->queues, op->nqueues, queue);
   GNUNET_array_append (op->nres, qsize, nres);
@@ -538,51 +787,55 @@ void
 GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
 {
   GNUNET_assert (NULL == op->rq_entry);
-  op->state = OP_STATE_WAITING;
-  wq_add (op);
-  check_readiness (op);
+  change_state (op, OP_STATE_WAITING);
+  (void) check_readiness (op);
 }
 
 
 /**
- * Remove an operation from a queue.  This can be because the
- * oeration was active and has completed (and the resources have
- * been released), or because the operation was cancelled and
- * thus scheduling the operation is no longer required.
+ * Marks an active operation as inactive - the operation will be kept in a
+ * ready-to-be-released state and continues to hold resources until another
+ * operation contents for them.
  *
- * @param queue queue to add the operation to
- * @param op operation to add to the queue
+ * @param op the operation to be marked as inactive.  The operation start
+ *          callback should have been called before for this operation to mark
+ *          it as inactive.
  */
 void
-GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
-                                        struct GNUNET_TESTBED_Operation
-                                        *op)
+GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
 {
-  struct QueueEntry *entry;
+  struct OperationQueue **queues;
+  size_t ms;
+  unsigned int nqueues;
+  unsigned int i;
 
-  for (entry = queue->head; NULL != entry; entry = entry->next)
-    if (entry->op == op)
-      break;
-  GNUNET_assert (NULL != entry);
-  GNUNET_assert (0 < entry->nres);
-  switch (op->state)
-  {
-  case OP_STATE_INIT:
-  case OP_STATE_WAITING:
-    break;
-  case OP_STATE_READY:
-  case OP_STATE_STARTED:
-    GNUNET_assert (0 != queue->active);
-    GNUNET_assert (queue->active >= entry->nres);
-    queue->active -= entry->nres;
-    break;
-  }
-  GNUNET_CONTAINER_DLL_remove (queue->head, queue->tail, entry);
-  GNUNET_free (entry);
-  entry = queue->wq_head;
-  if (NULL == entry)
-    return;
-  check_readiness (entry->op);
+  GNUNET_assert (OP_STATE_ACTIVE == op->state);
+  change_state (op, OP_STATE_INACTIVE);
+  nqueues = op->nqueues;
+  ms = sizeof (struct OperationQueue *) * nqueues;
+  queues = GNUNET_malloc (ms);
+  /* Cloning is needed as the operation be released by waiting operations and
+     hence its nqueues memory ptr will be freed */
+  GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
+  for (i = 0; i < nqueues; i++)
+    recheck_waiting (queues[i]);
+  GNUNET_free (queues);
+}
+
+
+/**
+ * Marks and inactive operation as active.  This fuction should be called to
+ * ensure that the oprelease callback will not be called until it is either
+ * marked as inactive or released.
+ *
+ * @param op the operation to be marked as active
+ */
+void
+GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
+{
+
+  GNUNET_assert (OP_STATE_INACTIVE == op->state);
+  change_state (op, OP_STATE_ACTIVE);
 }
 
 
@@ -595,22 +848,45 @@ GNUNET_TESTBED_operation_queue_remove_ (struct OperationQueue *queue,
 void
 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
 {
+  struct QueueEntry *entry;  
+  struct OperationQueue *opq;
   unsigned int i;
 
-  switch (op->state)
+  if (OP_STATE_INIT == op->state)
   {
-  case OP_STATE_READY:
-    rq_remove (op);
-    break;
-  case OP_STATE_WAITING:
-    wq_remove (op);
-    break;
-  case OP_STATE_STARTED:
-  case OP_STATE_INIT:
-    break;
+    GNUNET_free (op);
+    return;
   }
+  if (OP_STATE_READY == op->state)
+    rq_remove (op);
+  if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
+    GNUNET_TESTBED_operation_activate_ (op);
+  GNUNET_assert (NULL != op->queues);
+  GNUNET_assert (NULL != op->qentries);
   for (i = 0; i < op->nqueues; i++)
-    GNUNET_TESTBED_operation_queue_remove_ (op->queues[i], op);
+  {
+    entry = op->qentries[i];
+    remove_queue_entry (op, i);
+    opq = op->queues[i];
+    switch (op->state)
+    {      
+    case OP_STATE_INIT:
+    case OP_STATE_INACTIVE:
+      GNUNET_assert (0);
+      break;
+    case OP_STATE_WAITING:      
+      break;
+    case OP_STATE_READY:
+    case OP_STATE_ACTIVE:
+      GNUNET_assert (0 != opq->active);
+      GNUNET_assert (opq->active >= entry->nres);
+      opq->active -= entry->nres;
+      recheck_waiting (opq);
+      break;
+    }    
+    GNUNET_free (entry);
+  }
+  GNUNET_free_non_null (op->qentries);
   GNUNET_free (op->queues);
   GNUNET_free (op->nres);
   if (NULL != op->release)