- expand $ variables in filenames
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_api_operations.c
23  * @brief functions to manage operation queues
24  * @author Christian Grothoff
25  * @author Sree Harsha Totakura
26  */
27
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30
31
32 /**
33  * An entry in the operation queue
34  */
35 struct QueueEntry
36 {
37   /**
38    * The next DLL pointer
39    */
40   struct QueueEntry *next;
41
42   /**
43    * The prev DLL pointer
44    */
45   struct QueueEntry *prev;
46
47   /**
48    * The operation this entry holds
49    */
50   struct GNUNET_TESTBED_Operation *op;
51
52   /**
53    * How many units of resources does the operation need
54    */
55   unsigned int nres;
56 };
57
58
59 /**
60  * Queue of operations where we can only support a certain
61  * number of concurrent operations of a particular type.
62  */
63 struct OperationQueue
64 {
65   /**
66    * DLL head for the wait queue.  Operations which are waiting for this
67    * operation queue are put here
68    */
69   struct QueueEntry *wq_head;
70
71   /**
72    * DLL tail for the wait queue.
73    */
74   struct QueueEntry *wq_tail;
75
76   /**
77    * DLL head for the ready queue.  Operations which are in this operation queue
78    * and are in ready state are put here
79    */
80   struct QueueEntry *rq_head;
81
82   /**
83    * DLL tail for the ready queue
84    */
85   struct QueueEntry *rq_tail;
86
87   /**
88    * DLL head for the active queue.  Operations which are in this operation
89    * queue and are currently active are put here
90    */
91   struct QueueEntry *aq_head;
92
93   /**
94    * DLL tail for the active queue.
95    */
96   struct QueueEntry *aq_tail;
97
98   /**
99    * DLL head for the inactive queue.  Operations which are inactive and can be
100    * evicted if the queues it holds are maxed out and another operation begins
101    * to wait on them.
102    */
103   struct QueueEntry *nq_head;
104
105   /**
106    * DLL tail for the inactive queue.
107    */
108   struct QueueEntry *nq_tail;
109
110   /**
111    * Number of operations that are currently active in this queue.
112    */
113   unsigned int active;
114
115   /**
116    * Max number of operations which can be active at any time in this queue
117    */
118   unsigned int max_active;
119
120 };
121
122
123 /**
124  * Operation state
125  */
126 enum OperationState
127 {
128   /**
129    * The operation is just created and is in initial state
130    */
131   OP_STATE_INIT,
132
133   /**
134    * The operation is currently waiting for resources
135    */
136   OP_STATE_WAITING,
137
138   /**
139    * The operation is ready to be started
140    */
141   OP_STATE_READY,
142
143   /**
144    * The operation has started and is active
145    */
146   OP_STATE_ACTIVE,
147
148   /**
149    * The operation is inactive.  It still holds resources on the operation
150    * queues.  However, this operation will be evicted when another operation
151    * requires resources from the maxed out queues this operation is holding
152    * resources from.
153    */
154   OP_STATE_INACTIVE
155 };
156
157
158 /**
159  * An entry in the ready queue (implemented as DLL)
160  */
161 struct ReadyQueueEntry
162 {
163   /**
164    * next ptr for DLL
165    */
166   struct ReadyQueueEntry *next;
167   
168   /**
169    * prev ptr for DLL
170    */
171   struct ReadyQueueEntry *prev;
172
173   /**
174    * The operation associated with this entry
175    */
176   struct GNUNET_TESTBED_Operation *op;
177 };
178
179
180 /**
181  * Opaque handle to an abstract operation to be executed by the testing framework.
182  */
183 struct GNUNET_TESTBED_Operation
184 {
185   /**
186    * Function to call when we have the resources to begin the operation.
187    */
188   OperationStart start;
189
190   /**
191    * Function to call to clean up after the operation (which may or may
192    * not have been started yet).
193    */
194   OperationRelease release;
195
196   /**
197    * Closure for callbacks.
198    */
199   void *cb_cls;
200
201   /**
202    * Array of operation queues this Operation belongs to.
203    */
204   struct OperationQueue **queues;
205
206   /**
207    * Array of operation queue entries corresponding to this operation in
208    * operation queues for this operation
209    */
210   struct QueueEntry **qentries;
211
212   /**
213    * Array of number of resources an operation need from each queue. The numbers
214    * in this array should correspond to the queues array
215    */
216   unsigned int *nres;
217
218   /**
219    * Entry corresponding to this operation in ready queue.  Will be NULL if the
220    * operation is not marked as READY
221    */
222   struct ReadyQueueEntry *rq_entry;
223
224   /**
225    * Number of queues in the operation queues array
226    */
227   unsigned int nqueues;
228
229   /**
230    * The state of the operation
231    */
232   enum OperationState state;
233
234 };
235
236 /**
237  * DLL head for the ready queue
238  */
239 struct ReadyQueueEntry *rq_head;
240
241 /**
242  * DLL tail for the ready queue
243  */
244 struct ReadyQueueEntry *rq_tail;
245
246 /**
247  * The id of the task to process the ready queue
248  */
249 GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
250
251
252 /**
253  * Removes a queue entry of an operation from one of the operation queues' lists
254  * depending on the state of the operation
255  *
256  * @param op the operation whose entry has to be removed
257  * @param index the index of the entry in the operation's array of queue entries
258  */
259 static void
260 remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
261 {
262   struct OperationQueue *opq;
263   struct QueueEntry *entry;
264   
265   opq = op->queues[index];
266   entry = op->qentries[index];
267   switch (op->state)
268   {
269   case OP_STATE_INIT:
270     GNUNET_assert (0);
271     break;
272   case OP_STATE_WAITING:
273     GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
274     break;
275   case OP_STATE_READY:
276     GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
277     break;
278   case OP_STATE_ACTIVE:
279     GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
280     break;
281   case OP_STATE_INACTIVE:
282     GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
283     break;
284   }
285 }
286
287
288 /**
289  * Changes the state of the operation while moving its associated queue entries
290  * in the operation's operation queues
291  *
292  * @param op the operation whose state has to be changed
293  * @param state the state the operation should have.  It cannot be OP_STATE_INIT
294  */
295 static void
296 change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
297 {
298   struct QueueEntry *entry;
299   struct OperationQueue *opq;
300   unsigned int cnt;
301   unsigned int s;
302   
303   GNUNET_assert (OP_STATE_INIT != state);
304   GNUNET_assert (NULL != op->queues);
305   GNUNET_assert (NULL != op->nres);
306   GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
307   GNUNET_assert (op->state != state);
308   for (cnt = 0; cnt < op->nqueues; cnt++)
309   {
310     if (OP_STATE_INIT == op->state)
311     {
312       entry = GNUNET_malloc (sizeof (struct QueueEntry));
313       entry->op = op;
314       entry->nres = op->nres[cnt];
315       s = cnt;
316       GNUNET_array_append (op->qentries, s, entry);      
317     }
318     else
319     {
320       entry = op->qentries[cnt];
321       remove_queue_entry (op, cnt);
322     }
323     opq = op->queues[cnt];
324     switch (state)
325     {
326     case OP_STATE_INIT:
327       GNUNET_assert (0);
328       break;
329     case OP_STATE_WAITING:
330       GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
331       break;
332     case OP_STATE_READY:
333       GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
334       break;
335     case OP_STATE_ACTIVE:
336       GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
337       break;
338     case OP_STATE_INACTIVE:
339       GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
340       break;
341     }
342   }
343   op->state = state;
344 }
345
346
347 /**
348  * Removes an operation from the ready queue.  Also stops the 'process_rq_task'
349  * if the given operation is the last one in the queue.
350  *
351  * @param op the operation to be removed
352  */
353 static void
354 rq_remove (struct GNUNET_TESTBED_Operation *op)
355 {  
356   GNUNET_assert (NULL != op->rq_entry);
357   GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
358   GNUNET_free (op->rq_entry);
359   op->rq_entry = NULL;
360   if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) )
361   {
362     GNUNET_SCHEDULER_cancel (process_rq_task_id);
363     process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
364   }
365 }
366
367
368 /**
369  * Processes the ready queue by calling the operation start callback of the
370  * operation at the head.  The operation is then removed from the queue.  The
371  * task is scheduled to run again immediately until no more operations are in
372  * the ready queue.
373  *
374  * @param cls NULL
375  * @param tc scheduler task context.  Not used.
376  */
377 static void
378 process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
379 {
380   struct GNUNET_TESTBED_Operation *op;
381
382   process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
383   GNUNET_assert (NULL != rq_head);
384   GNUNET_assert (NULL != (op = rq_head->op));
385   rq_remove (op);
386   if (NULL != rq_head)
387     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
388   change_state (op, OP_STATE_ACTIVE);
389   if (NULL != op->start)
390     op->start (op->cb_cls);  
391 }
392
393
394 /**
395  * Adds the operation to the ready queue and starts the 'process_rq_task'
396  *
397  * @param op the operation to be queued
398  */
399 static void
400 rq_add (struct GNUNET_TESTBED_Operation *op)
401 {
402   struct ReadyQueueEntry *rq_entry;
403
404   GNUNET_assert (NULL == op->rq_entry);
405   rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry));
406   rq_entry->op = op;
407   GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
408   op->rq_entry = rq_entry;
409   if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id)
410     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
411 }
412
413
414 /**
415  * Checks if the given operation queue is empty or not
416  *
417  * @param opq the operation queue
418  * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
419  *           otherwise
420  */
421 static int
422 is_queue_empty (struct OperationQueue *opq)
423 {
424   if ( (NULL != opq->wq_head)
425        || (NULL != opq->rq_head)
426        || (NULL != opq->aq_head)
427        || (NULL != opq->nq_head) )
428     return GNUNET_NO;
429   return GNUNET_YES;
430 }
431
432
433 /**
434  * Checks if the given operation queue has enough resources to provide for the
435  * operation of the given queue entry.  It also checks if any inactive
436  * operations are to be released in order to accommodate the needed resources
437  * and returns them as an array.
438  *
439  * @param opq the operation queue to check for resource accommodation
440  * @param entry the operation queue entry whose operation's resources are to be
441  *          accommodated
442  * @param ops_ pointer to return the array of operations which are to be released
443  *          in order to accommodate the new operation.  Can be NULL
444  * @param n_ops_ the number of operations in ops_
445  * @return GNUNET_YES if the given entry's operation can be accommodated in this
446  *           queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
447  *           be set to NULL and 0 respectively.
448  */
449 static int
450 decide_capacity (struct OperationQueue *opq,
451                  struct QueueEntry *entry,
452                  struct GNUNET_TESTBED_Operation ***ops_,
453                  unsigned int *n_ops_)
454 {
455   struct QueueEntry **evict_entries;
456   struct GNUNET_TESTBED_Operation **ops;
457   struct GNUNET_TESTBED_Operation *op;
458   unsigned int n_ops;
459   unsigned int n_evict_entries;
460   unsigned int need;
461   int deficit;
462   int rval;
463
464   GNUNET_assert (NULL != (op = entry->op));
465   GNUNET_assert (0 < (need = entry->nres));
466   ops = NULL;
467   n_ops = 0;
468   evict_entries = NULL;
469   n_evict_entries = 0;
470   rval = GNUNET_YES;
471   if (opq->active > opq->max_active)
472   {
473     need += opq->active - opq->max_active;
474     rval = GNUNET_NO;
475     goto ret;
476   }
477   if ((opq->active + need) <= opq->max_active)
478     goto ret;
479   deficit = need - (opq->max_active - opq->active);
480   for (entry = opq->nq_head;
481        (0 < deficit) && (NULL != entry);
482        entry = entry->next)
483   {
484     GNUNET_array_append (evict_entries, n_evict_entries, entry);
485     deficit -= entry->nres;
486   }
487   if (0 < deficit)
488   {
489     rval = GNUNET_NO;
490     goto ret;
491   }
492   for (n_ops = 0; n_ops < n_evict_entries;)
493   {
494     op = evict_entries[n_ops]->op;
495     GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
496   }
497
498  ret:
499   GNUNET_free_non_null (evict_entries);  
500   if (NULL != ops_) *ops_ = ops;
501   if (NULL != n_ops_) *n_ops_ = n_ops;
502   return rval;
503 }
504
505
506 /**
507  * Merges an array of operations into another, eliminating duplicates.  No
508  * ordering is guaranteed.
509  *
510  * @param old the array into which the merging is done.
511  * @param n_old the number of operations in old array
512  * @param new the array from which operations are to be merged
513  * @param n_new the number of operations in new array
514  */
515 static void
516 merge_ops (struct GNUNET_TESTBED_Operation ***old,
517            unsigned int *n_old,
518            struct GNUNET_TESTBED_Operation **new,
519            unsigned int n_new)
520 {
521   struct GNUNET_TESTBED_Operation **cur;
522   unsigned int i;
523   unsigned int j;
524   unsigned int n_cur;
525  
526   GNUNET_assert (NULL != old);
527   n_cur = *n_old;
528   cur = *old;
529   for (i = 0; i < n_new; i++)
530   {    
531     for (j = 0; j < *n_old; j++)
532     {
533       if (new[i] == cur[j])
534         break;
535     }
536     if (j < *n_old)
537       continue;
538     GNUNET_array_append (cur, n_cur, new[j]);
539   }
540   *old = cur;
541   *n_old = n_cur;
542 }
543            
544
545
546 /**
547  * Checks for the readiness of an operation and schedules a operation start task
548  *
549  * @param op the operation
550  */
551 static int
552 check_readiness (struct GNUNET_TESTBED_Operation *op)
553 {
554   struct GNUNET_TESTBED_Operation **evict_ops;
555   struct GNUNET_TESTBED_Operation **ops;
556   unsigned int n_ops;
557   unsigned int n_evict_ops;
558   unsigned int i;
559
560   GNUNET_assert (NULL == op->rq_entry);
561   GNUNET_assert (OP_STATE_WAITING == op->state);
562   evict_ops = NULL;
563   n_evict_ops = 0;
564   for (i = 0; i < op->nqueues; i++)
565   {
566     ops = NULL;
567     n_ops = 0;
568     if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
569                                       &ops, &n_ops))
570     {
571       GNUNET_free_non_null (evict_ops);
572       return GNUNET_NO;
573     }
574     if (NULL == ops)
575       continue;
576     merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
577     GNUNET_free (ops);    
578   }
579   if (NULL != evict_ops)
580   {
581     for (i = 0; i < n_evict_ops; i++)
582       GNUNET_TESTBED_operation_release_ (evict_ops[i]);
583     GNUNET_free (evict_ops);
584     evict_ops = NULL;
585     /* Evicting the operations should schedule this operation */
586     GNUNET_assert (OP_STATE_READY == op->state);
587     return GNUNET_YES;
588   }
589   for (i = 0; i < op->nqueues; i++)
590     op->queues[i]->active += op->nres[i];
591   change_state (op, OP_STATE_READY);
592   rq_add (op);
593   return GNUNET_YES;
594 }
595
596
597 /**
598  * Defers a ready to be executed operation back to waiting
599  *
600  * @param op the operation to defer
601  */
602 static void
603 defer (struct GNUNET_TESTBED_Operation *op)
604 {
605   unsigned int i;
606
607   GNUNET_assert (OP_STATE_READY == op->state);
608   rq_remove (op);
609   for (i = 0; i < op->nqueues; i++)
610     op->queues[i]->active--;
611   change_state (op, OP_STATE_WAITING);
612 }
613
614
615 /**
616  * Create an 'operation' to be performed.
617  *
618  * @param cls closure for the callbacks
619  * @param start function to call to start the operation
620  * @param release function to call to close down the operation
621  * @return handle to the operation
622  */
623 struct GNUNET_TESTBED_Operation *
624 GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
625                                   OperationRelease release)
626 {
627   struct GNUNET_TESTBED_Operation *op;
628
629   op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation));
630   op->start = start;
631   op->state = OP_STATE_INIT;
632   op->release = release;
633   op->cb_cls = cls;
634   return op;
635 }
636
637
638 /**
639  * Create an operation queue.
640  *
641  * @param max_active maximum number of operations in this
642  *        queue that can be active in parallel at the same time
643  * @return handle to the queue
644  */
645 struct OperationQueue *
646 GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
647 {
648   struct OperationQueue *queue;
649
650   queue = GNUNET_malloc (sizeof (struct OperationQueue));
651   queue->max_active = max_active;
652   return queue;
653 }
654
655
656 /**
657  * Destroy an operation queue.  The queue MUST be empty
658  * at this time.
659  *
660  * @param queue queue to destroy
661  */
662 void
663 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
664 {
665   GNUNET_break (GNUNET_YES == is_queue_empty (queue));
666   GNUNET_free (queue);
667 }
668
669
670 /**
671  * Destroys the operation queue if it is empty.  If not empty return GNUNET_NO.
672  *
673  * @param queue the queue to destroy if empty
674  * @return GNUNET_YES if the queue is destroyed.  GNUNET_NO if not (because it
675  *           is not empty)
676  */
677 int
678 GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
679 {
680   if (GNUNET_NO == is_queue_empty (queue))
681     return GNUNET_NO;
682   GNUNET_TESTBED_operation_queue_destroy_ (queue);
683   return GNUNET_YES;
684 }
685
686
687 /**
688  * Rechecks if any of the operations in the given operation queue's waiting list
689  * can be made active
690  *
691  * @param opq the operation queue
692  */
693 static void
694 recheck_waiting (struct OperationQueue *opq)
695 {
696   struct QueueEntry *entry;
697   struct QueueEntry *entry2;
698
699   entry = opq->wq_head;
700   while (NULL != entry)
701   {
702     entry2 = entry->next;
703     if (GNUNET_NO == check_readiness (entry->op))
704       break;
705     entry = entry2;
706   }
707 }
708
709
710 /**
711  * Function to reset the maximum number of operations in the given queue. If
712  * max_active is lesser than the number of currently active operations, the
713  * active operations are not stopped immediately.
714  *
715  * @param queue the operation queue which has to be modified
716  * @param max_active the new maximum number of active operations
717  */
718 void
719 GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
720                                                   unsigned int max_active)
721 {
722   struct QueueEntry *entry;
723
724   queue->max_active = max_active;
725   while ( (queue->active > queue->max_active)
726           && (NULL != (entry = queue->rq_head)) )
727     defer (entry->op);
728   recheck_waiting (queue);
729 }
730
731
732 /**
733  * Add an operation to a queue.  An operation can be in multiple queues at
734  * once. Once the operation is inserted into all the queues
735  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
736  * waiting for the operation to become active.
737  *
738  * @param queue queue to add the operation to
739  * @param op operation to add to the queue
740  * @param nres the number of units of the resources of queue needed by the
741  *          operation. Should be greater than 0.
742  */
743 void
744 GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
745                                          struct GNUNET_TESTBED_Operation *op,
746                                          unsigned int nres)
747 {
748   unsigned int qsize;
749
750   GNUNET_assert (0 < nres);
751   qsize = op->nqueues;
752   GNUNET_array_append (op->queues, op->nqueues, queue);
753   GNUNET_array_append (op->nres, qsize, nres);
754   GNUNET_assert (qsize == op->nqueues);
755 }
756
757
758 /**
759  * Add an operation to a queue.  An operation can be in multiple queues at
760  * once. Once the operation is inserted into all the queues
761  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
762  * waiting for the operation to become active. The operation is assumed to take
763  * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
764  * requires more than 1
765  *
766  * @param queue queue to add the operation to
767  * @param op operation to add to the queue
768  */
769 void
770 GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
771                                         struct GNUNET_TESTBED_Operation *op)
772 {
773   return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
774 }
775
776
777 /**
778  * Marks the given operation as waiting on the queues.  Once all queues permit
779  * the operation to become active, the operation will be activated.  The actual
780  * activation will occur in a separate task (thus allowing multiple queue
781  * insertions to be made without having the first one instantly trigger the
782  * operation if the first queue has sufficient resources).
783  *
784  * @param op the operation to marks as waiting
785  */
786 void
787 GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
788 {
789   GNUNET_assert (NULL == op->rq_entry);
790   change_state (op, OP_STATE_WAITING);
791   (void) check_readiness (op);
792 }
793
794
795 /**
796  * Marks an active operation as inactive - the operation will be kept in a
797  * ready-to-be-released state and continues to hold resources until another
798  * operation contents for them.
799  *
800  * @param op the operation to be marked as inactive.  The operation start
801  *          callback should have been called before for this operation to mark
802  *          it as inactive.
803  */
804 void
805 GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
806 {
807   struct OperationQueue **queues;
808   size_t ms;
809   unsigned int nqueues;
810   unsigned int i;
811
812   GNUNET_assert (OP_STATE_ACTIVE == op->state);
813   change_state (op, OP_STATE_INACTIVE);
814   nqueues = op->nqueues;
815   ms = sizeof (struct OperationQueue *) * nqueues;
816   queues = GNUNET_malloc (ms);
817   /* Cloning is needed as the operation be released by waiting operations and
818      hence its nqueues memory ptr will be freed */
819   GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
820   for (i = 0; i < nqueues; i++)
821     recheck_waiting (queues[i]);
822   GNUNET_free (queues);
823 }
824
825
826 /**
827  * Marks and inactive operation as active.  This fuction should be called to
828  * ensure that the oprelease callback will not be called until it is either
829  * marked as inactive or released.
830  *
831  * @param op the operation to be marked as active
832  */
833 void
834 GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
835 {
836
837   GNUNET_assert (OP_STATE_INACTIVE == op->state);
838   change_state (op, OP_STATE_ACTIVE);
839 }
840
841
842 /**
843  * An operation is 'done' (was cancelled or finished); remove
844  * it from the queues and release associated resources.
845  *
846  * @param op operation that finished
847  */
848 void
849 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
850 {
851   struct QueueEntry *entry;  
852   struct OperationQueue *opq;
853   unsigned int i;
854
855   if (OP_STATE_INIT == op->state)
856   {
857     GNUNET_free (op);
858     return;
859   }
860   if (OP_STATE_READY == op->state)
861     rq_remove (op);
862   if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
863     GNUNET_TESTBED_operation_activate_ (op);
864   GNUNET_assert (NULL != op->queues);
865   GNUNET_assert (NULL != op->qentries);
866   for (i = 0; i < op->nqueues; i++)
867   {
868     entry = op->qentries[i];
869     remove_queue_entry (op, i);
870     opq = op->queues[i];
871     switch (op->state)
872     {      
873     case OP_STATE_INIT:
874     case OP_STATE_INACTIVE:
875       GNUNET_assert (0);
876       break;
877     case OP_STATE_WAITING:      
878       break;
879     case OP_STATE_READY:
880     case OP_STATE_ACTIVE:
881       GNUNET_assert (0 != opq->active);
882       GNUNET_assert (opq->active >= entry->nres);
883       opq->active -= entry->nres;
884       recheck_waiting (opq);
885       break;
886     }    
887     GNUNET_free (entry);
888   }
889   GNUNET_free_non_null (op->qentries);
890   GNUNET_free (op->queues);
891   GNUNET_free (op->nres);
892   if (NULL != op->release)
893     op->release (op->cb_cls);
894   GNUNET_free (op);
895 }
896
897
898 /* end of testbed_api_operations.c */