fix pointer indentation
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
index c09ec366ac388a9ee43b4500ed94a8c6022663d5..a406c0e0b834b01344cec29ac3034173eb85d9b1 100644 (file)
@@ -1,21 +1,21 @@
 /*
       This file is part of GNUnet
-      (C) 2008--2013 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2008--2013 GNUnet e.V.
 
-      GNUnet is free software; you can redistribute it and/or modify
-      it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 3, or (at your
-      option) any later version.
+      GNUnet is free software: you can redistribute it and/or modify it
+      under the terms of the GNU Affero General Public License as published
+      by the Free Software Foundation, either version 3 of the License,
+      or (at your option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
       WITHOUT ANY WARRANTY; without even the implied warranty of
       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-      General Public License for more details.
+      Affero General Public License for more details.
 
-      You should have received a copy of the GNU General Public License
-      along with GNUnet; see the file COPYING.  If not, write to the
-      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-      Boston, MA 02111-1307, USA.
+      You should have received a copy of the GNU Affero General Public License
+      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
  */
 
 /**
 #include "testbed_api_operations.h"
 #include "testbed_api_sd.h"
 
+/**
+ * The number of readings containing past operation's timing information that we
+ * keep track of for adaptive queues
+ */
+#define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40
+
+/**
+ * The number of parallel opeartions we start with by default for adaptive
+ * queues
+ */
+#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
 
 /**
  * An entry in the operation queue
@@ -115,7 +126,7 @@ struct FeedbackCtx
    * Head for DLL of time slots which are free to be allocated to operations
    */
   struct TimeSlot *alloc_head;
-    
+
   /**
    * Tail for DLL of time slots which are free to be allocated to operations
    */
@@ -131,12 +142,16 @@ struct FeedbackCtx
    * Number of time slots filled so far
    */
   unsigned int tslots_filled;
-  
+
   /**
    * Bound on the maximum number of operations which can be active
    */
   unsigned int max_active_bound;
 
+  /**
+   * Number of operations that have failed
+   */
+  unsigned int nfailed;
 };
 
 
@@ -211,10 +226,21 @@ struct OperationQueue
    * Max number of operations which can be active at any time in this queue.
    * This value can be changed either by calling
    * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
-   * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
+   * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE
    */
   unsigned int max_active;
 
+  /**
+   * The number of resources occupied by failed operations in the current shot.
+   * This is only relavant if the operation queue is of type
+   * #OPERATION_QUEUE_TYPE_ADAPTIVE
+   */
+  unsigned int overload;
+
+  /**
+   * Is this queue marked for expiry?
+   */
+  unsigned int expired;
 };
 
 
@@ -262,7 +288,7 @@ struct ReadyQueueEntry
    * next ptr for DLL
    */
   struct ReadyQueueEntry *next;
-  
+
   /**
    * prev ptr for DLL
    */
@@ -348,23 +374,32 @@ struct GNUNET_TESTBED_Operation
    * Is this a failed operation?
    */
   int failed;
-
 };
 
 /**
  * DLL head for the ready queue
  */
-struct ReadyQueueEntry *rq_head;
+static struct ReadyQueueEntry *rq_head;
 
 /**
  * DLL tail for the ready queue
  */
-struct ReadyQueueEntry *rq_tail;
+static struct ReadyQueueEntry *rq_tail;
+
+/**
+ * Array of operation queues which are to be destroyed
+ */
+static struct OperationQueue **expired_opqs;
+
+/**
+ * Number of expired operation queues in the above array
+ */
+static unsigned int n_expired_opqs;
 
 /**
  * The id of the task to process the ready queue
  */
-GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
+struct GNUNET_SCHEDULER_Task *process_rq_task_id;
 
 
 /**
@@ -402,7 +437,7 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
 {
   struct OperationQueue *opq;
   struct QueueEntry *entry;
-  
+
   opq = op->queues[index];
   entry = op->qentries[index];
   switch (op->state)
@@ -410,15 +445,19 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
   case OP_STATE_INIT:
     GNUNET_assert (0);
     break;
+
   case OP_STATE_WAITING:
     GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
     break;
+
   case OP_STATE_READY:
     GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
     break;
+
   case OP_STATE_ACTIVE:
     GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
     break;
+
   case OP_STATE_INACTIVE:
     GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
     break;
@@ -440,7 +479,7 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
   struct OperationQueue *opq;
   unsigned int cnt;
   unsigned int s;
-  
+
   GNUNET_assert (OP_STATE_INIT != state);
   GNUNET_assert (NULL != op->queues);
   GNUNET_assert (NULL != op->nres);
@@ -450,11 +489,11 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
   {
     if (OP_STATE_INIT == op->state)
     {
-      entry = GNUNET_malloc (sizeof (struct QueueEntry));
+      entry = GNUNET_new (struct QueueEntry);
       entry->op = op;
       entry->nres = op->nres[cnt];
       s = cnt;
-      GNUNET_array_append (op->qentries, s, entry);      
+      GNUNET_array_append (op->qentries, s, entry);
     }
     else
     {
@@ -467,15 +506,19 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
     case OP_STATE_INIT:
       GNUNET_assert (0);
       break;
+
     case OP_STATE_WAITING:
       GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
       break;
+
     case OP_STATE_READY:
       GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
       break;
+
     case OP_STATE_ACTIVE:
       GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
       break;
+
     case OP_STATE_INACTIVE:
       GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
       break;
@@ -493,15 +536,15 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
  */
 static void
 rq_remove (struct GNUNET_TESTBED_Operation *op)
-{  
+{
   GNUNET_assert (NULL != op->rq_entry);
   GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
   GNUNET_free (op->rq_entry);
   op->rq_entry = NULL;
-  if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) )
+  if ((NULL == rq_head) && (NULL != process_rq_task_id))
   {
     GNUNET_SCHEDULER_cancel (process_rq_task_id);
-    process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
+    process_rq_task_id = NULL;
   }
 }
 
@@ -513,16 +556,15 @@ rq_remove (struct GNUNET_TESTBED_Operation *op)
  * the ready queue.
  *
  * @param cls NULL
- * @param tc scheduler task context.  Not used.
  */
 static void
-process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+process_rq_task (void *cls)
 {
   struct GNUNET_TESTBED_Operation *op;
   struct OperationQueue *queue;
   unsigned int cnt;
 
-  process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
+  process_rq_task_id = NULL;
   GNUNET_assert (NULL != rq_head);
   GNUNET_assert (NULL != (op = rq_head->op));
   rq_remove (op);
@@ -552,11 +594,11 @@ rq_add (struct GNUNET_TESTBED_Operation *op)
   struct ReadyQueueEntry *rq_entry;
 
   GNUNET_assert (NULL == op->rq_entry);
-  rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry));
+  rq_entry = GNUNET_new (struct ReadyQueueEntry);
   rq_entry->op = op;
   GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
   op->rq_entry = rq_entry;
-  if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id)
+  if (NULL == process_rq_task_id)
     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
 }
 
@@ -571,10 +613,10 @@ rq_add (struct GNUNET_TESTBED_Operation *op)
 static int
 is_queue_empty (struct OperationQueue *opq)
 {
-  if ( (NULL != opq->wq_head)
-       || (NULL != opq->rq_head)
-       || (NULL != opq->aq_head)
-       || (NULL != opq->nq_head) )
+  if ((NULL != opq->wq_head)
+      || (NULL != opq->rq_head)
+      || (NULL != opq->aq_head)
+      || (NULL != opq->nq_head))
     return GNUNET_NO;
   return GNUNET_YES;
 }
