- docs
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_api_operations.c
23  * @brief functions to manage operation queues
24  * @author Christian Grothoff
25  * @author Sree Harsha Totakura
26  */
27
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30
31
32 /**
33  * An entry in the operation queue
34  */
35 struct QueueEntry
36 {
37   /**
38    * The next DLL pointer
39    */
40   struct QueueEntry *next;
41
42   /**
43    * The prev DLL pointer
44    */
45   struct QueueEntry *prev;
46
47   /**
48    * The operation this entry holds
49    */
50   struct GNUNET_TESTBED_Operation *op;
51
52   /**
53    * How many units of resources does the operation need
54    */
55   unsigned int nres;
56 };
57
58
59 /**
60  * Queue of operations where we can only support a certain
61  * number of concurrent operations of a particular type.
62  */
63 struct OperationQueue
64 {
65   /**
66    * DLL head for the wait queue.  Operations which are waiting for this
67    * operation queue are put here
68    */
69   struct QueueEntry *wq_head;
70
71   /**
72    * DLL tail for the wait queue.
73    */
74   struct QueueEntry *wq_tail;
75
76   /**
77    * DLL head for the ready queue.  Operations which are in this operation queue
78    * and are in ready state are put here
79    */
80   struct QueueEntry *rq_head;
81
82   /**
83    * DLL tail for the ready queue
84    */
85   struct QueueEntry *rq_tail;
86
87   /**
88    * DLL head for the active queue.  Operations which are in this operation
89    * queue and are currently active are put here
90    */
91   struct QueueEntry *aq_head;
92
93   /**
94    * DLL tail for the active queue.
95    */
96   struct QueueEntry *aq_tail;
97
98   /**
99    * DLL head for the inactive queue.  Operations which are inactive and can be
100    * evicted if the queues it holds are maxed out and another operation begins
101    * to wait on them.
102    */
103   struct QueueEntry *nq_head;
104
105   /**
106    * DLL tail for the inactive queue.
107    */
108   struct QueueEntry *nq_tail;
109
110   /**
111    * Number of operations that are currently active in this queue.
112    */
113   unsigned int active;
114
115   /**
116    * Max number of operations which can be active at any time in this queue
117    */
118   unsigned int max_active;
119
120 };
121
122
123 /**
124  * Operation state
125  */
126 enum OperationState
127 {
128   /**
129    * The operation is just created and is in initial state
130    */
131   OP_STATE_INIT,
132
133   /**
134    * The operation is currently waiting for resources
135    */
136   OP_STATE_WAITING,
137
138   /**
139    * The operation is ready to be started
140    */
141   OP_STATE_READY,
142
143   /**
144    * The operation has started and is active
145    */
146   OP_STATE_ACTIVE,
147
148   /**
149    * The operation is inactive.  It still holds resources on the operation
150    * queues.  However, this operation will be evicted when another operation
151    * requires resources from the maxed out queues this operation is holding
152    * resources from.
153    */
154   OP_STATE_INACTIVE
155 };
156
157
158 /**
159  * An entry in the ready queue (implemented as DLL)
160  */
161 struct ReadyQueueEntry
162 {
163   /**
164    * next ptr for DLL
165    */
166   struct ReadyQueueEntry *next;
167   
168   /**
169    * prev ptr for DLL
170    */
171   struct ReadyQueueEntry *prev;
172
173   /**
174    * The operation associated with this entry
175    */
176   struct GNUNET_TESTBED_Operation *op;
177 };
178
179
180 /**
181  * Opaque handle to an abstract operation to be executed by the testing framework.
182  */
183 struct GNUNET_TESTBED_Operation
184 {
185   /**
186    * Function to call when we have the resources to begin the operation.
187    */
188   OperationStart start;
189
190   /**
191    * Function to call to clean up after the operation (which may or may
192    * not have been started yet).
193    */
194   OperationRelease release;
195
196   /**
197    * Closure for callbacks.
198    */
199   void *cb_cls;
200
201   /**
202    * Array of operation queues this Operation belongs to.
203    */
204   struct OperationQueue **queues;
205
206   /**
207    * Array of operation queue entries corresponding to this operation in
208    * operation queues for this operation
209    */
210   struct QueueEntry **qentries;
211
212   /**
213    * Array of number of resources an operation need from each queue. The numbers
214    * in this array should correspond to the queues array
215    */
216   unsigned int *nres;
217
218   /**
219    * Entry corresponding to this operation in ready queue.  Will be NULL if the
220    * operation is not marked as READY
221    */
222   struct ReadyQueueEntry *rq_entry;
223
224   /**
225    * Number of queues in the operation queues array
226    */
227   unsigned int nqueues;
228
229   /**
230    * The state of the operation
231    */
232   enum OperationState state;
233
234 };
235
236 /**
237  * DLL head for the ready queue
238  */
239 struct ReadyQueueEntry *rq_head;
240
241 /**
242  * DLL tail for the ready queue
243  */
244 struct ReadyQueueEntry *rq_tail;
245
246 /**
247  * The id of the task to process the ready queue
248  */
249 GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
250
251
252 /**
253  * Removes a queue entry of an operation from one of the operation queues' lists
254  * depending on the state of the operation
255  *
256  * @param op the operation whose entry has to be removed
257  * @param index the index of the entry in the operation's array of queue entries
258  */
259 static void
260 remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
261 {
262   struct OperationQueue *opq;
263   struct QueueEntry *entry;
264   
265   opq = op->queues[index];
266   entry = op->qentries[index];
267   switch (op->state)
268   {
269   case OP_STATE_INIT:
270     GNUNET_assert (0);
271     break;
272   case OP_STATE_WAITING:
273     GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
274     break;
275   case OP_STATE_READY:
276     GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
277     break;
278   case OP_STATE_ACTIVE:
279     GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
280     break;
281   case OP_STATE_INACTIVE:
282     GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
283     break;
284   }
285 }
286
287
288 /**
289  * Changes the state of the operation while moving its associated queue entries
290  * in the operation's operation queues
291  *
292  * @param op the operation whose state has to be changed
293  * @param state the state the operation should have.  It cannot be OP_STATE_INIT
294  */
295 static void
296 change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
297 {
298   struct QueueEntry *entry;
299   struct OperationQueue *opq;
300   unsigned int cnt;
301   unsigned int s;
302   
303   GNUNET_assert (OP_STATE_INIT != state);
304   GNUNET_assert (NULL != op->queues);
305   GNUNET_assert (NULL != op->nres);
306   GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
307   GNUNET_assert (op->state != state);
308   for (cnt = 0; cnt < op->nqueues; cnt++)
309   {
310     if (OP_STATE_INIT == op->state)
311     {
312       entry = GNUNET_malloc (sizeof (struct QueueEntry));
313       entry->op = op;
314       entry->nres = op->nres[cnt];
315       s = cnt;
316       GNUNET_array_append (op->qentries, s, entry);      
317     }
318     else
319     {
320       entry = op->qentries[cnt];
321       remove_queue_entry (op, cnt);
322     }
323     opq = op->queues[cnt];
324     switch (state)
325     {
326     case OP_STATE_INIT:
327       GNUNET_assert (0);
328       break;
329     case OP_STATE_WAITING:
330       GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
331       break;
332     case OP_STATE_READY:
333       GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
334       break;
335     case OP_STATE_ACTIVE:
336       GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
337       break;
338     case OP_STATE_INACTIVE:
339       GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
340       break;
341     }
342   }
343   op->state = state;
344 }
345
346
347 /**
348  * Removes an operation from the ready queue.  Also stops the 'process_rq_task'
349  * if the given operation is the last one in the queue.
350  *
351  * @param op the operation to be removed
352  */
353 static void
354 rq_remove (struct GNUNET_TESTBED_Operation *op)
355 {  
356   GNUNET_assert (NULL != op->rq_entry);
357   GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
358   GNUNET_free (op->rq_entry);
359   op->rq_entry = NULL;
360   if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) )
361   {
362     GNUNET_SCHEDULER_cancel (process_rq_task_id);
363     process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
364   }
365 }
366
367
368 /**
369  * Processes the ready queue by calling the operation start callback of the
370  * operation at the head.  The operation is then removed from the queue.  The
371  * task is scheduled to run again immediately until no more operations are in
372  * the ready queue.
373  *
374  * @param cls NULL
375  * @param tc scheduler task context.  Not used.
376  */
377 static void
378 process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
379 {
380   struct GNUNET_TESTBED_Operation *op;
381
382   process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
383   GNUNET_assert (NULL != rq_head);
384   GNUNET_assert (NULL != (op = rq_head->op));
385   rq_remove (op);
386   if (NULL != rq_head)
387     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
388   change_state (op, OP_STATE_ACTIVE);
389   if (NULL != op->start)
390     op->start (op->cb_cls);  
391 }
392
393
394 /**
395  * Adds the operation to the ready queue and starts the 'process_rq_task'
396  *
397  * @param op the operation to be queued
398  */
399 static void
400 rq_add (struct GNUNET_TESTBED_Operation *op)
401 {
402   struct ReadyQueueEntry *rq_entry;
403
404   GNUNET_assert (NULL == op->rq_entry);
405   rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry));
406   rq_entry->op = op;
407   GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
408   op->rq_entry = rq_entry;
409   if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id)
410     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
411 }
412
413
414 /**
415  * Checks if the given operation queue is empty or not
416  *
417  * @param opq the operation queue
418  * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
419  *           otherwise
420  */
421 static int
422 is_queue_empty (struct OperationQueue *opq)
423 {
424   if ( (NULL != opq->wq_head)
425        || (NULL != opq->rq_head)
426        || (NULL != opq->aq_head)
427        || (NULL != opq->nq_head) )
428     return GNUNET_NO;
429   return GNUNET_YES;
430 }
431
432
433 /**
434  * Checks if the given operation queue has enough resources to provide for the
435  * operation of the given queue entry.  It also checks if any inactive
436  * operations are to be released in order to accommodate the needed resources
437  * and returns them as an array.
438  *
439  * @param opq the operation queue to check for resource accommodation
440  * @param entry the operation queue entry whose operation's resources are to be
441  *          accommodated
442  * @param ops_ pointer to return the array of operations which are to be released
443  *          in order to accommodate the new operation.  Can be NULL
444  * @param n_ops_ the number of operations in ops_
445  * @return GNUNET_YES if the given entry's operation can be accommodated in this
446  *           queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
447  *           be set to NULL and 0 respectively.
448  */
449 static int
450 decide_capacity (struct OperationQueue *opq,
451                  struct QueueEntry *entry,
452                  struct GNUNET_TESTBED_Operation ***ops_,
453                  unsigned int *n_ops_)
454 {
455   struct QueueEntry **evict_entries;
456   struct GNUNET_TESTBED_Operation **ops;
457   struct GNUNET_TESTBED_Operation *op;
458   unsigned int n_ops;
459   unsigned int n_evict_entries;
460   unsigned int need;
461   int deficit;
462   int rval;
463
464   GNUNET_assert (NULL != (op = entry->op));
465   GNUNET_assert (0 < (need = entry->nres));
466   ops = NULL;
467   n_ops = 0;
468   evict_entries = NULL;
469   n_evict_entries = 0;
470   rval = GNUNET_YES;
471   if (opq->active > opq->max_active)
472   {
473     need += opq->active - opq->max_active;
474     rval = GNUNET_NO;
475     goto ret;
476   }
477   if ((opq->active + need) <= opq->max_active)
478     goto ret;
479   deficit = need - (opq->max_active - opq->active);
480   for (entry = opq->nq_head;
481        (0 < deficit) && (NULL != entry);
482        entry = entry->next)
483   {
484     GNUNET_array_append (evict_entries, n_evict_entries, entry);
485     deficit -= entry->nres;
486   }
487   if (0 < deficit)
488   {
489     rval = GNUNET_NO;
490     goto ret;
491   }
492   for (n_ops = 0; n_ops < n_evict_entries;)
493   {
494     op = evict_entries[n_ops]->op;
495     GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
496   }
497
498  ret:
499   GNUNET_free_non_null (evict_entries);  
500   if (NULL != ops_)
501     *ops_ = ops;
502   else
503     GNUNET_free (ops);
504   if (NULL != n_ops_)
505     *n_ops_ = n_ops;
506   return rval;
507 }
508
509
510 /**
511  * Merges an array of operations into another, eliminating duplicates.  No
512  * ordering is guaranteed.
513  *
514  * @param old the array into which the merging is done.
515  * @param n_old the number of operations in old array
516  * @param new the array from which operations are to be merged
517  * @param n_new the number of operations in new array
518  */
519 static void
520 merge_ops (struct GNUNET_TESTBED_Operation ***old,
521            unsigned int *n_old,
522            struct GNUNET_TESTBED_Operation **new,
523            unsigned int n_new)
524 {
525   struct GNUNET_TESTBED_Operation **cur;
526   unsigned int i;
527   unsigned int j;
528   unsigned int n_cur;
529  
530   GNUNET_assert (NULL != old);
531   n_cur = *n_old;
532   cur = *old;
533   for (i = 0; i < n_new; i++)
534   {    
535     for (j = 0; j < *n_old; j++)
536     {
537       if (new[i] == cur[j])
538         break;
539     }
540     if (j < *n_old)
541       continue;
542     GNUNET_array_append (cur, n_cur, new[j]);
543   }
544   *old = cur;
545   *n_old = n_cur;
546 }
547            
548
549
550 /**
551  * Checks for the readiness of an operation and schedules a operation start task
552  *
553  * @param op the operation
554  */
555 static int
556 check_readiness (struct GNUNET_TESTBED_Operation *op)
557 {
558   struct GNUNET_TESTBED_Operation **evict_ops;
559   struct GNUNET_TESTBED_Operation **ops;
560   unsigned int n_ops;
561   unsigned int n_evict_ops;
562   unsigned int i;
563
564   GNUNET_assert (NULL == op->rq_entry);
565   GNUNET_assert (OP_STATE_WAITING == op->state);
566   evict_ops = NULL;
567   n_evict_ops = 0;
568   for (i = 0; i < op->nqueues; i++)
569   {
570     ops = NULL;
571     n_ops = 0;
572     if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
573                                       &ops, &n_ops))
574     {
575       GNUNET_free_non_null (evict_ops);
576       return GNUNET_NO;
577     }
578     if (NULL == ops)
579       continue;
580     merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
581     GNUNET_free (ops);    
582   }
583   if (NULL != evict_ops)
584   {
585     for (i = 0; i < n_evict_ops; i++)
586       GNUNET_TESTBED_operation_release_ (evict_ops[i]);
587     GNUNET_free (evict_ops);
588     evict_ops = NULL;
589     /* Evicting the operations should schedule this operation */
590     GNUNET_assert (OP_STATE_READY == op->state);
591     return GNUNET_YES;
592   }
593   for (i = 0; i < op->nqueues; i++)
594     op->queues[i]->active += op->nres[i];
595   change_state (op, OP_STATE_READY);
596   rq_add (op);
597   return GNUNET_YES;
598 }
599
600
601 /**
602  * Defers a ready to be executed operation back to waiting
603  *
604  * @param op the operation to defer
605  */
606 static void
607 defer (struct GNUNET_TESTBED_Operation *op)
608 {
609   unsigned int i;
610
611   GNUNET_assert (OP_STATE_READY == op->state);
612   rq_remove (op);
613   for (i = 0; i < op->nqueues; i++)
614     op->queues[i]->active--;
615   change_state (op, OP_STATE_WAITING);
616 }
617
618
619 /**
620  * Create an 'operation' to be performed.
621  *
622  * @param cls closure for the callbacks
623  * @param start function to call to start the operation
624  * @param release function to call to close down the operation
625  * @return handle to the operation
626  */
627 struct GNUNET_TESTBED_Operation *
628 GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
629                                   OperationRelease release)
630 {
631   struct GNUNET_TESTBED_Operation *op;
632
633   op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation));
634   op->start = start;
635   op->state = OP_STATE_INIT;
636   op->release = release;
637   op->cb_cls = cls;
638   return op;
639 }
640
641
642 /**
643  * Create an operation queue.
644  *
645  * @param max_active maximum number of operations in this
646  *        queue that can be active in parallel at the same time
647  * @return handle to the queue
648  */
649 struct OperationQueue *
650 GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
651 {
652   struct OperationQueue *queue;
653
654   queue = GNUNET_malloc (sizeof (struct OperationQueue));
655   queue->max_active = max_active;
656   return queue;
657 }
658
659
660 /**
661  * Destroy an operation queue.  The queue MUST be empty
662  * at this time.
663  *
664  * @param queue queue to destroy
665  */
666 void
667 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
668 {
669   GNUNET_break (GNUNET_YES == is_queue_empty (queue));
670   GNUNET_free (queue);
671 }
672
673
674 /**
675  * Destroys the operation queue if it is empty.  If not empty return GNUNET_NO.
676  *
677  * @param queue the queue to destroy if empty
678  * @return GNUNET_YES if the queue is destroyed.  GNUNET_NO if not (because it
679  *           is not empty)
680  */
681 int
682 GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
683 {
684   if (GNUNET_NO == is_queue_empty (queue))
685     return GNUNET_NO;
686   GNUNET_TESTBED_operation_queue_destroy_ (queue);
687   return GNUNET_YES;
688 }
689
690
691 /**
692  * Rechecks if any of the operations in the given operation queue's waiting list
693  * can be made active
694  *
695  * @param opq the operation queue
696  */
697 static void
698 recheck_waiting (struct OperationQueue *opq)
699 {
700   struct QueueEntry *entry;
701   struct QueueEntry *entry2;
702
703   entry = opq->wq_head;
704   while (NULL != entry)
705   {
706     entry2 = entry->next;
707     if (GNUNET_NO == check_readiness (entry->op))
708       break;
709     entry = entry2;
710   }
711 }
712
713
714 /**
715  * Function to reset the maximum number of operations in the given queue. If
716  * max_active is lesser than the number of currently active operations, the
717  * active operations are not stopped immediately.
718  *
719  * @param queue the operation queue which has to be modified
720  * @param max_active the new maximum number of active operations
721  */
722 void
723 GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
724                                                   unsigned int max_active)
725 {
726   struct QueueEntry *entry;
727
728   queue->max_active = max_active;
729   while ( (queue->active > queue->max_active)
730           && (NULL != (entry = queue->rq_head)) )
731     defer (entry->op);
732   recheck_waiting (queue);
733 }
734
735
736 /**
737  * Add an operation to a queue.  An operation can be in multiple queues at
738  * once. Once the operation is inserted into all the queues
739  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
740  * waiting for the operation to become active.
741  *
742  * @param queue queue to add the operation to
743  * @param op operation to add to the queue
744  * @param nres the number of units of the resources of queue needed by the
745  *          operation. Should be greater than 0.
746  */
747 void
748 GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
749                                          struct GNUNET_TESTBED_Operation *op,
750                                          unsigned int nres)
751 {
752   unsigned int qsize;
753
754   GNUNET_assert (0 < nres);
755   qsize = op->nqueues;
756   GNUNET_array_append (op->queues, op->nqueues, queue);
757   GNUNET_array_append (op->nres, qsize, nres);
758   GNUNET_assert (qsize == op->nqueues);
759 }
760
761
762 /**
763  * Add an operation to a queue.  An operation can be in multiple queues at
764  * once. Once the operation is inserted into all the queues
765  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
766  * waiting for the operation to become active. The operation is assumed to take
767  * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
768  * requires more than 1
769  *
770  * @param queue queue to add the operation to
771  * @param op operation to add to the queue
772  */
773 void
774 GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
775                                         struct GNUNET_TESTBED_Operation *op)
776 {
777   return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
778 }
779
780
781 /**
782  * Marks the given operation as waiting on the queues.  Once all queues permit
783  * the operation to become active, the operation will be activated.  The actual
784  * activation will occur in a separate task (thus allowing multiple queue
785  * insertions to be made without having the first one instantly trigger the
786  * operation if the first queue has sufficient resources).
787  *
788  * @param op the operation to marks as waiting
789  */
790 void
791 GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
792 {
793   GNUNET_assert (NULL == op->rq_entry);
794   change_state (op, OP_STATE_WAITING);
795   (void) check_readiness (op);
796 }
797
798
799 /**
800  * Marks an active operation as inactive - the operation will be kept in a
801  * ready-to-be-released state and continues to hold resources until another
802  * operation contents for them.
803  *
804  * @param op the operation to be marked as inactive.  The operation start
805  *          callback should have been called before for this operation to mark
806  *          it as inactive.
807  */
808 void
809 GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
810 {
811   struct OperationQueue **queues;
812   size_t ms;
813   unsigned int nqueues;
814   unsigned int i;
815
816   GNUNET_assert (OP_STATE_ACTIVE == op->state);
817   change_state (op, OP_STATE_INACTIVE);
818   nqueues = op->nqueues;
819   ms = sizeof (struct OperationQueue *) * nqueues;
820   queues = GNUNET_malloc (ms);
821   /* Cloning is needed as the operation be released by waiting operations and
822      hence its nqueues memory ptr will be freed */
823   GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
824   for (i = 0; i < nqueues; i++)
825     recheck_waiting (queues[i]);
826   GNUNET_free (queues);
827 }
828
829
830 /**
831  * Marks and inactive operation as active.  This fuction should be called to
832  * ensure that the oprelease callback will not be called until it is either
833  * marked as inactive or released.
834  *
835  * @param op the operation to be marked as active
836  */
837 void
838 GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
839 {
840
841   GNUNET_assert (OP_STATE_INACTIVE == op->state);
842   change_state (op, OP_STATE_ACTIVE);
843 }
844
845
846 /**
847  * An operation is 'done' (was cancelled or finished); remove
848  * it from the queues and release associated resources.
849  *
850  * @param op operation that finished
851  */
852 void
853 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
854 {
855   struct QueueEntry *entry;  
856   struct OperationQueue *opq;
857   unsigned int i;
858
859   if (OP_STATE_INIT == op->state)
860   {
861     GNUNET_free (op);
862     return;
863   }
864   if (OP_STATE_READY == op->state)
865     rq_remove (op);
866   if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
867     GNUNET_TESTBED_operation_activate_ (op);
868   GNUNET_assert (NULL != op->queues);
869   GNUNET_assert (NULL != op->qentries);
870   for (i = 0; i < op->nqueues; i++)
871   {
872     entry = op->qentries[i];
873     remove_queue_entry (op, i);
874     opq = op->queues[i];
875     switch (op->state)
876     {      
877     case OP_STATE_INIT:
878     case OP_STATE_INACTIVE:
879       GNUNET_assert (0);
880       break;
881     case OP_STATE_WAITING:      
882       break;
883     case OP_STATE_READY:
884     case OP_STATE_ACTIVE:
885       GNUNET_assert (0 != opq->active);
886       GNUNET_assert (opq->active >= entry->nres);
887       opq->active -= entry->nres;
888       recheck_waiting (opq);
889       break;
890     }    
891     GNUNET_free (entry);
892   }
893   GNUNET_free_non_null (op->qentries);
894   GNUNET_free (op->queues);
895   GNUNET_free (op->nres);
896   if (NULL != op->release)
897     op->release (op->cb_cls);
898   GNUNET_free (op);
899 }
900
901
902 /* end of testbed_api_operations.c */