2a559a90e4a8c68c04c44e9bfc2c1a68e33d5631
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
1 /*
2       This file is part of GNUnet
3       (C) 2008--2013 Christian Grothoff (and other contributing authors)
4
5       GNUnet is free software; you can redistribute it and/or modify
6       it under the terms of the GNU General Public License as published
7       by the Free Software Foundation; either version 3, or (at your
8       option) any later version.
9
10       GNUnet is distributed in the hope that it will be useful, but
11       WITHOUT ANY WARRANTY; without even the implied warranty of
12       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13       General Public License for more details.
14
15       You should have received a copy of the GNU General Public License
16       along with GNUnet; see the file COPYING.  If not, write to the
17       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18       Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file testbed/testbed_api_operations.c
23  * @brief functions to manage operation queues
24  * @author Christian Grothoff
25  * @author Sree Harsha Totakura
26  */
27
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30 #include "testbed_api_sd.h"
31
32
33 /**
34  * An entry in the operation queue
35  */
36 struct QueueEntry
37 {
38   /**
39    * The next DLL pointer
40    */
41   struct QueueEntry *next;
42
43   /**
44    * The prev DLL pointer
45    */
46   struct QueueEntry *prev;
47
48   /**
49    * The operation this entry holds
50    */
51   struct GNUNET_TESTBED_Operation *op;
52
53   /**
54    * How many units of resources does the operation need
55    */
56   unsigned int nres;
57 };
58
59
60 /**
61  * Queue of operations where we can only support a certain
62  * number of concurrent operations of a particular type.
63  */
64 struct OperationQueue;
65
66
67 /**
68  * A slot to record time taken by an operation
69  */
70 struct TimeSlot
71 {
72   /**
73    * DLL next pointer
74    */
75   struct TimeSlot *next;
76
77   /**
78    * DLL prev pointer
79    */
80   struct TimeSlot *prev;
81
82   /**
83    * This operation queue to which this time slot belongs to
84    */
85   struct OperationQueue *queue;
86
87   /**
88    * The operation to which this timeslot is currently allocated to
89    */
90   struct GNUNET_TESTBED_Operation *op;
91
92   /**
93    * Accumulated time
94    */
95   struct GNUNET_TIME_Relative tsum;
96
97   /**
98    * Number of timing values accumulated
99    */
100   unsigned int nvals;
101 };
102
103
104 /**
105  * Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE
106  */
107 struct FeedbackCtx
108 {
109   /**
110    * Handle for calculating standard deviation
111    */
112   struct SDHandle *sd;
113
114   /**
115    * Head for DLL of time slots which are free to be allocated to operations
116    */
117   struct TimeSlot *alloc_head;
118
119   /**
120    * Tail for DLL of time slots which are free to be allocated to operations
121    */
122   struct TimeSlot *alloc_tail;
123
124   /**
125    * Pointer to the chunk of time slots.  Free all time slots at a time using
126    * this pointer.
127    */
128   struct TimeSlot *tslots_freeptr;
129
130   /**
131    * Number of time slots filled so far
132    */
133   unsigned int tslots_filled;
134
135   /**
136    * Bound on the maximum number of operations which can be active
137    */
138   unsigned int max_active_bound;
139
140   /**
141    * Number of operations that have failed
142    */
143   unsigned int nfailed;
144
145 };
146
147
148 /**
149  * Queue of operations where we can only support a certain
150  * number of concurrent operations of a particular type.
151  */
152 struct OperationQueue
153 {
154   /**
155    * DLL head for the wait queue.  Operations which are waiting for this
156    * operation queue are put here
157    */
158   struct QueueEntry *wq_head;
159
160   /**
161    * DLL tail for the wait queue.
162    */
163   struct QueueEntry *wq_tail;
164
165   /**
166    * DLL head for the ready queue.  Operations which are in this operation queue
167    * and are in ready state are put here
168    */
169   struct QueueEntry *rq_head;
170
171   /**
172    * DLL tail for the ready queue
173    */
174   struct QueueEntry *rq_tail;
175
176   /**
177    * DLL head for the active queue.  Operations which are in this operation
178    * queue and are currently active are put here
179    */
180   struct QueueEntry *aq_head;
181
182   /**
183    * DLL tail for the active queue.
184    */
185   struct QueueEntry *aq_tail;
186
187   /**
188    * DLL head for the inactive queue.  Operations which are inactive and can be
189    * evicted if the queues it holds are maxed out and another operation begins
190    * to wait on them.
191    */
192   struct QueueEntry *nq_head;
193
194   /**
195    * DLL tail for the inactive queue.
196    */
197   struct QueueEntry *nq_tail;
198
199   /**
200    * Feedback context; only relevant for adaptive operation queues.  NULL for
201    * fixed operation queues
202    */
203   struct FeedbackCtx *fctx;
204
205   /**
206    * The type of this opeartion queue
207    */
208   enum OperationQueueType type;
209
210   /**
211    * Number of operations that are currently active in this queue.
212    */
213   unsigned int active;
214
215   /**
216    * Max number of operations which can be active at any time in this queue.
217    * This value can be changed either by calling
218    * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
219    * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
220    */
221   unsigned int max_active;
222
223 };
224
225
226 /**
227  * Operation state
228  */
229 enum OperationState
230 {
231   /**
232    * The operation is just created and is in initial state
233    */
234   OP_STATE_INIT,
235
236   /**
237    * The operation is currently waiting for resources
238    */
239   OP_STATE_WAITING,
240
241   /**
242    * The operation is ready to be started
243    */
244   OP_STATE_READY,
245
246   /**
247    * The operation has started and is active
248    */
249   OP_STATE_ACTIVE,
250
251   /**
252    * The operation is inactive.  It still holds resources on the operation
253    * queues.  However, this operation will be evicted when another operation
254    * requires resources from the maxed out queues this operation is holding
255    * resources from.
256    */
257   OP_STATE_INACTIVE
258 };
259
260
261 /**
262  * An entry in the ready queue (implemented as DLL)
263  */
264 struct ReadyQueueEntry
265 {
266   /**
267    * next ptr for DLL
268    */
269   struct ReadyQueueEntry *next;
270
271   /**
272    * prev ptr for DLL
273    */
274   struct ReadyQueueEntry *prev;
275
276   /**
277    * The operation associated with this entry
278    */
279   struct GNUNET_TESTBED_Operation *op;
280 };
281
282
283 /**
284  * Opaque handle to an abstract operation to be executed by the testing framework.
285  */
286 struct GNUNET_TESTBED_Operation
287 {
288   /**
289    * Function to call when we have the resources to begin the operation.
290    */
291   OperationStart start;
292
293   /**
294    * Function to call to clean up after the operation (which may or may
295    * not have been started yet).
296    */
297   OperationRelease release;
298
299   /**
300    * Closure for callbacks.
301    */
302   void *cb_cls;
303
304   /**
305    * Array of operation queues this Operation belongs to.
306    */
307   struct OperationQueue **queues;
308
309   /**
310    * Array of operation queue entries corresponding to this operation in
311    * operation queues for this operation
312    */
313   struct QueueEntry **qentries;
314
315   /**
316    * Array of number of resources an operation need from each queue. The numbers
317    * in this array should correspond to the queues array
318    */
319   unsigned int *nres;
320
321   /**
322    * Entry corresponding to this operation in ready queue.  Will be NULL if the
323    * operation is not marked as READY
324    */
325   struct ReadyQueueEntry *rq_entry;
326
327   /**
328    * Head pointer for DLL of tslots allocated to this operation
329    */
330   struct TimeSlot *tslots_head;
331
332   /**
333    * Tail pointer for DLL of tslots allocated to this operation
334    */
335   struct TimeSlot *tslots_tail;
336
337   /**
338    * The time at which the operation is started
339    */
340   struct GNUNET_TIME_Absolute tstart;
341
342   /**
343    * Number of queues in the operation queues array
344    */
345   unsigned int nqueues;
346
347   /**
348    * The state of the operation
349    */
350   enum OperationState state;
351
352   /**
353    * Is this a failed operation?
354    */
355   int failed;
356
357 };
358
359 /**
360  * DLL head for the ready queue
361  */
362 struct ReadyQueueEntry *rq_head;
363
364 /**
365  * DLL tail for the ready queue
366  */
367 struct ReadyQueueEntry *rq_tail;
368
369 /**
370  * The id of the task to process the ready queue
371  */
372 GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
373
374
375 /**
376  * Assigns the given operation a time slot from the given operation queue
377  *
378  * @param op the operation
379  * @param queue the operation queue
380  * @return the timeslot
381  */
382 static void
383 assign_timeslot (struct GNUNET_TESTBED_Operation *op,
384                  struct OperationQueue *queue)
385 {
386   struct FeedbackCtx *fctx = queue->fctx;
387   struct TimeSlot *tslot;
388
389   GNUNET_assert (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type);
390   tslot = fctx->alloc_head;
391   GNUNET_assert (NULL != tslot);
392   GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
393   GNUNET_CONTAINER_DLL_insert_tail (op->tslots_head, op->tslots_tail, tslot);
394   tslot->op = op;
395 }
396
397
398 /**
399  * Removes a queue entry of an operation from one of the operation queues' lists
400  * depending on the state of the operation
401  *
402  * @param op the operation whose entry has to be removed
403  * @param index the index of the entry in the operation's array of queue entries
404  */
405 static void
406 remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
407 {
408   struct OperationQueue *opq;
409   struct QueueEntry *entry;
410
411   opq = op->queues[index];
412   entry = op->qentries[index];
413   switch (op->state)
414   {
415   case OP_STATE_INIT:
416     GNUNET_assert (0);
417     break;
418   case OP_STATE_WAITING:
419     GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
420     break;
421   case OP_STATE_READY:
422     GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
423     break;
424   case OP_STATE_ACTIVE:
425     GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
426     break;
427   case OP_STATE_INACTIVE:
428     GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
429     break;
430   }
431 }
432
433
434 /**
435  * Changes the state of the operation while moving its associated queue entries
436  * in the operation's operation queues
437  *
438  * @param op the operation whose state has to be changed
439  * @param state the state the operation should have.  It cannot be OP_STATE_INIT
440  */
441 static void
442 change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
443 {
444   struct QueueEntry *entry;
445   struct OperationQueue *opq;
446   unsigned int cnt;
447   unsigned int s;
448
449   GNUNET_assert (OP_STATE_INIT != state);
450   GNUNET_assert (NULL != op->queues);
451   GNUNET_assert (NULL != op->nres);
452   GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
453   GNUNET_assert (op->state != state);
454   for (cnt = 0; cnt < op->nqueues; cnt++)
455   {
456     if (OP_STATE_INIT == op->state)
457     {
458       entry = GNUNET_new (struct QueueEntry);
459       entry->op = op;
460       entry->nres = op->nres[cnt];
461       s = cnt;
462       GNUNET_array_append (op->qentries, s, entry);
463     }
464     else
465     {
466       entry = op->qentries[cnt];
467       remove_queue_entry (op, cnt);
468     }
469     opq = op->queues[cnt];
470     switch (state)
471     {
472     case OP_STATE_INIT:
473       GNUNET_assert (0);
474       break;
475     case OP_STATE_WAITING:
476       GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry);
477       break;
478     case OP_STATE_READY:
479       GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry);
480       break;
481     case OP_STATE_ACTIVE:
482       GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry);
483       break;
484     case OP_STATE_INACTIVE:
485       GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry);
486       break;
487     }
488   }
489   op->state = state;
490 }
491
492
493 /**
494  * Removes an operation from the ready queue.  Also stops the 'process_rq_task'
495  * if the given operation is the last one in the queue.
496  *
497  * @param op the operation to be removed
498  */
499 static void
500 rq_remove (struct GNUNET_TESTBED_Operation *op)
501 {
502   GNUNET_assert (NULL != op->rq_entry);
503   GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
504   GNUNET_free (op->rq_entry);
505   op->rq_entry = NULL;
506   if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) )
507   {
508     GNUNET_SCHEDULER_cancel (process_rq_task_id);
509     process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
510   }
511 }
512
513
514 /**
515  * Processes the ready queue by calling the operation start callback of the
516  * operation at the head.  The operation is then removed from the queue.  The
517  * task is scheduled to run again immediately until no more operations are in
518  * the ready queue.
519  *
520  * @param cls NULL
521  * @param tc scheduler task context.  Not used.
522  */
523 static void
524 process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
525 {
526   struct GNUNET_TESTBED_Operation *op;
527   struct OperationQueue *queue;
528   unsigned int cnt;
529
530   process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
531   GNUNET_assert (NULL != rq_head);
532   GNUNET_assert (NULL != (op = rq_head->op));
533   rq_remove (op);
534   if (NULL != rq_head)
535     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
536   change_state (op, OP_STATE_ACTIVE);
537   for (cnt = 0; cnt < op->nqueues; cnt++)
538   {
539     queue = op->queues[cnt];
540     if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
541       assign_timeslot (op, queue);
542   }
543   op->tstart = GNUNET_TIME_absolute_get ();
544   if (NULL != op->start)
545     op->start (op->cb_cls);
546 }
547
548
549 /**
550  * Adds the operation to the ready queue and starts the 'process_rq_task'
551  *
552  * @param op the operation to be queued
553  */
554 static void
555 rq_add (struct GNUNET_TESTBED_Operation *op)
556 {
557   struct ReadyQueueEntry *rq_entry;
558
559   GNUNET_assert (NULL == op->rq_entry);
560   rq_entry = GNUNET_new (struct ReadyQueueEntry);
561   rq_entry->op = op;
562   GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
563   op->rq_entry = rq_entry;
564   if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id)
565     process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
566 }
567
568
569 /**
570  * Checks if the given operation queue is empty or not
571  *
572  * @param opq the operation queue
573  * @return GNUNET_YES if the given operation queue has no operations; GNUNET_NO
574  *           otherwise
575  */
576 static int
577 is_queue_empty (struct OperationQueue *opq)
578 {
579   if ( (NULL != opq->wq_head)
580        || (NULL != opq->rq_head)
581        || (NULL != opq->aq_head)
582        || (NULL != opq->nq_head) )
583     return GNUNET_NO;
584   return GNUNET_YES;
585 }
586
587
588 /**
589  * Checks if the given operation queue has enough resources to provide for the
590  * operation of the given queue entry.  It also checks if any inactive
591  * operations are to be released in order to accommodate the needed resources
592  * and returns them as an array.
593  *
594  * @param opq the operation queue to check for resource accommodation
595  * @param entry the operation queue entry whose operation's resources are to be
596  *          accommodated
597  * @param ops_ pointer to return the array of operations which are to be released
598  *          in order to accommodate the new operation.  Can be NULL
599  * @param n_ops_ the number of operations in ops_
600  * @return GNUNET_YES if the given entry's operation can be accommodated in this
601  *           queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
602  *           be set to NULL and 0 respectively.
603  */
604 static int
605 decide_capacity (struct OperationQueue *opq,
606                  struct QueueEntry *entry,
607                  struct GNUNET_TESTBED_Operation ***ops_,
608                  unsigned int *n_ops_)
609 {
610   struct QueueEntry **evict_entries;
611   struct GNUNET_TESTBED_Operation **ops;
612   struct GNUNET_TESTBED_Operation *op;
613   unsigned int n_ops;
614   unsigned int n_evict_entries;
615   unsigned int need;
616   int deficit;
617   int rval;
618
619   GNUNET_assert (NULL != (op = entry->op));
620   GNUNET_assert (0 < (need = entry->nres));
621   ops = NULL;
622   n_ops = 0;
623   evict_entries = NULL;
624   n_evict_entries = 0;
625   rval = GNUNET_YES;
626   if (opq->active > opq->max_active)
627   {
628     rval = GNUNET_NO;
629     goto ret;
630   }
631   if ((opq->active + need) <= opq->max_active)
632     goto ret;
633   deficit = need - (opq->max_active - opq->active);
634   for (entry = opq->nq_head;
635        (0 < deficit) && (NULL != entry);
636        entry = entry->next)
637   {
638     GNUNET_array_append (evict_entries, n_evict_entries, entry);
639     deficit -= entry->nres;
640   }
641   if (0 < deficit)
642   {
643     rval = GNUNET_NO;
644     goto ret;
645   }
646   for (n_ops = 0; n_ops < n_evict_entries;)
647   {
648     op = evict_entries[n_ops]->op;
649     GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
650   }
651
652  ret:
653   GNUNET_free_non_null (evict_entries);
654   if (NULL != ops_)
655     *ops_ = ops;
656   else
657     GNUNET_free (ops);
658   if (NULL != n_ops_)
659     *n_ops_ = n_ops;
660   return rval;
661 }
662
663
664 /**
665  * Merges an array of operations into another, eliminating duplicates.  No
666  * ordering is guaranteed.
667  *
668  * @param old the array into which the merging is done.
669  * @param n_old the number of operations in old array
670  * @param new the array from which operations are to be merged
671  * @param n_new the number of operations in new array
672  */
673 static void
674 merge_ops (struct GNUNET_TESTBED_Operation ***old,
675            unsigned int *n_old,
676            struct GNUNET_TESTBED_Operation **new,
677            unsigned int n_new)
678 {
679   struct GNUNET_TESTBED_Operation **cur;
680   unsigned int i;
681   unsigned int j;
682   unsigned int n_cur;
683
684   GNUNET_assert (NULL != old);
685   n_cur = *n_old;
686   cur = *old;
687   for (i = 0; i < n_new; i++)
688   {
689     for (j = 0; j < *n_old; j++)
690     {
691       if (new[i] == cur[j])
692         break;
693     }
694     if (j < *n_old)
695       continue;
696     GNUNET_array_append (cur, n_cur, new[j]);
697   }
698   *old = cur;
699   *n_old = n_cur;
700 }
701
702
703
704 /**
705  * Checks for the readiness of an operation and schedules a operation start task
706  *
707  * @param op the operation
708  */
709 static int
710 check_readiness (struct GNUNET_TESTBED_Operation *op)
711 {
712   struct GNUNET_TESTBED_Operation **evict_ops;
713   struct GNUNET_TESTBED_Operation **ops;
714   unsigned int n_ops;
715   unsigned int n_evict_ops;
716   unsigned int i;
717
718   GNUNET_assert (NULL == op->rq_entry);
719   GNUNET_assert (OP_STATE_WAITING == op->state);
720   evict_ops = NULL;
721   n_evict_ops = 0;
722   for (i = 0; i < op->nqueues; i++)
723   {
724     ops = NULL;
725     n_ops = 0;
726     if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
727                                       &ops, &n_ops))
728     {
729       GNUNET_free_non_null (evict_ops);
730       return GNUNET_NO;
731     }
732     if (NULL == ops)
733       continue;
734     merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
735     GNUNET_free (ops);
736   }
737   if (NULL != evict_ops)
738   {
739     for (i = 0; i < n_evict_ops; i++)
740       GNUNET_TESTBED_operation_release_ (evict_ops[i]);
741     GNUNET_free (evict_ops);
742     evict_ops = NULL;
743     /* Evicting the operations should schedule this operation */
744     GNUNET_assert (OP_STATE_READY == op->state);
745     return GNUNET_YES;
746   }
747   for (i = 0; i < op->nqueues; i++)
748     op->queues[i]->active += op->nres[i];
749   change_state (op, OP_STATE_READY);
750   rq_add (op);
751   return GNUNET_YES;
752 }
753
754
755 /**
756  * Defers a ready to be executed operation back to waiting
757  *
758  * @param op the operation to defer
759  */
760 static void
761 defer (struct GNUNET_TESTBED_Operation *op)
762 {
763   unsigned int i;
764
765   GNUNET_assert (OP_STATE_READY == op->state);
766   rq_remove (op);
767   for (i = 0; i < op->nqueues; i++)
768   {
769     GNUNET_assert (op->queues[i]->active >= op->nres[i]);
770     op->queues[i]->active -= op->nres[i];
771   }
772   change_state (op, OP_STATE_WAITING);
773 }
774
775
776 /**
777  * Cleanups the array of timeslots of an operation queue.  For each time slot in
778  * the array, if it is allocated to an operation, it will be deallocated from
779  * the operation
780  *
781  * @param queue the operation queue
782  */
783 static void
784 cleanup_tslots (struct OperationQueue *queue)
785 {
786   struct FeedbackCtx *fctx = queue->fctx;
787   struct TimeSlot *tslot;
788   struct GNUNET_TESTBED_Operation *op;
789   unsigned int cnt;
790
791   GNUNET_assert (NULL != fctx);
792   for (cnt = 0; cnt < queue->max_active; cnt++)
793   {
794     tslot = &fctx->tslots_freeptr[cnt];
795     op = tslot->op;
796     if (NULL == op)
797       continue;
798     GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
799   }
800   GNUNET_free_non_null (fctx->tslots_freeptr);
801   fctx->tslots_freeptr = NULL;
802   fctx->alloc_head = NULL;
803   fctx->alloc_tail = NULL;
804   fctx->tslots_filled = 0;
805 }
806
807
808 /**
809  * Cleansup the existing timing slots and sets new timing slots in the given
810  * queue to accommodate given number of max active operations.
811  *
812  * @param queue the queue
813  * @param n the number of maximum active operations.  If n is greater than the
814  *   maximum limit set while creating the queue, then the minimum of these two
815  *   will be selected as n
816  */
817 static void
818 adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
819 {
820   struct FeedbackCtx *fctx = queue->fctx;
821   struct TimeSlot *tslot;
822   unsigned int cnt;
823
824   cleanup_tslots (queue);
825   n = GNUNET_MIN (n ,fctx->max_active_bound);
826   fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
827   fctx->nfailed = 0;
828   for (cnt = 0; cnt < n; cnt++)
829   {
830     tslot = &fctx->tslots_freeptr[cnt];
831     tslot->queue = queue;
832     GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot);
833   }
834   GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
835 }
836
837
838 /**
839  * Adapts parallelism in an adaptive queue by using the statistical data from
840  * the feedback context.
841  *
842  * @param queue the queue
843  */
844 static void
845 adapt_parallelism (struct OperationQueue *queue)
846 {
847   struct GNUNET_TIME_Relative avg;
848   struct FeedbackCtx *fctx;
849   struct TimeSlot *tslot;
850   int sd;
851   unsigned int nvals;
852   unsigned int cnt;
853
854   avg = GNUNET_TIME_UNIT_ZERO;
855   nvals = 0;
856   fctx = queue->fctx;
857   for (cnt = 0; cnt < queue->max_active; cnt++)
858   {
859     tslot = &fctx->tslots_freeptr[cnt];
860     avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
861     nvals += tslot->nvals;
862   }
863   GNUNET_assert (nvals >= queue->max_active);
864   GNUNET_assert (fctx->nfailed <= nvals);
865   nvals -= fctx->nfailed;
866   if (0 == nvals)
867   {
868     if (1 == queue->max_active)
869       adaptive_queue_set_max_active (queue, 1);
870     else
871       adaptive_queue_set_max_active (queue, queue->max_active / 2);
872     return;
873   }
874   avg = GNUNET_TIME_relative_divide (avg, nvals);
875   if (GNUNET_SYSERR ==
876       GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
877                                            (unsigned int) avg.rel_value_us,
878                                            &sd))
879   {
880     GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
881     adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
882     return;
883   }
884   if (1 == sd)
885     adaptive_queue_set_max_active (queue, queue->max_active - 1);
886   if (2 <= sd)
887     adaptive_queue_set_max_active (queue, queue->max_active / 2);
888   if (-1 == sd)
889     adaptive_queue_set_max_active (queue, queue->max_active + 1);
890   if (sd <= -2)
891     adaptive_queue_set_max_active (queue, queue->max_active * 2);
892
893 #if 0                           /* old algorithm */
894   if (sd < 0)
895     sd = 0;
896   GNUNET_assert (0 <= sd);
897   GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
898   if (0 == sd)
899   {
900     adaptive_queue_set_max_active (queue, queue->max_active * 2);
901     return;
902   }
903   if (1 == sd)
904   {
905     adaptive_queue_set_max_active (queue, queue->max_active + 1);
906     return;
907   }
908   if (1 == queue->max_active)
909   {
910     adaptive_queue_set_max_active (queue, 1);
911     return;
912   }
913   if (2 == sd)
914   {
915     adaptive_queue_set_max_active (queue, queue->max_active - 1);
916     return;
917   }
918   adaptive_queue_set_max_active (queue, queue->max_active / 2);
919 #endif
920 }
921
922
923 /**
924  * update tslots with the operation's completion time.  Additionally, if
925  * updating a timeslot makes all timeslots filled in an adaptive operation
926  * queue, call adapt_parallelism() for that queue.
927  *
928  * @param op the operation
929  */
930 static void
931 update_tslots (struct GNUNET_TESTBED_Operation *op)
932 {
933   struct OperationQueue *queue;
934   struct GNUNET_TIME_Relative t;
935   struct TimeSlot *tslot;
936   struct FeedbackCtx *fctx;
937
938   t = GNUNET_TIME_absolute_get_duration (op->tstart);
939   while (NULL != (tslot = op->tslots_head)) /* update time slots */
940   {
941     queue = tslot->queue;
942     fctx = queue->fctx;
943     GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
944     tslot->op = NULL;
945     GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
946                                       tslot);
947     if (op->failed)
948       fctx->nfailed++;
949     tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
950     if (0 != tslot->nvals++)
951       continue;
952     fctx->tslots_filled++;
953     if (queue->max_active == fctx->tslots_filled)
954       adapt_parallelism (queue);
955   }
956 }
957
958
959 /**
960  * Create an 'operation' to be performed.
961  *
962  * @param cls closure for the callbacks
963  * @param start function to call to start the operation
964  * @param release function to call to close down the operation
965  * @return handle to the operation
966  */
967 struct GNUNET_TESTBED_Operation *
968 GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
969                                   OperationRelease release)
970 {
971   struct GNUNET_TESTBED_Operation *op;
972
973   op = GNUNET_new (struct GNUNET_TESTBED_Operation);
974   op->start = start;
975   op->state = OP_STATE_INIT;
976   op->release = release;
977   op->cb_cls = cls;
978   return op;
979 }
980
981
982 /**
983  * Create an operation queue.
984  *
985  * @param type the type of operation queue
986  * @param max_active maximum number of operations in this
987  *        queue that can be active in parallel at the same time
988  * @return handle to the queue
989  */
990 struct OperationQueue *
991 GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
992                                         unsigned int max_active)
993 {
994   struct OperationQueue *queue;
995   struct FeedbackCtx *fctx;
996
997   queue = GNUNET_new (struct OperationQueue);
998   queue->type = type;
999   if (OPERATION_QUEUE_TYPE_FIXED == type)
1000   {
1001     queue->max_active = max_active;
1002   }
1003   else
1004   {
1005     fctx = GNUNET_new (struct FeedbackCtx);
1006     fctx->max_active_bound = max_active;
1007     fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
1008     queue->fctx = fctx;
1009     adaptive_queue_set_max_active (queue, 4); /* start with 4 */
1010   }
1011   return queue;
1012 }
1013
1014
1015 /**
1016  * Destroy an operation queue.  The queue MUST be empty
1017  * at this time.
1018  *
1019  * @param queue queue to destroy
1020  */
1021 void
1022 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
1023 {
1024   struct FeedbackCtx *fctx;
1025
1026   GNUNET_break (GNUNET_YES == is_queue_empty (queue));
1027   if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
1028   {
1029     cleanup_tslots (queue);
1030     fctx = queue->fctx;
1031     GNUNET_TESTBED_SD_destroy_ (fctx->sd);
1032     GNUNET_free (fctx);
1033   }
1034   GNUNET_free (queue);
1035 }
1036
1037
1038 /**
1039  * Destroys the operation queue if it is empty.  If not empty return GNUNET_NO.
1040  *
1041  * @param queue the queue to destroy if empty
1042  * @return GNUNET_YES if the queue is destroyed.  GNUNET_NO if not (because it
1043  *           is not empty)
1044  */
1045 int
1046 GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
1047 {
1048   if (GNUNET_NO == is_queue_empty (queue))
1049     return GNUNET_NO;
1050   GNUNET_TESTBED_operation_queue_destroy_ (queue);
1051   return GNUNET_YES;
1052 }
1053
1054
1055 /**
1056  * Rechecks if any of the operations in the given operation queue's waiting list
1057  * can be made active
1058  *
1059  * @param opq the operation queue
1060  */
1061 static void
1062 recheck_waiting (struct OperationQueue *opq)
1063 {
1064   struct QueueEntry *entry;
1065   struct QueueEntry *entry2;
1066
1067   entry = opq->wq_head;
1068   while (NULL != entry)
1069   {
1070     entry2 = entry->next;
1071     if (GNUNET_NO == check_readiness (entry->op))
1072       break;
1073     entry = entry2;
1074   }
1075 }
1076
1077
1078 /**
1079  * Function to reset the maximum number of operations in the given queue. If
1080  * max_active is lesser than the number of currently active operations, the
1081  * active operations are not stopped immediately.
1082  *
1083  * @param queue the operation queue which has to be modified
1084  * @param max_active the new maximum number of active operations
1085  */
1086 void
1087 GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
1088                                                   unsigned int max_active)
1089 {
1090   struct QueueEntry *entry;
1091
1092   queue->max_active = max_active;
1093   while ( (queue->active > queue->max_active)
1094           && (NULL != (entry = queue->rq_head)) )
1095     defer (entry->op);
1096   recheck_waiting (queue);
1097 }
1098
1099
1100 /**
1101  * Add an operation to a queue.  An operation can be in multiple queues at
1102  * once. Once the operation is inserted into all the queues
1103  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
1104  * waiting for the operation to become active.
1105  *
1106  * @param queue queue to add the operation to
1107  * @param op operation to add to the queue
1108  * @param nres the number of units of the resources of queue needed by the
1109  *          operation. Should be greater than 0.
1110  */
1111 void
1112 GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
1113                                          struct GNUNET_TESTBED_Operation *op,
1114                                          unsigned int nres)
1115 {
1116   unsigned int qsize;
1117
1118   GNUNET_assert (0 < nres);
1119   qsize = op->nqueues;
1120   GNUNET_array_append (op->queues, op->nqueues, queue);
1121   GNUNET_array_append (op->nres, qsize, nres);
1122   GNUNET_assert (qsize == op->nqueues);
1123 }
1124
1125
1126 /**
1127  * Add an operation to a queue.  An operation can be in multiple queues at
1128  * once. Once the operation is inserted into all the queues
1129  * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
1130  * waiting for the operation to become active. The operation is assumed to take
1131  * 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
1132  * requires more than 1
1133  *
1134  * @param queue queue to add the operation to
1135  * @param op operation to add to the queue
1136  */
1137 void
1138 GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
1139                                         struct GNUNET_TESTBED_Operation *op)
1140 {
1141   return GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
1142 }
1143
1144
1145 /**
1146  * Marks the given operation as waiting on the queues.  Once all queues permit
1147  * the operation to become active, the operation will be activated.  The actual
1148  * activation will occur in a separate task (thus allowing multiple queue
1149  * insertions to be made without having the first one instantly trigger the
1150  * operation if the first queue has sufficient resources).
1151  *
1152  * @param op the operation to marks as waiting
1153  */
1154 void
1155 GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
1156 {
1157   GNUNET_assert (NULL == op->rq_entry);
1158   change_state (op, OP_STATE_WAITING);
1159   (void) check_readiness (op);
1160 }
1161
1162
1163 /**
1164  * Marks an active operation as inactive - the operation will be kept in a
1165  * ready-to-be-released state and continues to hold resources until another
1166  * operation contents for them.
1167  *
1168  * @param op the operation to be marked as inactive.  The operation start
1169  *          callback should have been called before for this operation to mark
1170  *          it as inactive.
1171  */
1172 void
1173 GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
1174 {
1175   struct OperationQueue **queues;
1176   size_t ms;
1177   unsigned int nqueues;
1178   unsigned int i;
1179
1180   GNUNET_assert (OP_STATE_ACTIVE == op->state);
1181   change_state (op, OP_STATE_INACTIVE);
1182   nqueues = op->nqueues;
1183   ms = sizeof (struct OperationQueue *) * nqueues;
1184   queues = GNUNET_malloc (ms);
1185   /* Cloning is needed as the operation be released by waiting operations and
1186      hence its nqueues memory ptr will be freed */
1187   GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms)));
1188   for (i = 0; i < nqueues; i++)
1189     recheck_waiting (queues[i]);
1190   GNUNET_free (queues);
1191 }
1192
1193
1194 /**
1195  * Marks and inactive operation as active.  This fuction should be called to
1196  * ensure that the oprelease callback will not be called until it is either
1197  * marked as inactive or released.
1198  *
1199  * @param op the operation to be marked as active
1200  */
1201 void
1202 GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
1203 {
1204
1205   GNUNET_assert (OP_STATE_INACTIVE == op->state);
1206   change_state (op, OP_STATE_ACTIVE);
1207 }
1208
1209
1210 /**
1211  * An operation is 'done' (was cancelled or finished); remove
1212  * it from the queues and release associated resources.
1213  *
1214  * @param op operation that finished
1215  */
1216 void
1217 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
1218 {
1219   struct QueueEntry *entry;
1220   struct OperationQueue *opq;
1221   unsigned int i;
1222
1223   if (OP_STATE_INIT == op->state)
1224   {
1225     GNUNET_free (op);
1226     return;
1227   }
1228   if (OP_STATE_READY == op->state)
1229     rq_remove (op);
1230   if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
1231     GNUNET_TESTBED_operation_activate_ (op);
1232   if (OP_STATE_ACTIVE == op->state)
1233     update_tslots (op);
1234   GNUNET_assert (NULL != op->queues);
1235   GNUNET_assert (NULL != op->qentries);
1236   for (i = 0; i < op->nqueues; i++)
1237   {
1238     entry = op->qentries[i];
1239     remove_queue_entry (op, i);
1240     opq = op->queues[i];
1241     switch (op->state)
1242     {
1243     case OP_STATE_INIT:
1244     case OP_STATE_INACTIVE:
1245       GNUNET_assert (0);
1246       break;
1247     case OP_STATE_WAITING:
1248       break;
1249     case OP_STATE_ACTIVE:
1250     case OP_STATE_READY:
1251       GNUNET_assert (0 != opq->active);
1252       GNUNET_assert (opq->active >= entry->nres);
1253       opq->active -= entry->nres;
1254       recheck_waiting (opq);
1255       break;
1256     }
1257     GNUNET_free (entry);
1258   }
1259   GNUNET_free_non_null (op->qentries);
1260   GNUNET_free (op->queues);
1261   GNUNET_free (op->nres);
1262   if (NULL != op->release)
1263     op->release (op->cb_cls);
1264   GNUNET_free (op);
1265 }
1266
1267
1268 /**
1269  * Marks an operation as failed
1270  *
1271  * @param op the operation to be marked as failed
1272  */
1273 void
1274 GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op)
1275 {
1276   op->failed = GNUNET_YES;
1277 }
1278
1279
1280 /* end of testbed_api_operations.c */