@@ -608,6 +650,7 @@ decide_capacity (struct OperationQueue *opq,
   unsigned int n_ops;
   unsigned int n_evict_entries;
   unsigned int need;
+  unsigned int max;
   int deficit;
   int rval;
 
@@ -618,14 +661,22 @@ decide_capacity (struct OperationQueue *opq,
   evict_entries = NULL;
   n_evict_entries = 0;
   rval = GNUNET_YES;
-  if (opq->active > opq->max_active)
+  if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
+  {
+    GNUNET_assert (NULL != opq->fctx);
+    GNUNET_assert (opq->max_active >= opq->overload);
+    max = opq->max_active - opq->overload;
+  }
+  else
+    max = opq->max_active;
+  if (opq->active > max)
   {
     rval = GNUNET_NO;
     goto ret;
   }
-  if ((opq->active + need) <= opq->max_active)
+  if ((opq->active + need) <= max)
     goto ret;
-  deficit = need - (opq->max_active - opq->active);
+  deficit = need - (max - opq->active);
   for (entry = opq->nq_head;
        (0 < deficit) && (NULL != entry);
        entry = entry->next)
@@ -641,11 +692,11 @@ decide_capacity (struct OperationQueue *opq,
   for (n_ops = 0; n_ops < n_evict_entries;)
   {
     op = evict_entries[n_ops]->op;
-    GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
+    GNUNET_array_append (ops, n_ops, op);  /* increments n-ops */
   }
 
- ret:
-  GNUNET_free_non_null (evict_entries);  
+ret:
+  GNUNET_free_non_null (evict_entries);
   if (NULL != ops_)
     *ops_ = ops;
   else
@@ -675,12 +726,12 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old,
   unsigned int i;
   unsigned int j;
   unsigned int n_cur;
+
   GNUNET_assert (NULL != old);
   n_cur = *n_old;
   cur = *old;
   for (i = 0; i < n_new; i++)
-  {    
+  {
     for (j = 0; j < *n_old; j++)
     {
       if (new[i] == cur[j])
@@ -693,7 +744,6 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old,
   *old = cur;
   *n_old = n_cur;
 }
-           
 
 
 /**
@@ -727,12 +777,12 @@ check_readiness (struct GNUNET_TESTBED_Operation *op)
     if (NULL == ops)
       continue;
     merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
-    GNUNET_free (ops);    
+    GNUNET_free (ops);
   }
   if (NULL != evict_ops)
   {
     for (i = 0; i < n_evict_ops; i++)
-      GNUNET_TESTBED_operation_release_ (evict_ops[i]); 
+      GNUNET_TESTBED_operation_release_ (evict_ops[i]);
     GNUNET_free (evict_ops);
     evict_ops = NULL;
     /* Evicting the operations should schedule this operation */
@@ -762,7 +812,7 @@ defer (struct GNUNET_TESTBED_Operation *op)
   for (i = 0; i < op->nqueues; i++)
   {
     GNUNET_assert (op->queues[i]->active >= op->nres[i]);
-    op->queues[i]->active -= op->nres[i];    
+    op->queues[i]->active -= op->nres[i];
   }
   change_state (op, OP_STATE_WAITING);
 }
@@ -801,10 +851,13 @@ cleanup_tslots (struct OperationQueue *queue)
 
 
 /**
- * Initializes the operation queue for parallel overlay connects
+ * Cleansup the existing timing slots and sets new timing slots in the given
+ * queue to accommodate given number of max active operations.
  *
- * @param h the host handle
- * @param npoc the number of parallel overlay connects - the queue size
+ * @param queue the queue
+ * @param n the number of maximum active operations.  If n is greater than the
+ *   maximum limit set while creating the queue, then the minimum of these two
+ *   will be selected as n
  */
 static void
 adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
@@ -812,15 +865,17 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
   struct FeedbackCtx *fctx = queue->fctx;
   struct TimeSlot *tslot;
   unsigned int cnt;
-  
+
   cleanup_tslots (queue);
-  n = GNUNET_MIN (n ,fctx->max_active_bound);
-  fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
+  n = GNUNET_MIN (n, fctx->max_active_bound);
+  fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot));
+  fctx->nfailed = 0;
   for (cnt = 0; cnt < n; cnt++)
   {
     tslot = &fctx->tslots_freeptr[cnt];
     tslot->queue = queue;
-    GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot);
+    GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
+                                      tslot);
   }
   GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
 }
@@ -831,10 +886,9 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
  * the feedback context.
  *
  * @param queue the queue
- * @param fail GNUNET_YES if the last operation failed; GNUNET_NO if not;
  */
 static void
-adapt_parallelism (struct OperationQueue *queue, int fail)
+adapt_parallelism (struct OperationQueue *queue)
 {
   struct GNUNET_TIME_Relative avg;
   struct FeedbackCtx *fctx;
@@ -842,6 +896,7 @@ adapt_parallelism (struct OperationQueue *queue, int fail)
   int sd;
   unsigned int nvals;
   unsigned int cnt;
+  unsigned int parallelism;
 
   avg = GNUNET_TIME_UNIT_ZERO;
   nvals = 0;
@@ -853,27 +908,51 @@ adapt_parallelism (struct OperationQueue *queue, int fail)
     nvals += tslot->nvals;
   }
   GNUNET_assert (nvals >= queue->max_active);
+  GNUNET_assert (fctx->nfailed <= nvals);
+  nvals -= fctx->nfailed;
+  if (0 == nvals)
+  {
+    if (1 == queue->max_active)
+      adaptive_queue_set_max_active (queue, 1);
+    else
+      adaptive_queue_set_max_active (queue, queue->max_active / 2);
+    return;
+  }
   avg = GNUNET_TIME_relative_divide (avg, nvals);
