521645b712c9508b5dac27289629f02d20e65d6c
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_api_operations.c
23  * @brief functions to manage operation queues
24  * @author Christian Grothoff
25  * @author Sree Harsha Totakura
26  */
27
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30
31
32 /**
33  * An entry in the operation queue
34  */
35 struct QueueEntry
36 {
37   /**
38    * The next DLL pointer
39    */
40   struct QueueEntry *next;
41
42   /**
43    * The prev DLL pointer
44    */
45   struct QueueEntry *prev;
46
47   /**
48    * The operation this entry holds
49    */
50   struct GNUNET_TESTBED_Operation *op;
51
52   /**
53    * How many units of resources does the operation need
54    */
55   unsigned int nres;
56 };
57
58
59 /**
60  * Queue of operations where we can only support a certain
61  * number of concurrent operations of a particular type.
62  */
63 struct OperationQueue
64 {
65   /**
66    * DLL head for the wait queue.  Operations which are waiting for this
67    * operation queue are put here
68    */
69   struct QueueEntry *wq_head;
70
71   /**
72    * DLL tail for the wait queue.
73    */
74   struct QueueEntry *wq_tail;
75
76   /**
77    * DLL head for the ready queue.  Operations which are in this operation queue
78    * and are in ready state are put here
79    */
80   struct QueueEntry *rq_head;
81
82   /**
83    * DLL tail for the ready queue
84    */
85   struct QueueEntry *rq_tail;
86
87   /**
88    * DLL head for the active queue.  Operations which are in this operation
89    * queue and are currently active are put here
90    */
91   struct QueueEntry *aq_head;
92
93   /**
94    * DLL tail for the active queue.
95    */
96   struct QueueEntry *aq_tail;
97
98   /**
99    * DLL head for the inactive queue.  Operations which are inactive and can be
100    * evicted if the queues it holds are maxed out and another operation begins
101    * to wait on them.
102    */
103   struct QueueEntry *nq_head;
104
105   /**
106    * DLL tail for the inactive queue.
107    */
108   struct QueueEntry *nq_tail;
109
110   /**
111    * Number of operations that are currently active in this queue.
112    */
113   unsigned int active;
114
115   /**
116    * Max number of operations which can be active at any time in this queue
117    */
118   unsigned int max_active;
119
120 };
121
122
123 /**
124  * Operation state
125  */
126 enum OperationState
127 {
128   /**
129    * The operation is just created and is in initial state
130    */
131   OP_STATE_INIT,
132
133   /**
134    * The operation is currently waiting for resources
135    */
136   OP_STATE_WAITING,
137
138   /**
139    * The operation is ready to be started
140    */
141   OP_STATE_READY,
142
143   /**
144    * The operation has started and is active
145    */
146   OP_STATE_ACTIVE,
147
148   /**
149    * The operation is inactive.  It still holds resources on the operation
150    * queues.  However, this operation will be evicted when another operation
151    * requires resources from the maxed out queues this operation is holding
152    * resources from.
153    */
154   OP_STATE_INACTIVE
155 };
156
157
158 /**
159  * An entry in the ready queue (implemented as DLL)
160  */
161 struct ReadyQueueEntry
162 {
163   /**
164    * next ptr for DLL
165    */
166   struct ReadyQueueEntry *next;
167   
168   /**
169    * prev ptr for DLL
170    */
171   struct ReadyQueueEntry *prev;
172
173   /**
174    * The operation associated with this entry
175    */
176   struct GNUNET_TESTBED_Operation *op;
177 };
178
179
180 /**
181  * Opaque handle to an abstract operation to be executed by the testing framework.
182  */
183 struct GNUNET_TESTBED_Operation
184 {
185   /**
186    * Function to call when we have the resources to begin the operation.
187    */
188   OperationStart start;
189
190   /**
191    * Function to call to clean up after the operation (which may or may
192    * not have been started yet).
193    */
194   OperationRelease release;
195
196   /**
197    * Closure for callbacks.
198    */
199   void *cb_cls;
200
201   /**
202    * Array of operation queues this Operation belongs to.
203    */
204   struct OperationQueue **queues;
205
206   /**
207    * Array of operation queue entries corresponding to this operation in
208    * operation queues for this operation
209    */
210   struct QueueEntry **qentries;
211
212   /**
213    * Array of number of resources an operation need from each queue. The numbers
214    * in this array should correspond to the queues array
215    */
216   unsigned int *nres;
217
218   /**
219    * Entry corresponding to this operation in ready queue.  Will be NULL if the
220    * operation is not marked as READY
221    */
222   struct ReadyQueueEntry *rq_entry;
223
224   /**
225    * Number of queues in the operation queues array
226    */
227   unsigned int nqueues;
228
229   /**
230    * The state of the operation
231    */
232   enum OperationState state;
233
234 };
235
236 /**
237  * DLL head for the ready queue
238  */
239 struct ReadyQueueEntry *rq_head;
240
241 /**
242  * DLL tail for the ready queue
243  */
244 struct ReadyQueueEntry *rq_tail;
245
246 /**
247  * The id of the task to process the ready queue
248  */
249 GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
250
251
252 /**
253  * Removes a queue entry of an operation from one of the operation queues' lists
254  * depending on the state of the operation
255  *
256  * @param op the operation whose entry has to be removed
257  * @param index the index of the entry in the operation's array of queue entries
258  */
259 static void
260 remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
261 {
262   struct OperationQueue *opq;
263   struct QueueEntry *entry;
264   
265   opq = op->queues[index];
266   entry = op->qentries[index];
267   switch (op->state)
268   {
269   case OP_STATE_INIT:
270     GNUNET_assert (0);
271     break;
272   case OP_STATE_WAITING:
273     GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
274     break;
275   case OP_STATE_READY:
276     GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
277     break;
278   case OP_STATE_ACTIVE:
279     GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
280     break;
281   case OP_STATE_INACTIVE:
282     GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
283     break;
284   }
285 }
286
287
288 /**
289  * Changes the state of the operation while moving its associated queue entries
290  * in the operation's operation queues
291  *
292  * @param op the operation whose state has to be changed
293  * @param state the state the operation should have.  It cannot be OP_STATE_INIT
294  */
295 static void
296 change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
297 {
298   struct QueueEntry *entry;
299   struct OperationQueue *opq;
300   unsigned int cnt;
301   unsigned int s;
302   
303   GNUNET_assert (OP_STATE_INIT != state);
304   GNUNET_assert (NULL != op->queues);
305   GNUNET_assert (NULL != op->nres);
306   GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
307   GNUNET_assert (op->state != state);
308   for (cnt = 0; cnt < op->nqueues; cnt++)
309   {
310     if (OP_STATE_INIT == op->state)
311     {
312       entry = GNUNET_malloc (sizeof (struct QueueEntry));
313       entry->op = op;
314       entry->nres = op->nres[cnt];
315       s = cnt;
316       GNUNET_array_append (op->qentries, s, entry);      
317     }
318     else
319     {
320       entry = op->qentries[cnt];
321       remove_queue_entry (op, cnt);
322     }
323     opq = op->queues[cnt];
324     switch (state)
325     {
326     case OP_STATE_INIT:
327       GNUNET_assert (0);
328       break;
329     case OP_STATE_WAITING:
330       GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
331       break;
332     case OP_STATE_READY:
333       GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
334       break;
335     case OP_STATE_ACTIVE:
336       GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
337       break;
338     case OP_STATE_INACTIVE:
339       GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
340       break;
341     }
342   }
343   op->state = state;
344 }
345
346
347 /**
348  * Removes an operation from the ready queue.  Also stops the 'process_rq_task'
349  * if the given operation is the last one in the queue.
350  *
351  * @param op the operation to be removed
352  */
353 static void
354 rq_remove (struct GNUNET_TESTBED_Operation *op)
355 {  
356   GNUNET_assert (NULL != op->rq_entry);
357   GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
358   GNUNET_free (op->rq_entry);
359   op->rq_entry = NULL;
360   if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) )
361   {
362     GNUNET_SCHEDULER_cancel (process_rq_task_id);
363     process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
364   }
365 }
366
367
368 /**
369  * Processes the ready queue by calling the operation start callback of the
370  * operation at the head.  The operation is then removed from the queue.  The
371  * task is scheduled to run again immediately until no more operations are in
372  * the ready queue.
373  *
374  * @param cls NULL
375  * @param tc scheduler task context.  Not used.
376  */
377 static void
378 process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
379 {
380   struct GNUNET_TESTBED_Operation *op;
381
382   process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
383   GNUNET_assert (NULL != rq_head);
384   GNUNET_assert (NULL != (op = rq_head->op));
385   rq_remove (op);
386   if (NULL != rq_head)
387     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
388   change_state (op, OP_STATE_ACTIVE);
389   if (NULL != op->start)
390     op->start (op->cb_cls);  
391 }
392
393
394 /**
395  * Adds the operation to the ready queue and starts the 'process_rq_task'
396  *
397  * @param op the operation to be queued
398  */
399 static void
400 rq_add (struct GNUNET_TESTBED_Operation *op)
401 {
402   struct ReadyQueueEntry *rq_entry;
403
404   GNUNET_assert (NULL == op->rq_entry);
405   rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry));
406   rq_entry->op = op;
407   GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
408   op->rq_entry = rq_entry;
409   if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id)
410     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
411 }
412
413
414 /**
415  * Checks if the given operation queue is empty or not
416  *
417  * @param opq the operation queue
418  * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
419  *           otherwise
420  */
421 static int
422 is_queue_empty (struct OperationQueue *opq)
423 {
424   if ( (NULL != opq->wq_head)
425        || (NULL != opq->rq_head)
426        || (NULL != opq->aq_head)
427        || (NULL != opq->nq_head) )
428     return GNUNET_NO;
429   return GNUNET_YES;
430 }
431
432
433 /**
434  * Checks if the given operation queue has enough resources to provide for the
435  * operation of the given queue entry.  It also checks if any inactive
436  * operations are to be released in order to accommodate the needed resources
437  * and returns them as an array.
438  *
439  * @param opq the operation queue to check for resource accommodation
440  * @param entry the operation queue entry whose operation's resources are to be
441  *          accommodated
442  * @param ops_ pointer to return the array of operations which are to be released
443  *          in order to accommodate the new operation.  Can be NULL
444  * @param n_ops_ the number of operations in ops_
445  * @return GNUNET_YES if the given entry's operation can be accommodated in this
446  *           queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
447  *           be set to NULL and 0 respectively.
448  */
449 static int
450 decide_capacity (struct OperationQueue *opq,
451                  struct QueueEntry *entry,
452                  struct GNUNET_TESTBED_Operation ***ops_,
453                  unsigned int *n_ops_)
454 {
455   struct QueueEntry **evict_entries;
456   struct GNUNET_TESTBED_Operation **ops;
457   struct GNUNET_TESTBED_Operation *op;
458   unsigned int n_ops;
459   unsigned int n_evict_entries;
460   unsigned int need;
461   int deficit;
462   int rval;
463
464   GNUNET_assert (NULL != (op = entry->op));
465   GNUNET_assert (0 < (need = entry->nres));
466   ops = NULL;
467   n_ops = 0;
468   evict_entries = NULL;
469   n_evict_entries = 0;
470   rval = GNUNET_YES;
471   if (opq->active > opq->max_active)
472   {
473     rval = GNUNET_NO;
474     goto ret;
475   }
476   if ((opq->active + need) <= opq->max_active)
477     goto ret;
478   deficit = need - (opq->max_active - opq->active);
479   for (entry = opq->nq_head;
480        (0 < deficit) && (NULL != entry);
481        entry = entry->next)
482   {
483     GNUNET_array_append (evict_entries, n_evict_entries, entry);
484     deficit -= entry->nres;
485   }
486   if (0 < deficit)
487   {
488     rval = GNUNET_NO;
489     goto ret;
490   }
491   for (n_ops = 0; n_ops < n_evict_entries;)
492   {
493     op = evict_entries[n_ops]->op;
494     GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
495   }
496
497  ret:
498   GNUNET_free_non_null (evict_entries);  
499   if (NULL != ops_)
500     *ops_ = ops;
501   else
502     GNUNET_free (ops);
503   if (NULL != n_ops_)
504     *n_ops_ = n_ops;
505   return rval;
506 }
507
508
509 /**
510  * Merges an array of operations into another, eliminating duplicates.  No
511  * ordering is guaranteed.
512  *
513  * @param old the array into which the merging is done.
514  * @param n_old the number of operations in old array
515  * @param new the array from which operations are to be merged
516  * @param n_new the number of operations in new array
517  */
518 static void
519 merge_ops (struct GNUNET_TESTBED_Operation ***old,
520            unsigned int *n_old,
521            struct GNUNET_TESTBED_Operation **new,
522            unsigned int n_new)
523 {
524   struct GNUNET_TESTBED_Operation **cur;
525   unsigned int i;
526   unsigned int j;
527   unsigned int n_cur;
528  
529   GNUNET_assert (NULL != old);
530   n_cur = *n_old;
531   cur = *old;
532   for (i = 0; i < n_new; i++)
533   {    
534     for (j = 0; j < *n_old; j++)
535     {
536       if (new[i] == cur[j])
537         break;
538     }
539     if (j < *n_old)
540       continue;
541     GNUNET_array_append (cur, n_cur, new[j]);
542   }
543   *old = cur;
544   *n_old = n_cur;
545 }
546            
547
548
549 /**
550  * Checks for the readiness of an operation and schedules a operation start task
551  *
552  * @param op the operation
553  */
554 static int
555 check_readiness (struct GNUNET_TESTBED_Operation *op)
556 {
557   struct GNUNET_TESTBED_Operation **evict_ops;
558   struct GNUNET_TESTBED_Operation **ops;
559   unsigned int n_ops;
560   unsigned int n_evict_ops;
561   unsigned int i;
562
563   GNUNET_assert (NULL == op->rq_entry);
564   GNUNET_assert (OP_STATE_WAITING == op->state);
565   evict_ops = NULL;
566   n_evict_ops = 0;
567   for (i = 0; i < op->nqueues; i++)
568   {
569     ops = NULL;
570     n_ops = 0;
571     if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
572                                       &ops, &n_ops))
573     {
574       GNUNET_free_non_null (evict_ops);
575       return GNUNET_NO;
576     }
577     if (NULL == ops)
578       continue;
579     merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
580     GNUNET_free (ops);    
581   }
582   if (NULL != evict_ops)
583   {
584     for (i = 0; i < n_evict_ops; i++)
585       GNUNET_TESTBED_operation_release_ (evict_ops[i]);
586     GNUNET_free (evict_ops);
587     evict_ops = NULL;
588     /* Evicting the operations should schedule this operation */
589     GNUNET_assert (OP_STATE_READY == op->state);
590     return GNUNET_YES;
591   }
592   for (i = 0; i < op->nqueues; i++)
593     op->queues[i]->active += op->nres[i];
594   change_state (op, OP_STATE_READY);
595   rq_add (op);
596   return GNUNET_YES;
597 }
598
599
600 /**
601  * Defers a ready to be executed operation back to waiting
602  *
603  * @param op the operation to defer
604  */
605 static void
606 defer (struct GNUNET_TESTBED_Operation *op)
607 {
608   unsigned int i;
609
610   GNUNET_assert (OP_STATE_READY == op->state);
611   rq_remove (op);
612   for (i = 0; i < op->nqueues; i++)
613   {
614     GNUNET_assert (op->queues[i]->active >= op->nres[i]);
615     op->queues[i]->active -= op->nres[i];    
616   }
617   change_state (op, OP_STATE_WAITING);
618 }
619
620
621 /**
622  * Create an 'operation' to be performed.
623  *
624  * @param cls closure for the callbacks
625  * @param start function to call to start the operation
626  * @param release function to call to close down the operation
627  * @return handle to the operation
628  */
629 struct GNUNET_TESTBED_Operation *
630 GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
631                                   OperationRelease release)
632 {
633   struct GNUNET_TESTBED_Operation *op;
634
635   op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation));
636   op->start = start;
637   op->state = OP_STATE_INIT;
638   op->release = release;
639   op->cb_cls = cls;
640   return op;
641 }
642
643
644 /**
645  * Create an operation queue.
646  *
647  * @param max_active maximum number of operations in this
648  *        queue that can be active in parallel at the same time
649  * @return handle to the queue
650  */
651 struct OperationQueue *
652 GNUNET_TESTBED_operation_queue_create_ (unsigned int max_active)
653 {
654   struct OperationQueue *queue;
655
656   queue = GNUNET_malloc (sizeof (struct OperationQueue));
657   queue->max_active = max_active;
658   return queue;
659 }
660
661
662 /**
663  * Destroy an operation queue.  The queue MUST be empty
664  * at this time.
665  *
666  * @param queue queue to destroy
667  */
668 void
669 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
670 {
671   GNUNET_break (GNUNET_YES == is_queue_empty (queue));
672   GNUNET_free (queue);
673 }
674
675
676 /**
677  * Destroys the operation queue if it is empty.  If not empty return GNUNET_NO.
678  *
679  * @param queue the queue to destroy if empty
680  * @return GNUNET_YES if the queue is destroyed.  GNUNET_NO if not (because it
681  *           is not empty)
682  */
683 int
684 GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
685 {
686   if (GNUNET_NO == is_queue_empty (queue))
687     return GNUNET_NO;
688   GNUNET_TESTBED_operation_queue_destroy_ (queue);
689   return GNUNET_YES;
690 }
691
692
693 /**
694  * Rechecks if any of the operations in the given operation queue's waiting list
695  * can be made active
696  *
697  * @param opq the operation queue
698  */
699 static void
700 recheck_waiting (struct OperationQueue *opq)
701 {
702   struct QueueEntry *entry;
703   struct QueueEntry *entry2;
704
705   entry = opq->wq_head;
706   while (NULL != entry)
707   {
708     entry2 = entry->next;
709     if (GNUNET_NO == check_readiness (entry->op))
710       break;
711     entry = entry2;
712   }
713 }
714
715
716 /**
717  * Function to reset the maximum number of operations in the given queue. If
718  * max_active is lesser than the number of currently active operations, the
719  * active operations are not stopped immediately.
720  *
721  * @param queue the operation queue which has to be modified
722  * @param max_active the new maximum number of active operations
723  */
724 void
725 GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
726                                                   unsigned int max_active)
727 {
728   struct QueueEntry *entry;
729
730   queue->max_active = max_active;
731   while ( (queue->active > queue->max_active)
732           && (NULL != (entry = queue->rq_head)) )
733     defer (entry->op);
734   recheck_waiting (queue);
735 }
736
737
738 /**
739  * Add an operation to a queue.  An operation can be in multiple queues at
740  * once. Once the operation is inserted into all the queues
741  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
742  * waiting for the operation to become active.
743  *
744  * @param queue queue to add the operation to
745  * @param op operation to add to the queue
746  * @param nres the number of units of the resources of queue needed by the
747  *          operation. Should be greater than 0.
748  */
749 void
750 GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
751                                          struct GNUNET_TESTBED_Operation *op,
752                                          unsigned int nres)
753 {
754   unsigned int qsize;
755
756   GNUNET_assert (0 < nres);
757   qsize = op->nqueues;
758   GNUNET_array_append (op->queues, op->nqueues, queue);
759   GNUNET_array_append (op->nres, qsize, nres);
760   GNUNET_assert (qsize == op->nqueues);
761 }
762
763
764 /**
765  * Add an operation to a queue.  An operation can be in multiple queues at
766  * once. Once the operation is inserted into all the queues
767  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
768  * waiting for the operation to become active. The operation is assumed to take
769  * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
770  * requires more than 1
771  *
772  * @param queue queue to add the operation to
773  * @param op operation to add to the queue
774  */
775 void
776 GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
777                                         struct GNUNET_TESTBED_Operation *op)
778 {
779   return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
780 }
781
782
783 /**
784  * Marks the given operation as waiting on the queues.  Once all queues permit
785  * the operation to become active, the operation will be activated.  The actual
786  * activation will occur in a separate task (thus allowing multiple queue
787  * insertions to be made without having the first one instantly trigger the
788  * operation if the first queue has sufficient resources).
789  *
790  * @param op the operation to marks as waiting
791  */
792 void
793 GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
794 {
795   GNUNET_assert (NULL == op->rq_entry);
796   change_state (op, OP_STATE_WAITING);
797   (void) check_readiness (op);
798 }
799
800
801 /**
802  * Marks an active operation as inactive - the operation will be kept in a
803  * ready-to-be-released state and continues to hold resources until another
804  * operation contents for them.
805  *
806  * @param op the operation to be marked as inactive.  The operation start
807  *          callback should have been called before for this operation to mark
808  *          it as inactive.
809  */
810 void
811 GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
812 {
813   struct OperationQueue **queues;
814   size_t ms;
815   unsigned int nqueues;
816   unsigned int i;
817
818   GNUNET_assert (OP_STATE_ACTIVE == op->state);
819   change_state (op, OP_STATE_INACTIVE);
820   nqueues = op->nqueues;
821   ms = sizeof (struct OperationQueue *) * nqueues;
822   queues = GNUNET_malloc (ms);
823   /* Cloning is needed as the operation be released by waiting operations and
824      hence its nqueues memory ptr will be freed */
825   GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
826   for (i = 0; i < nqueues; i++)
827     recheck_waiting (queues[i]);
828   GNUNET_free (queues);
829 }
830
831
832 /**
833  * Marks and inactive operation as active.  This fuction should be called to
834  * ensure that the oprelease callback will not be called until it is either
835  * marked as inactive or released.
836  *
837  * @param op the operation to be marked as active
838  */
839 void
840 GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
841 {
842
843   GNUNET_assert (OP_STATE_INACTIVE == op->state);
844   change_state (op, OP_STATE_ACTIVE);
845 }
846
847
848 /**
849  * An operation is 'done' (was cancelled or finished); remove
850  * it from the queues and release associated resources.
851  *
852  * @param op operation that finished
853  */
854 void
855 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
856 {
857   struct QueueEntry *entry;  
858   struct OperationQueue *opq;
859   unsigned int i;
860
861   if (OP_STATE_INIT == op->state)
862   {
863     GNUNET_free (op);
864     return;
865   }
866   if (OP_STATE_READY == op->state)
867     rq_remove (op);
868   if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
869     GNUNET_TESTBED_operation_activate_ (op);
870   GNUNET_assert (NULL != op->queues);
871   GNUNET_assert (NULL != op->qentries);
872   for (i = 0; i < op->nqueues; i++)
873   {
874     entry = op->qentries[i];
875     remove_queue_entry (op, i);
876     opq = op->queues[i];
877     switch (op->state)
878     {      
879     case OP_STATE_INIT:
880     case OP_STATE_INACTIVE:
881       GNUNET_assert (0);
882       break;
883     case OP_STATE_WAITING:      
884       break;
885     case OP_STATE_READY:
886     case OP_STATE_ACTIVE:
887       GNUNET_assert (0 != opq->active);
888       GNUNET_assert (opq->active >= entry->nres);
889       opq->active -= entry->nres;
890       recheck_waiting (opq);
891       break;
892     }    
893     GNUNET_free (entry);
894   }
895   GNUNET_free_non_null (op->qentries);
896   GNUNET_free (op->queues);
897   GNUNET_free (op->nres);
898   if (NULL != op->release)
899     op->release (op->cb_cls);
900   GNUNET_free (op);
901 }
902
903
904 /* end of testbed_api_operations.c */