a96549810c658f6ccfa5722a3f91e430c9142b10
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_api_operations.c
23  * @brief functions to manage operation queues
24  * @author Christian Grothoff
25  * @author Sree Harsha Totakura
26  */
27
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30
31
32 /**
33  * An entry in the operation queue
34  */
35 struct QueueEntry
36 {
37   /**
38    * The next DLL pointer
39    */
40   struct QueueEntry *next;
41
42   /**
43    * The prev DLL pointer
44    */
45   struct QueueEntry *prev;
46
47   /**
48    * The operation this entry holds
49    */
50   struct GNUNET_TESTBED_Operation *op;
51
52   /**
53    * How many units of resources does the operation need
54    */
55   unsigned int nres;
56 };
57
58
59 /**
60  * Queue of operations where we can only support a certain
61  * number of concurrent operations of a particular type.
62  */
63 struct OperationQueue
64 {
65   /**
66    * DLL head for the wait queue.  Operations which are waiting for this
67    * operation queue are put here
68    */
69   struct QueueEntry *wq_head;
70
71   /**
72    * DLL tail for the wait queue.
73    */
74   struct QueueEntry *wq_tail;
75
76   /**
77    * DLL head for the ready queue.  Operations which are in this operation queue
78    * and are in ready state are put here
79    */
80   struct QueueEntry *rq_head;
81
82   /**
83    * DLL tail for the ready queue
84    */
85   struct QueueEntry *rq_tail;
86
87   /**
88    * DLL head for the active queue.  Operations which are in this operation
89    * queue and are currently active are put here
90    */
91   struct QueueEntry *aq_head;
92
93   /**
94    * DLL tail for the active queue.
95    */
96   struct QueueEntry *aq_tail;
97
98   /**
99    * DLL head for the inactive queue.  Operations which are inactive and can be
100    * evicted if the queues it holds are maxed out and another operation begins
101    * to wait on them.
102    */
103   struct QueueEntry *nq_head;
104
105   /**
106    * DLL tail for the inactive queue.
107    */
108   struct QueueEntry *nq_tail;
109
110   /**
111    * Number of operations that are currently active in this queue.
112    */
113   unsigned int active;
114
115   /**
116    * Max number of operations which can be active at any time in this queue
117    */
118   unsigned int max_active;
119
120 };
121
122
123 /**
124  * Operation state
125  */
126 enum OperationState
127 {
128   /**
129    * The operation is just created and is in initial state
130    */
131   OP_STATE_INIT,
132
133   /**
134    * The operation is currently waiting for resources
135    */
136   OP_STATE_WAITING,
137
138   /**
139    * The operation is ready to be started
140    */
141   OP_STATE_READY,
142
143   /**
144    * The operation has started and is active
145    */
146   OP_STATE_ACTIVE,
147
148   /**
149    * The operation is inactive.  It still holds resources on the operation
150    * queues.  However, this operation will be evicted when another operation
151    * requires resources from the maxed out queues this operation is holding
152    * resources from.
153    */
154   OP_STATE_INACTIVE
155 };
156
157
158 /**
159  * An entry in the ready queue (implemented as DLL)
160  */
161 struct ReadyQueueEntry
162 {
163   /**
164    * next ptr for DLL
165    */
166   struct ReadyQueueEntry *next;
167   
168   /**
169    * prev ptr for DLL
170    */
171   struct ReadyQueueEntry *prev;
172
173   /**
174    * The operation associated with this entry
175    */
176   struct GNUNET_TESTBED_Operation *op;
177 };
178
179
180 /**
181  * Opaque handle to an abstract operation to be executed by the testing framework.
182  */
183 struct GNUNET_TESTBED_Operation
184 {
185   /**
186    * Function to call when we have the resources to begin the operation.
187    */
188   OperationStart start;
189
190   /**
191    * Function to call to clean up after the operation (which may or may
192    * not have been started yet).
193    */
194   OperationRelease release;
195
196   /**
197    * Closure for callbacks.
198    */
199   void *cb_cls;
200
201   /**
202    * Array of operation queues this Operation belongs to.
203    */
204   struct OperationQueue **queues;
205
206   /**
207    * Array of operation queue entries corresponding to this operation in
208    * operation queues for this operation
209    */
210   struct QueueEntry **qentries;
211
212   /**
213    * Array of number of resources an operation need from each queue. The numbers
214    * in this array should correspond to the queues array
215    */
216   unsigned int *nres;
217
218   /**
219    * Entry corresponding to this operation in ready queue.  Will be NULL if the
220    * operation is not marked as READY
221    */
222   struct ReadyQueueEntry *rq_entry;
223
224   /**
225    * Number of queues in the operation queues array
226    */
227   unsigned int nqueues;
228
229   /**
230    * The state of the operation
231    */
232   enum OperationState state;
233
234 };
235
236 /**
237  * DLL head for the ready queue
238  */
239 struct ReadyQueueEntry *rq_head;
240
241 /**
242  * DLL tail for the ready queue
243  */
244 struct ReadyQueueEntry *rq_tail;
245
246 /**
247  * The id of the task to process the ready queue
248  */
249 GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
250
251
252 /**
253  * Removes a queue entry of an operation from one of the operation queues' lists
254  * depending on the state of the operation
255  *
256  * @param op the operation whose entry has to be removed
257  * @param index the index of the entry in the operation's array of queue entries
258  */
259 static void
260 remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
261 {
262   struct OperationQueue *opq;
263   struct QueueEntry *entry;
264   
265   opq = op->queues[index];
266   entry = op->qentries[index];
267   switch (op->state)
268   {
269   case OP_STATE_INIT:
270     GNUNET_assert (0);
271     break;
272   case OP_STATE_WAITING:
273     GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
274     break;
275   case OP_STATE_READY:
276     GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
277     break;
278   case OP_STATE_ACTIVE:
279     GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
280     break;
281   case OP_STATE_INACTIVE:
282     GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
283     break;
284   }
285 }
286
287
288 /**
289  * Changes the state of the operation while moving its associated queue entries
290  * in the operation's operation queues
291  *
292  * @param op the operation whose state has to be changed
293  * @param state the state the operation should have.  It cannot be OP_STATE_INIT
294  */
295 static void
296 change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
297 {
298   struct QueueEntry *entry;
299   struct OperationQueue *opq;
300   unsigned int cnt;
301   unsigned int s;
302   
303   GNUNET_assert (OP_STATE_INIT != state);
304   GNUNET_assert (NULL != op->queues);
305   GNUNET_assert (NULL != op->nres);
306   GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
307   GNUNET_assert (op->state != state);
308   for (cnt = 0; cnt < op->nqueues; cnt++)
309   {
310     if (OP_STATE_INIT == op->state)
311     {
312       entry = GNUNET_malloc (sizeof (struct QueueEntry));
313       entry->op = op;
314       entry->nres = op->nres[cnt];
315       s = cnt;
316       GNUNET_array_append (op->qentries, s, entry);      
317     }
318     else
319     {
320       entry = op->qentries[cnt];
321       remove_queue_entry (op, cnt);
322     }
323     opq = op->queues[cnt];
324     switch (state)
325     {
326     case OP_STATE_INIT:
327       GNUNET_assert (0);
328       break;
329     case OP_STATE_WAITING:
330       GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
331       break;
332     case OP_STATE_READY:
333       GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
334       break;
335     case OP_STATE_ACTIVE:
336       GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
337       break;
338     case OP_STATE_INACTIVE:
339       GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
340       break;
341     }
342   }
343   op->state = state;
344 }
345
346
347 /**
348  * Removes an operation from the ready queue.  Also stops the 'process_rq_task'
349  * if the given operation is the last one in the queue.
350  *
351  * @param op the operation to be removed
352  */
353 static void
354 rq_remove (struct GNUNET_TESTBED_Operation *op)
355 {  
356   GNUNET_assert (NULL != op->rq_entry);
357   GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
358   GNUNET_free (op->rq_entry);
359   op->rq_entry = NULL;
360   if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) )
361   {
362     GNUNET_SCHEDULER_cancel (process_rq_task_id);
363     process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
364   }
365 }
366
367
368 /**
369  * Processes the ready queue by calling the operation start callback of the
370  * operation at the head.  The operation is then removed from the queue.  The
371  * task is scheduled to run again immediately until no more operations are in
372  * the ready queue.
373  *
374  * @param cls NULL
375  * @param tc scheduler task context.  Not used.
376  */
377 static void
378 process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
379 {
380   struct GNUNET_TESTBED_Operation *op;
381
382   process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
383   GNUNET_assert (NULL != rq_head);
384   GNUNET_assert (NULL != (op = rq_head->op));
385   rq_remove (op);
386   if (NULL != rq_head)
387     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
388   change_state (op, OP_STATE_ACTIVE);
389   if (NULL != op->start)
390     op->start (op->cb_cls);  
391 }
392
393
394 /**
395  * Adds the operation to the ready queue and starts the 'process_rq_task'
396  *
397  * @param op the operation to be queued
398  */
399 static void
400 rq_add (struct GNUNET_TESTBED_Operation *op)
401 {
402   struct ReadyQueueEntry *rq_entry;
403
404   GNUNET_assert (NULL == op->rq_entry);
405   rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry));
406   rq_entry->op = op;
407   GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
408   op->rq_entry = rq_entry;
409   if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id)
410     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
411 }
412
413
414 /**
415  * Checks if the given operation queue is empty or not
416  *
417  * @param opq the operation queue
418  * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
419  *           otherwise
420  */
421 static int
422 is_queue_empty (struct OperationQueue *opq)
423 {
424   if ( (NULL != opq->wq_head)
425        || (NULL != opq->rq_head)
426        || (NULL != opq->aq_head)
427        || (NULL != opq->nq_head) )
428     return GNUNET_NO;
429   return GNUNET_YES;
430 }
431
432
433 /**
434  * Checks if the given operation queue has enough resources to provide for the
435  * operation of the given queue entry.  It also checks if any inactive
436  * operations are to be released in order to accommodate the needed resources
437  * and returns them as an array.
438  *
439  * @param opq the operation queue to check for resource accommodation
440  * @param entry the operation queue entry whose operation's resources are to be
441  *          accommodated
442  * @param ops_ pointer to return the array of operations which are to be released
443  *          in order to accommodate the new operation.  Can be NULL
444  * @param n_ops_ the number of operations in ops_
445  * @return GNUNET_YES if the given entry's operation can be accommodated in this
446  *           queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
447  *           be set to NULL and 0 respectively.
448  */
449 static int
450 decide_capacity (struct OperationQueue *opq,
451                  struct QueueEntry *entry,
452                  struct GNUNET_TESTBED_Operation ***ops_,
453                  unsigned int *n_ops_)
454 {
455   struct QueueEntry **evict_entries;
456   struct GNUNET_TESTBED_Operation **ops;
457   struct GNUNET_TESTBED_Operation *op;
458   unsigned int n_ops;
459   unsigned int n_evict_entries;
460   unsigned int need;
461   int deficit;
462   int rval;
463
464   GNUNET_assert (NULL != (op = entry->op));
465   GNUNET_assert (0 < (need = entry->nres));
466   GNUNET_assert (opq->active <= opq->max_active);
467   ops = NULL;
468   n_ops = 0;
469   evict_entries = NULL;
470   n_evict_entries = 0;
471   rval = GNUNET_OK;
472   if ((opq->active + need) <= opq->max_active)
473     goto ret;
474   deficit = need - (opq->max_active - opq->active);
475   for (entry = opq->nq_head;
476        (0 < deficit) && (NULL != entry);
477        entry = entry->next)
478   {
479     GNUNET_array_append (evict_entries, n_evict_entries, entry);
480     deficit -= entry->nres;
481   }
482   if (0 < deficit)
483   {
484     rval = GNUNET_NO;
485     goto ret;
486   }
487   for (n_ops = 0; n_ops < n_evict_entries;)
488   {
489     op = evict_entries[n_ops]->op;
490     GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
491   }
492
493  ret:
494   GNUNET_free_non_null (evict_entries);  
495   if (NULL != ops_) *ops_ = ops;
496   if (NULL != n_ops_) *n_ops_ = n_ops;
497   return rval;
498 }
499
500
501 /**
502  * Merges an array of operations into another, eliminating duplicates.  No
503  * ordering is guaranteed.
504  *
505  * @param old the array into which the merging is done.
506  * @param n_old the number of operations in old array
507  * @param new the array from which operations are to be merged
508  * @param n_new the number of operations in new array
509  */
510 static void
511 merge_ops (struct GNUNET_TESTBED_Operation ***old,
512            unsigned int *n_old,
513            struct GNUNET_TESTBED_Operation **new,
514            unsigned int n_new)
515 {
516   struct GNUNET_TESTBED_Operation **cur;
517   unsigned int i;
518   unsigned int j;
519   unsigned int n_cur;
520  
521   GNUNET_assert (NULL != old);
522   n_cur = *n_old;
523   cur = *old;
524   for (i = 0; i < n_new; i++)
525   {    
526     for (j = 0; j < *n_old; j++)
527     {
528       if (new[i] == cur[j])
529         break;
530     }
531     if (j < *n_old)
532       continue;
533     GNUNET_array_append (cur, n_cur, new[j]);
534   }
535   *old = cur;
536   *n_old = n_cur;
537 }
538            
539
540
541 /**
542  * Checks for the readiness of an operation and schedules a operation start task
543  *
544  * @param op the operation
545  */
546 static void
547 check_readiness (struct GNUNET_TESTBED_Operation *op)
548 {
549   struct GNUNET_TESTBED_Operation **evict_ops;
550   struct GNUNET_TESTBED_Operation **ops;
551   unsigned int n_ops;
552   unsigned int n_evict_ops;
553   unsigned int i;
554
555   GNUNET_assert (NULL == op->rq_entry);
556   GNUNET_assert (OP_STATE_WAITING == op->state);
557   evict_ops = NULL;
558   n_evict_ops = 0;
559   for (i = 0; i < op->nqueues; i++)
560   {
561     ops = NULL;
562     n_ops = 0;
563     if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
564                                       &ops, &n_ops))
565     {
566       GNUNET_free_non_null (evict_ops);
567       return;
568     }
569     if (NULL == ops)
570       continue;
571     merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
572     GNUNET_free (ops);    
573   }
574   if (NULL != evict_ops)
575   {
576     for (i = 0; i < n_evict_ops; i++)
577       GNUNET_TESTBED_operation_release_ (evict_ops[i]);
578     GNUNET_free (evict_ops);
579     evict_ops = NULL;
580     /* Evicting the operations should schedule this operation */
581     GNUNET_assert (OP_STATE_READY == op->state);
582     return;
583   }
584   for (i = 0; i < op->nqueues; i++)
585     op->queues[i]->active += op->nres[i];
586   change_state (op, OP_STATE_READY);
587   rq_add (op);
588 }
589
590
591 /**
592  * Defers a ready to be executed operation back to waiting
593  *
594  * @param op the operation to defer
595  */
596 static void
597 defer (struct GNUNET_TESTBED_Operation *op)
598 {
599   unsigned int i;
600
601   GNUNET_assert (OP_STATE_READY == op->state);
602   rq_remove (op);
603   for (i = 0; i < op->nqueues; i++)
604     op->queues[i]->active--;
605   change_state (op, OP_STATE_WAITING);
606 }
607
608
609 /**
610  * Create an 'operation' to be performed.
611  *
612  * @param cls closure for the callbacks
613  * @param start function to call to start the operation
614  * @param release function to call to close down the operation
615  * @return handle to the operation
616  */
617 struct GNUNET_TESTBED_Operation *
618 GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
619                                   OperationRelease release)
620 {
621   struct GNUNET_TESTBED_Operation *op;
622
623   op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation));
624   op->start = start;
625   op->state = OP_STATE_INIT;
626   op->release = release;
627   op->cb_cls = cls;
628   return op;
629 }
630
631
632 /**
633  * Create an operation queue.
634  *
635  * @param max_active maximum number of operations in this
636  *        queue that can be active in parallel at the same time
637  * @return handle to the queue
638  */
639 struct OperationQueue *
640 GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
641 {
642   struct OperationQueue *queue;
643
644   queue = GNUNET_malloc (sizeof (struct OperationQueue));
645   queue->max_active = max_active;
646   return queue;
647 }
648
649
650 /**
651  * Destroy an operation queue.  The queue MUST be empty
652  * at this time.
653  *
654  * @param queue queue to destroy
655  */
656 void
657 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
658 {
659   GNUNET_break (GNUNET_YES == is_queue_empty (queue));
660   GNUNET_free (queue);
661 }
662
663
664 /**
665  * Destroys the operation queue if it is empty.  If not empty return GNUNET_NO.
666  *
667  * @param queue the queue to destroy if empty
668  * @return GNUNET_YES if the queue is destroyed.  GNUNET_NO if not (because it
669  *           is not empty)
670  */
671 int
672 GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
673 {
674   if (GNUNET_NO == is_queue_empty (queue))
675     return GNUNET_NO;
676   GNUNET_TESTBED_operation_queue_destroy_ (queue);
677   return GNUNET_YES;
678 }
679
680
681 /**
682  * Rechecks if any of the operations in the given operation queue's waiting list
683  * can be made active
684  *
685  * @param opq the operation queue
686  */
687 static void
688 recheck_waiting (struct OperationQueue *opq)
689 {
690   struct QueueEntry *entry;
691   struct QueueEntry *entry2;
692
693   entry = opq->wq_head;
694   while (NULL != entry)
695   {
696     entry2 = entry->next;
697     check_readiness (entry->op);
698     entry = entry2;
699   }
700 }
701
702
703 /**
704  * Function to reset the maximum number of operations in the given queue. If
705  * max_active is lesser than the number of currently active operations, the
706  * active operations are not stopped immediately.
707  *
708  * @param queue the operation queue which has to be modified
709  * @param max_active the new maximum number of active operations
710  */
711 void
712 GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
713                                                   unsigned int max_active)
714 {
715   struct QueueEntry *entry;
716
717   queue->max_active = max_active;
718   while ( (queue->active > queue->max_active)
719           && (NULL != (entry = queue->rq_head)) )
720     defer (entry->op);
721   recheck_waiting (queue);
722 }
723
724
725 /**
726  * Add an operation to a queue.  An operation can be in multiple queues at
727  * once. Once the operation is inserted into all the queues
728  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
729  * waiting for the operation to become active.
730  *
731  * @param queue queue to add the operation to
732  * @param op operation to add to the queue
733  * @param nres the number of units of the resources of queue needed by the
734  *          operation. Should be greater than 0.
735  */
736 void
737 GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
738                                          struct GNUNET_TESTBED_Operation *op,
739                                          unsigned int nres)
740 {
741   unsigned int qsize;
742
743   GNUNET_assert (0 < nres);
744   qsize = op->nqueues;
745   GNUNET_array_append (op->queues, op->nqueues, queue);
746   GNUNET_array_append (op->nres, qsize, nres);
747   GNUNET_assert (qsize == op->nqueues);
748 }
749
750
751 /**
752  * Add an operation to a queue.  An operation can be in multiple queues at
753  * once. Once the operation is inserted into all the queues
754  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
755  * waiting for the operation to become active. The operation is assumed to take
756  * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
757  * requires more than 1
758  *
759  * @param queue queue to add the operation to
760  * @param op operation to add to the queue
761  */
762 void
763 GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
764                                         struct GNUNET_TESTBED_Operation *op)
765 {
766   return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
767 }
768
769
770 /**
771  * Marks the given operation as waiting on the queues.  Once all queues permit
772  * the operation to become active, the operation will be activated.  The actual
773  * activation will occur in a separate task (thus allowing multiple queue
774  * insertions to be made without having the first one instantly trigger the
775  * operation if the first queue has sufficient resources).
776  *
777  * @param op the operation to marks as waiting
778  */
779 void
780 GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
781 {
782   GNUNET_assert (NULL == op->rq_entry);
783   change_state (op, OP_STATE_WAITING);
784   check_readiness (op);
785 }
786
787
788 /**
789  * Marks an active operation as inactive - the operation will be kept in a
790  * ready-to-be-released state and continues to hold resources until another
791  * operation contents for them.
792  *
793  * @param op the operation to be marked as inactive.  The operation start
794  *          callback should have been called before for this operation to mark
795  *          it as inactive.
796  */
797 void
798 GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
799 {
800   struct OperationQueue **queues;
801   size_t ms;
802   unsigned int nqueues;
803   unsigned int i;
804
805   GNUNET_assert (OP_STATE_ACTIVE == op->state);
806   change_state (op, OP_STATE_INACTIVE);
807   nqueues = op->nqueues;
808   ms = sizeof (struct OperationQueue *) * nqueues;
809   queues = GNUNET_malloc (ms);
810   GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
811   for (i = 0; i < nqueues; i++)
812     recheck_waiting (queues[i]);
813   GNUNET_free (queues);
814 }
815
816
817 /**
818  * Marks and inactive operation as active.  This fuction should be called to
819  * ensure that the oprelease callback will not be called until it is either
820  * marked as inactive or released.
821  *
822  * @param op the operation to be marked as active
823  */
824 void
825 GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
826 {
827
828   GNUNET_assert (OP_STATE_INACTIVE == op->state);
829   change_state (op, OP_STATE_ACTIVE);
830 }
831
832
833 /**
834  * An operation is 'done' (was cancelled or finished); remove
835  * it from the queues and release associated resources.
836  *
837  * @param op operation that finished
838  */
839 void
840 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
841 {
842   struct QueueEntry *entry;  
843   struct OperationQueue *opq;
844   unsigned int i;
845
846   if (OP_STATE_INIT == op->state)
847   {
848     GNUNET_free (op);
849     return;
850   }
851   if (OP_STATE_READY == op->state)
852     rq_remove (op);
853   if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
854     GNUNET_TESTBED_operation_activate_ (op);
855   GNUNET_assert (NULL != op->queues);
856   GNUNET_assert (NULL != op->qentries);
857   for (i = 0; i < op->nqueues; i++)
858   {
859     entry = op->qentries[i];
860     remove_queue_entry (op, i);
861     opq = op->queues[i];
862     switch (op->state)
863     {      
864     case OP_STATE_INIT:
865     case OP_STATE_INACTIVE:
866       GNUNET_assert (0);
867       break;
868     case OP_STATE_WAITING:      
869       break;
870     case OP_STATE_READY:
871     case OP_STATE_ACTIVE:
872       GNUNET_assert (0 != opq->active);
873       GNUNET_assert (opq->active >= entry->nres);
874       opq->active -= entry->nres;
875       recheck_waiting (opq);
876       break;
877     }    
878     GNUNET_free (entry);
879   }
880   GNUNET_free_non_null (op->qentries);
881   GNUNET_free (op->queues);
882   GNUNET_free (op->nres);
883   if (NULL != op->release)
884     op->release (op->cb_cls);
885   GNUNET_free (op);
886 }
887
888
889 /* end of testbed_api_operations.c */