-  if (GNUNET_SYSERR == 
+  GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+  if (GNUNET_SYSERR ==
       GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
                                            (unsigned int) avg.rel_value_us,
                                            &sd))
   {
-    GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
-    adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
+    adaptive_queue_set_max_active (queue, queue->max_active);  /* no change */
     return;
   }
-  if ((0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, queue->max_active)))
-    GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+
+  parallelism = 0;
+  if (-1 == sd)
+    parallelism = queue->max_active + 1;
+  if (sd <= -2)
+    parallelism = queue->max_active * 2;
+  if (1 == sd)
+    parallelism = queue->max_active - 1;
+  if (2 <= sd)
+    parallelism = queue->max_active / 2;
+  parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
+  adaptive_queue_set_max_active (queue, parallelism);
+
+#if 0
+  /* old algorithm */
   if (sd < 0)
     sd = 0;
   GNUNET_assert (0 <= sd);
-  if ((0 == sd) && (! fail))
+  // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+  if (0 == sd)
   {
     adaptive_queue_set_max_active (queue, queue->max_active * 2);
     return;
   }
-  if ((1 == sd) && (! fail))
+  if (1 == sd)
   {
     adaptive_queue_set_max_active (queue, queue->max_active + 1);
     return;
@@ -883,12 +962,13 @@ adapt_parallelism (struct OperationQueue *queue, int fail)
     adaptive_queue_set_max_active (queue, 1);
     return;
   }
-  if (((sd < 2) && (fail)) || (2 == sd))
+  if (2 == sd)
   {
     adaptive_queue_set_max_active (queue, queue->max_active - 1);
     return;
   }
   adaptive_queue_set_max_active (queue, queue->max_active / 2);
+#endif
 }
 
 
@@ -906,22 +986,32 @@ update_tslots (struct GNUNET_TESTBED_Operation *op)
   struct GNUNET_TIME_Relative t;
   struct TimeSlot *tslot;
   struct FeedbackCtx *fctx;
-  
+  unsigned int i;
+
   t = GNUNET_TIME_absolute_get_duration (op->tstart);
   while (NULL != (tslot = op->tslots_head)) /* update time slots */
   {
     queue = tslot->queue;
     fctx = queue->fctx;
-    tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
     GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
-    tslot->op = NULL;    
+    tslot->op = NULL;
     GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
-  tslot);
+                                      tslot);
+    if (op->failed)
+    {
+      fctx->nfailed++;
+      for (i = 0; i < op->nqueues; i++)
+        if (queue == op->queues[i])
+          break;
+      GNUNET_assert (i != op->nqueues);
+      op->queues[i]->overload += op->nres[i];
+    }
+    tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
     if (0 != tslot->nvals++)
       continue;
     fctx->tslots_filled++;
     if (queue->max_active == fctx->tslots_filled)
-      adapt_parallelism (queue, op->failed);
+      adapt_parallelism (queue);
   }
 }
 
@@ -940,7 +1030,7 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
 {
   struct GNUNET_TESTBED_Operation *op;
 
-  op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation));
+  op = GNUNET_new (struct GNUNET_TESTBED_Operation);
   op->start = start;
   op->state = OP_STATE_INIT;
   op->release = release;
@@ -964,7 +1054,7 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
   struct OperationQueue *queue;
   struct FeedbackCtx *fctx;
 
-  queue = GNUNET_malloc (sizeof (struct OperationQueue));
+  queue = GNUNET_new (struct OperationQueue);
   queue->type = type;
   if (OPERATION_QUEUE_TYPE_FIXED == type)
   {
@@ -972,28 +1062,26 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
   }
   else
   {
-    fctx = GNUNET_malloc (sizeof (struct FeedbackCtx));
+    fctx = GNUNET_new (struct FeedbackCtx);
     fctx->max_active_bound = max_active;
-    fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
+    fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
     queue->fctx = fctx;
-    adaptive_queue_set_max_active (queue, 1); /* start with 1 */
+    adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
   }
   return queue;
 }
 
 
 /**
- * Destroy an operation queue.  The queue MUST be empty
- * at this time.
+ * Cleanup the given operation queue.
  *
- * @param queue queue to destroy
+ * @param queue the operation queue to destroy
  */
