- remove dead assignments
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_api_operations.c
23  * @brief functions to manage operation queues
24  * @author Christian Grothoff
25  * @author Sree Harsha Totakura
26  */
27
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30
31
32 /**
33  * An entry in the operation queue
34  */
35 struct QueueEntry
36 {
37   /**
38    * The next DLL pointer
39    */
40   struct QueueEntry *next;
41
42   /**
43    * The prev DLL pointer
44    */
45   struct QueueEntry *prev;
46
47   /**
48    * The operation this entry holds
49    */
50   struct GNUNET_TESTBED_Operation *op;
51
52   /**
53    * How many units of resources does the operation need
54    */
55   unsigned int nres;
56 };
57
58
59 /**
60  * Queue of operations where we can only support a certain
61  * number of concurrent operations of a particular type.
62  */
63 struct OperationQueue
64 {
65   /**
66    * DLL head for the wait queue.  Operations which are waiting for this
67    * operation queue are put here
68    */
69   struct QueueEntry *wq_head;
70
71   /**
72    * DLL tail for the wait queue.
73    */
74   struct QueueEntry *wq_tail;
75
76   /**
77    * DLL head for the ready queue.  Operations which are in this operation queue
78    * and are in ready state are put here
79    */
80   struct QueueEntry *rq_head;
81
82   /**
83    * DLL tail for the ready queue
84    */
85   struct QueueEntry *rq_tail;
86
87   /**
88    * DLL head for the active queue.  Operations which are in this operation
89    * queue and are currently active are put here
90    */
91   struct QueueEntry *aq_head;
92
93   /**
94    * DLL tail for the active queue.
95    */
96   struct QueueEntry *aq_tail;
97
98   /**
99    * DLL head for the inactive queue.  Operations which are inactive and can be
100    * evicted if the queues it holds are maxed out and another operation begins
101    * to wait on them.
102    */
103   struct QueueEntry *nq_head;
104
105   /**
106    * DLL tail for the inactive queue.
107    */
108   struct QueueEntry *nq_tail;
109
110   /**
111    * Number of operations that are currently active in this queue.
112    */
113   unsigned int active;
114
115   /**
116    * Max number of operations which can be active at any time in this queue
117    */
118   unsigned int max_active;
119
120 };
121
122
123 /**
124  * Operation state
125  */
126 enum OperationState
127 {
128   /**
129    * The operation is just created and is in initial state
130    */
131   OP_STATE_INIT,
132
133   /**
134    * The operation is currently waiting for resources
135    */
136   OP_STATE_WAITING,
137
138   /**
139    * The operation is ready to be started
140    */
141   OP_STATE_READY,
142
143   /**
144    * The operation has started and is active
145    */
146   OP_STATE_ACTIVE,
147
148   /**
149    * The operation is inactive.  It still holds resources on the operation
150    * queues.  However, this operation will be evicted when another operation
151    * requires resources from the maxed out queues this operation is holding
152    * resources from.
153    */
154   OP_STATE_INACTIVE
155 };
156
157
158 /**
159  * An entry in the ready queue (implemented as DLL)
160  */
161 struct ReadyQueueEntry
162 {
163   /**
164    * next ptr for DLL
165    */
166   struct ReadyQueueEntry *next;
167   
168   /**
169    * prev ptr for DLL
170    */
171   struct ReadyQueueEntry *prev;
172
173   /**
174    * The operation associated with this entry
175    */
176   struct GNUNET_TESTBED_Operation *op;
177 };
178
179
180 /**
181  * Opaque handle to an abstract operation to be executed by the testing framework.
182  */
183 struct GNUNET_TESTBED_Operation
184 {
185   /**
186    * Function to call when we have the resources to begin the operation.
187    */
188   OperationStart start;
189
190   /**
191    * Function to call to clean up after the operation (which may or may
192    * not have been started yet).
193    */
194   OperationRelease release;
195
196   /**
197    * Closure for callbacks.
198    */
199   void *cb_cls;
200
201   /**
202    * Array of operation queues this Operation belongs to.
203    */
204   struct OperationQueue **queues;
205
206   /**
207    * Array of operation queue entries corresponding to this operation in
208    * operation queues for this operation
209    */
210   struct QueueEntry **qentries;
211
212   /**
213    * Array of number of resources an operation need from each queue. The numbers
214    * in this array should correspond to the queues array
215    */
216   unsigned int *nres;
217
218   /**
219    * Entry corresponding to this operation in ready queue.  Will be NULL if the
220    * operation is not marked as READY
221    */
222   struct ReadyQueueEntry *rq_entry;
223
224   /**
225    * Number of queues in the operation queues array
226    */
227   unsigned int nqueues;
228
229   /**
230    * The state of the operation
231    */
232   enum OperationState state;
233
234 };
235
236 /**
237  * DLL head for the ready queue
238  */
239 struct ReadyQueueEntry *rq_head;
240
241 /**
242  * DLL tail for the ready queue
243  */
244 struct ReadyQueueEntry *rq_tail;
245
246 /**
247  * The id of the task to process the ready queue
248  */
249 GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
250
251
252 /**
253  * Removes a queue entry of an operation from one of the operation queues' lists
254  * depending on the state of the operation
255  *
256  * @param op the operation whose entry has to be removed
257  * @param index the index of the entry in the operation's array of queue entries
258  */
259 static void
260 remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
261 {
262   struct OperationQueue *opq;
263   struct QueueEntry *entry;
264   
265   opq = op->queues[index];
266   entry = op->qentries[index];
267   switch (op->state)
268   {
269   case OP_STATE_INIT:
270     GNUNET_assert (0);
271     break;
272   case OP_STATE_WAITING:
273     GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
274     break;
275   case OP_STATE_READY:
276     GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
277     break;
278   case OP_STATE_ACTIVE:
279     GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
280     break;
281   case OP_STATE_INACTIVE:
282     GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
283     break;
284   }
285 }
286
287
288 /**
289  * Changes the state of the operation while moving its associated queue entries
290  * in the operation's operation queues
291  *
292  * @param op the operation whose state has to be changed
293  * @param state the state the operation should have.  It cannot be OP_STATE_INIT
294  */
295 static void
296 change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
297 {
298   struct QueueEntry *entry;
299   struct OperationQueue *opq;
300   unsigned int cnt;
301   unsigned int s;
302   
303   GNUNET_assert (OP_STATE_INIT != state);
304   GNUNET_assert (NULL != op->queues);
305   GNUNET_assert (NULL != op->nres);
306   GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
307   GNUNET_assert (op->state != state);
308   for (cnt = 0; cnt < op->nqueues; cnt++)
309   {
310     if (OP_STATE_INIT == op->state)
311     {
312       entry = GNUNET_malloc (sizeof (struct QueueEntry));
313       entry->op = op;
314       entry->nres = op->nres[cnt];
315       s = cnt;
316       GNUNET_array_append (op->qentries, s, entry);      
317     }
318     else
319     {
320       entry = op->qentries[cnt];
321       remove_queue_entry (op, cnt);
322     }
323     opq = op->queues[cnt];
324     switch (state)
325     {
326     case OP_STATE_INIT:
327       GNUNET_assert (0);
328       break;
329     case OP_STATE_WAITING:
330       GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
331       break;
332     case OP_STATE_READY:
333       GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
334       break;
335     case OP_STATE_ACTIVE:
336       GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
337       break;
338     case OP_STATE_INACTIVE:
339       GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
340       break;
341     }
342   }
343   op->state = state;
344 }
345
346
347 /**
348  * Removes an operation from the ready queue.  Also stops the 'process_rq_task'
349  * if the given operation is the last one in the queue.
350  *
351  * @param op the operation to be removed
352  */
353 static void
354 rq_remove (struct GNUNET_TESTBED_Operation *op)
355 {  
356   GNUNET_assert (NULL != op->rq_entry);
357   GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
358   GNUNET_free (op->rq_entry);
359   op->rq_entry = NULL;
360   if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) )
361   {
362     GNUNET_SCHEDULER_cancel (process_rq_task_id);
363     process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
364   }
365 }
366
367
368 /**
369  * Processes the ready queue by calling the operation start callback of the
370  * operation at the head.  The operation is then removed from the queue.  The
371  * task is scheduled to run again immediately until no more operations are in
372  * the ready queue.
373  *
374  * @param cls NULL
375  * @param tc scheduler task context.  Not used.
376  */
377 static void
378 process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
379 {
380   struct GNUNET_TESTBED_Operation *op;
381
382   process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
383   GNUNET_assert (NULL != rq_head);
384   GNUNET_assert (NULL != (op = rq_head->op));
385   rq_remove (op);
386   if (NULL != rq_head)
387     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
388   change_state (op, OP_STATE_ACTIVE);
389   if (NULL != op->start)
390     op->start (op->cb_cls);  
391 }
392
393
394 /**
395  * Adds the operation to the ready queue and starts the 'process_rq_task'
396  *
397  * @param op the operation to be queued
398  */
399 static void
400 rq_add (struct GNUNET_TESTBED_Operation *op)
401 {
402   struct ReadyQueueEntry *rq_entry;
403
404   GNUNET_assert (NULL == op->rq_entry);
405   rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry));
406   rq_entry->op = op;
407   GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
408   op->rq_entry = rq_entry;
409   if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id)
410     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
411 }
412
413
414 /**
415  * Checks if the given operation queue is empty or not
416  *
417  * @param opq the operation queue
418  * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
419  *           otherwise
420  */
421 static int
422 is_queue_empty (struct OperationQueue *opq)
423 {
424   if ( (NULL != opq->wq_head)
425        || (NULL != opq->rq_head)
426        || (NULL != opq->aq_head)
427        || (NULL != opq->nq_head) )
428     return GNUNET_NO;
429   return GNUNET_YES;
430 }
431
432
433 /**
434  * Checks if the given operation queue has enough resources to provide for the
435  * operation of the given queue entry.  It also checks if any inactive
436  * operations are to be released in order to accommodate the needed resources
437  * and returns them as an array.
438  *
439  * @param opq the operation queue to check for resource accommodation
440  * @param entry the operation queue entry whose operation's resources are to be
441  *          accommodated
442  * @param ops_ pointer to return the array of operations which are to be released
443  *          in order to accommodate the new operation.  Can be NULL
444  * @param n_ops_ the number of operations in ops_
445  * @return GNUNET_YES if the given entry's operation can be accommodated in this
446  *           queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
447  *           be set to NULL and 0 respectively.
448  */
449 static int
450 decide_capacity (struct OperationQueue *opq,
451                  struct QueueEntry *entry,
452                  struct GNUNET_TESTBED_Operation ***ops_,
453                  unsigned int *n_ops_)
454 {
455   struct QueueEntry **evict_entries;
456   struct GNUNET_TESTBED_Operation **ops;
457   struct GNUNET_TESTBED_Operation *op;
458   unsigned int n_ops;
459   unsigned int n_evict_entries;
460   unsigned int need;
461   int deficit;
462   int rval;
463
464   GNUNET_assert (NULL != (op = entry->op));
465   GNUNET_assert (0 < (need = entry->nres));
466   ops = NULL;
467   n_ops = 0;
468   evict_entries = NULL;
469   n_evict_entries = 0;
470   rval = GNUNET_YES;
471   if (opq->active > opq->max_active)
472   {
473     rval = GNUNET_NO;
474     goto ret;
475   }
476   if ((opq->active + need) <= opq->max_active)
477     goto ret;
478   deficit = need - (opq->max_active - opq->active);
479   for (entry = opq->nq_head;
480        (0 < deficit) && (NULL != entry);
481        entry = entry->next)
482   {
483     GNUNET_array_append (evict_entries, n_evict_entries, entry);
484     deficit -= entry->nres;
485   }
486   if (0 < deficit)
487   {
488     rval = GNUNET_NO;
489     goto ret;
490   }
491   for (n_ops = 0; n_ops < n_evict_entries;)
492   {
493     op = evict_entries[n_ops]->op;
494     GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
495   }
496
497  ret:
498   GNUNET_free_non_null (evict_entries);  
499   if (NULL != ops_)
500     *ops_ = ops;
501   else
502     GNUNET_free (ops);
503   if (NULL != n_ops_)
504     *n_ops_ = n_ops;
505   return rval;
506 }
507
508
509 /**
510  * Merges an array of operations into another, eliminating duplicates.  No
511  * ordering is guaranteed.
512  *
513  * @param old the array into which the merging is done.
514  * @param n_old the number of operations in old array
515  * @param new the array from which operations are to be merged
516  * @param n_new the number of operations in new array
517  */
518 static void
519 merge_ops (struct GNUNET_TESTBED_Operation ***old,
520            unsigned int *n_old,
521            struct GNUNET_TESTBED_Operation **new,
522            unsigned int n_new)
523 {
524   struct GNUNET_TESTBED_Operation **cur;
525   unsigned int i;
526   unsigned int j;
527   unsigned int n_cur;
528  
529   GNUNET_assert (NULL != old);
530   n_cur = *n_old;
531   cur = *old;
532   for (i = 0; i < n_new; i++)
533   {    
534     for (j = 0; j < *n_old; j++)
535     {
536       if (new[i] == cur[j])
537         break;
538     }
539     if (j < *n_old)
540       continue;
541     GNUNET_array_append (cur, n_cur, new[j]);
542   }
543   *old = cur;
544   *n_old = n_cur;
545 }
546            
547
548
549 /**
550  * Checks for the readiness of an operation and schedules a operation start task
551  *
552  * @param op the operation
553  */
554 static int
555 check_readiness (struct GNUNET_TESTBED_Operation *op)
556 {
557   struct GNUNET_TESTBED_Operation **evict_ops;
558   struct GNUNET_TESTBED_Operation **ops;
559   unsigned int n_ops;
560   unsigned int n_evict_ops;
561   unsigned int i;
562
563   GNUNET_assert (NULL == op->rq_entry);
564   GNUNET_assert (OP_STATE_WAITING == op->state);
565   evict_ops = NULL;
566   n_evict_ops = 0;
567   for (i = 0; i < op->nqueues; i++)
568   {
569     ops = NULL;
570     n_ops = 0;
571     if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
572                                       &ops, &n_ops))
573     {
574       GNUNET_free_non_null (evict_ops);
575       return GNUNET_NO;
576     }
577     if (NULL == ops)
578       continue;
579     merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
580     GNUNET_free (ops);    
581   }
582   if (NULL != evict_ops)
583   {
584     for (i = 0; i < n_evict_ops; i++)
585       GNUNET_TESTBED_operation_release_ (evict_ops[i]);
586     GNUNET_free (evict_ops);
587     evict_ops = NULL;
588     /* Evicting the operations should schedule this operation */
589     GNUNET_assert (OP_STATE_READY == op->state);
590     return GNUNET_YES;
591   }
592   for (i = 0; i < op->nqueues; i++)
593     op->queues[i]->active += op->nres[i];
594   change_state (op, OP_STATE_READY);
595   rq_add (op);
596   return GNUNET_YES;
597 }
598
599
600 /**
601  * Defers a ready to be executed operation back to waiting
602  *
603  * @param op the operation to defer
604  */
605 static void
606 defer (struct GNUNET_TESTBED_Operation *op)
607 {
608   unsigned int i;
609
610   GNUNET_assert (OP_STATE_READY == op->state);
611   rq_remove (op);
612   for (i = 0; i < op->nqueues; i++)
613     op->queues[i]->active--;
614   change_state (op, OP_STATE_WAITING);
615 }
616
617
618 /**
619  * Create an 'operation' to be performed.
620  *
621  * @param cls closure for the callbacks
622  * @param start function to call to start the operation
623  * @param release function to call to close down the operation
624  * @return handle to the operation
625  */
626 struct GNUNET_TESTBED_Operation *
627 GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
628                                   OperationRelease release)
629 {
630   struct GNUNET_TESTBED_Operation *op;
631
632   op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation));
633   op->start = start;
634   op->state = OP_STATE_INIT;
635   op->release = release;
636   op->cb_cls = cls;
637   return op;
638 }
639
640
641 /**
642  * Create an operation queue.
643  *
644  * @param max_active maximum number of operations in this
645  *        queue that can be active in parallel at the same time
646  * @return handle to the queue
647  */
648 struct OperationQueue *
649 GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
650 {
651   struct OperationQueue *queue;
652
653   queue = GNUNET_malloc (sizeof (struct OperationQueue));
654   queue->max_active = max_active;
655   return queue;
656 }
657
658
659 /**
660  * Destroy an operation queue.  The queue MUST be empty
661  * at this time.
662  *
663  * @param queue queue to destroy
664  */
665 void
666 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
667 {
668   GNUNET_break (GNUNET_YES == is_queue_empty (queue));
669   GNUNET_free (queue);
670 }
671
672
673 /**
674  * Destroys the operation queue if it is empty.  If not empty return GNUNET_NO.
675  *
676  * @param queue the queue to destroy if empty
677  * @return GNUNET_YES if the queue is destroyed.  GNUNET_NO if not (because it
678  *           is not empty)
679  */
680 int
681 GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
682 {
683   if (GNUNET_NO == is_queue_empty (queue))
684     return GNUNET_NO;
685   GNUNET_TESTBED_operation_queue_destroy_ (queue);
686   return GNUNET_YES;
687 }
688
689
690 /**
691  * Rechecks if any of the operations in the given operation queue's waiting list
692  * can be made active
693  *
694  * @param opq the operation queue
695  */
696 static void
697 recheck_waiting (struct OperationQueue *opq)
698 {
699   struct QueueEntry *entry;
700   struct QueueEntry *entry2;
701
702   entry = opq->wq_head;
703   while (NULL != entry)
704   {
705     entry2 = entry->next;
706     if (GNUNET_NO == check_readiness (entry->op))
707       break;
708     entry = entry2;
709   }
710 }
711
712
713 /**
714  * Function to reset the maximum number of operations in the given queue. If
715  * max_active is lesser than the number of currently active operations, the
716  * active operations are not stopped immediately.
717  *
718  * @param queue the operation queue which has to be modified
719  * @param max_active the new maximum number of active operations
720  */
721 void
722 GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
723                                                   unsigned int max_active)
724 {
725   struct QueueEntry *entry;
726
727   queue->max_active = max_active;
728   while ( (queue->active > queue->max_active)
729           && (NULL != (entry = queue->rq_head)) )
730     defer (entry->op);
731   recheck_waiting (queue);
732 }
733
734
735 /**
736  * Add an operation to a queue.  An operation can be in multiple queues at
737  * once. Once the operation is inserted into all the queues
738  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
739  * waiting for the operation to become active.
740  *
741  * @param queue queue to add the operation to
742  * @param op operation to add to the queue
743  * @param nres the number of units of the resources of queue needed by the
744  *          operation. Should be greater than 0.
745  */
746 void
747 GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
748                                          struct GNUNET_TESTBED_Operation *op,
749                                          unsigned int nres)
750 {
751   unsigned int qsize;
752
753   GNUNET_assert (0 < nres);
754   qsize = op->nqueues;
755   GNUNET_array_append (op->queues, op->nqueues, queue);
756   GNUNET_array_append (op->nres, qsize, nres);
757   GNUNET_assert (qsize == op->nqueues);
758 }
759
760
761 /**
762  * Add an operation to a queue.  An operation can be in multiple queues at
763  * once. Once the operation is inserted into all the queues
764  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
765  * waiting for the operation to become active. The operation is assumed to take
766  * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
767  * requires more than 1
768  *
769  * @param queue queue to add the operation to
770  * @param op operation to add to the queue
771  */
772 void
773 GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
774                                         struct GNUNET_TESTBED_Operation *op)
775 {
776   return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
777 }
778
779
780 /**
781  * Marks the given operation as waiting on the queues.  Once all queues permit
782  * the operation to become active, the operation will be activated.  The actual
783  * activation will occur in a separate task (thus allowing multiple queue
784  * insertions to be made without having the first one instantly trigger the
785  * operation if the first queue has sufficient resources).
786  *
787  * @param op the operation to marks as waiting
788  */
789 void
790 GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
791 {
792   GNUNET_assert (NULL == op->rq_entry);
793   change_state (op, OP_STATE_WAITING);
794   (void) check_readiness (op);
795 }
796
797
798 /**
799  * Marks an active operation as inactive - the operation will be kept in a
800  * ready-to-be-released state and continues to hold resources until another
801  * operation contents for them.
802  *
803  * @param op the operation to be marked as inactive.  The operation start
804  *          callback should have been called before for this operation to mark
805  *          it as inactive.
806  */
807 void
808 GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
809 {
810   struct OperationQueue **queues;
811   size_t ms;
812   unsigned int nqueues;
813   unsigned int i;
814
815   GNUNET_assert (OP_STATE_ACTIVE == op->state);
816   change_state (op, OP_STATE_INACTIVE);
817   nqueues = op->nqueues;
818   ms = sizeof (struct OperationQueue *) * nqueues;
819   queues = GNUNET_malloc (ms);
820   /* Cloning is needed as the operation be released by waiting operations and
821      hence its nqueues memory ptr will be freed */
822   GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
823   for (i = 0; i < nqueues; i++)
824     recheck_waiting (queues[i]);
825   GNUNET_free (queues);
826 }
827
828
829 /**
830  * Marks and inactive operation as active.  This fuction should be called to
831  * ensure that the oprelease callback will not be called until it is either
832  * marked as inactive or released.
833  *
834  * @param op the operation to be marked as active
835  */
836 void
837 GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
838 {
839
840   GNUNET_assert (OP_STATE_INACTIVE == op->state);
841   change_state (op, OP_STATE_ACTIVE);
842 }
843
844
845 /**
846  * An operation is 'done' (was cancelled or finished); remove
847  * it from the queues and release associated resources.
848  *
849  * @param op operation that finished
850  */
851 void
852 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
853 {
854   struct QueueEntry *entry;  
855   struct OperationQueue *opq;
856   unsigned int i;
857
858   if (OP_STATE_INIT == op->state)
859   {
860     GNUNET_free (op);
861     return;
862   }
863   if (OP_STATE_READY == op->state)
864     rq_remove (op);
865   if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
866     GNUNET_TESTBED_operation_activate_ (op);
867   GNUNET_assert (NULL != op->queues);
868   GNUNET_assert (NULL != op->qentries);
869   for (i = 0; i < op->nqueues; i++)
870   {
871     entry = op->qentries[i];
872     remove_queue_entry (op, i);
873     opq = op->queues[i];
874     switch (op->state)
875     {      
876     case OP_STATE_INIT:
877     case OP_STATE_INACTIVE:
878       GNUNET_assert (0);
879       break;
880     case OP_STATE_WAITING:      
881       break;
882     case OP_STATE_READY:
883     case OP_STATE_ACTIVE:
884       GNUNET_assert (0 != opq->active);
885       GNUNET_assert (opq->active >= entry->nres);
886       opq->active -= entry->nres;
887       recheck_waiting (opq);
888       break;
889     }    
890     GNUNET_free (entry);
891   }
892   GNUNET_free_non_null (op->qentries);
893   GNUNET_free (op->queues);
894   GNUNET_free (op->nres);
895   if (NULL != op->release)
896     op->release (op->cb_cls);
897   GNUNET_free (op);
898 }
899
900
901 /* end of testbed_api_operations.c */