-void
-GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
+static void
+queue_destroy (struct OperationQueue *queue)
 {
   struct FeedbackCtx *fctx;
-  
-  GNUNET_break (GNUNET_YES == is_queue_empty (queue));
+
   if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
   {
     cleanup_tslots (queue);
@@ -1005,6 +1093,27 @@ GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
 }
 
 
+/**
+ * Destroys an operation queue.  If the queue is still in use by operations it
+ * is marked as expired and its resources are released in the destructor
+ * GNUNET_TESTBED_operations_fini().
+ *
+ * @param queue queue to destroy
+ */
+void
+GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
+{
+  if (GNUNET_YES != is_queue_empty (queue))
+  {
+    GNUNET_assert (0 == queue->expired);  /* Are you calling twice on same queue? */
+    queue->expired = 1;
+    GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
+    return;
+  }
+  queue_destroy (queue);
+}
+
+
 /**
  * Destroys the operation queue if it is empty.  If not empty return GNUNET_NO.
  *
@@ -1060,8 +1169,9 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
   struct QueueEntry *entry;
 
   queue->max_active = max_active;
-  while ( (queue->active > queue->max_active)
-          && (NULL != (entry = queue->rq_head)) )
+  queue->overload = 0;
+  while ((queue->active > queue->max_active)
+         && (NULL != (entry = queue->rq_head)))
     defer (entry->op);
   recheck_waiting (queue);
 }
@@ -1150,11 +1260,11 @@ GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
   GNUNET_assert (OP_STATE_ACTIVE == op->state);
   change_state (op, OP_STATE_INACTIVE);
   nqueues = op->nqueues;
-  ms = sizeof (struct OperationQueue *) * nqueues;
+  ms = sizeof(struct OperationQueue *) * nqueues;
   queues = GNUNET_malloc (ms);
   /* Cloning is needed as the operation be released by waiting operations and
      hence its nqueues memory ptr will be freed */
-  GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
+  GNUNET_memcpy (queues, op->queues, ms);
   for (i = 0; i < nqueues; i++)
     recheck_waiting (queues[i]);
   GNUNET_free (queues);
@@ -1171,7 +1281,6 @@ GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
 void
 GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
 {
-
   GNUNET_assert (OP_STATE_INACTIVE == op->state);
   change_state (op, OP_STATE_ACTIVE);
 }
@@ -1186,7 +1295,7 @@ GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
 void
 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
 {
-  struct QueueEntry *entry;  
+  struct QueueEntry *entry;
   struct OperationQueue *opq;
   unsigned int i;
 
@@ -1202,20 +1311,22 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
   if (OP_STATE_ACTIVE == op->state)
     update_tslots (op);
   GNUNET_assert (NULL != op->queues);
-  GNUNET_assert (NULL != op->qentries);  
+  GNUNET_assert (NULL != op->qentries);
   for (i = 0; i < op->nqueues; i++)
   {
     entry = op->qentries[i];
     remove_queue_entry (op, i);
     opq = op->queues[i];
     switch (op->state)
-    {      
+    {
     case OP_STATE_INIT:
     case OP_STATE_INACTIVE:
       GNUNET_assert (0);
       break;
-    case OP_STATE_WAITING:      
+
+    case OP_STATE_WAITING:
       break;
+
     case OP_STATE_ACTIVE:
     case OP_STATE_READY:
       GNUNET_assert (0 != opq->active);
@@ -1223,7 +1334,7 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
       opq->active -= entry->nres;
       recheck_waiting (opq);
       break;
-    }    
+    }
     GNUNET_free (entry);
   }
   GNUNET_free_non_null (op->qentries);
@@ -1247,4 +1358,30 @@ GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
 }
 
 
+/**
+ * Cleanup expired operation queues.  While doing so, also check for any
+ * operations which are not completed and warn about them.
+ */
+void __attribute__ ((destructor))
+GNUNET_TESTBED_operations_fini ()
+{
+  struct OperationQueue *queue;
+  unsigned int i;
+  int warn = 0;
+
+  for (i = 0; i < n_expired_opqs; i++)
+  {
+    queue = expired_opqs[i];
+    if (GNUNET_NO == is_queue_empty (queue))
+      warn = 1;
+    queue_destroy (queue);
+  }
+  GNUNET_free_non_null (expired_opqs);
+  n_expired_opqs = 0;
+  if (warn)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Be disciplined.  Some operations were not marked as done.\n");
+}
+
+
 /* end of testbed_api_operations.c */