+/**
+ * Cleanups the array of timeslots of an operation queue. For each time slot in
+ * the array, if it is allocated to an operation, it will be deallocated from
+ * the operation
+ *
+ * @param queue the operation queue
+ */
+static void
+cleanup_tslots (struct OperationQueue *queue)
+{
+ struct FeedbackCtx *fctx = queue->fctx;
+ struct TimeSlot *tslot;
+ struct GNUNET_TESTBED_Operation *op;
+ unsigned int cnt;
+
+ GNUNET_assert (NULL != fctx);
+ for (cnt = 0; cnt < queue->max_active; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ op = tslot->op;
+ if (NULL == op)
+ continue;
+ GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+ }
+ GNUNET_free_non_null (fctx->tslots_freeptr);
+ fctx->tslots_freeptr = NULL;
+ fctx->alloc_head = NULL;
+ fctx->alloc_tail = NULL;
+ fctx->tslots_filled = 0;
+}
+
+
+/**
+ * Cleansup the existing timing slots and sets new timing slots in the given
+ * queue to accommodate given number of max active operations.
+ *
+ * @param queue the queue
+ * @param n the number of maximum active operations. If n is greater than the
+ * maximum limit set while creating the queue, then the minimum of these two
+ * will be selected as n
+ */
+static void
+adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
+{
+ struct FeedbackCtx *fctx = queue->fctx;
+ struct TimeSlot *tslot;
+ unsigned int cnt;
+
+ cleanup_tslots (queue);
+ n = GNUNET_MIN (n ,fctx->max_active_bound);
+ fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
+ fctx->nfailed = 0;
+ for (cnt = 0; cnt < n; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ tslot->queue = queue;
+ GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot);
+ }
+ GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
+}
+
+
+/**
+ * Adapts parallelism in an adaptive queue by using the statistical data from
+ * the feedback context.
+ *
+ * @param queue the queue
+ */
+static void
+adapt_parallelism (struct OperationQueue *queue)
+{
+ struct GNUNET_TIME_Relative avg;
+ struct FeedbackCtx *fctx;
+ struct TimeSlot *tslot;
+ int sd;
+ unsigned int nvals;
+ unsigned int cnt;
+ unsigned int parallelism;
+
+ avg = GNUNET_TIME_UNIT_ZERO;
+ nvals = 0;
+ fctx = queue->fctx;
+ for (cnt = 0; cnt < queue->max_active; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
+ nvals += tslot->nvals;
+ }
+ GNUNET_assert (nvals >= queue->max_active);
+ GNUNET_assert (fctx->nfailed <= nvals);
+ nvals -= fctx->nfailed;
+ if (0 == nvals)
+ {
+ if (1 == queue->max_active)
+ adaptive_queue_set_max_active (queue, 1);
+ else
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+ return;
+ }
+ avg = GNUNET_TIME_relative_divide (avg, nvals);
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ if (GNUNET_SYSERR ==
+ GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
+ (unsigned int) avg.rel_value_us,
+ &sd))
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
+ return;
+ }
+
+ parallelism = 0;
+ if (-1 == sd)
+ parallelism = queue->max_active + 1;
+ if (sd <= -2)
+ parallelism = queue->max_active * 2;
+ if (1 == sd)
+ parallelism = queue->max_active - 1;
+ if (2 <= sd)
+ parallelism = queue->max_active / 2;
+ parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
+ adaptive_queue_set_max_active (queue, parallelism);
+
+#if 0
+ /* old algorithm */
+ if (sd < 0)
+ sd = 0;
+ GNUNET_assert (0 <= sd);
+ //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ if (0 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active * 2);
+ return;
+ }
+ if (1 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active + 1);
+ return;
+ }
+ if (1 == queue->max_active)
+ {
+ adaptive_queue_set_max_active (queue, 1);
+ return;
+ }
+ if (2 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active - 1);
+ return;
+ }
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+#endif
+}
+
+
/**
 * Updates the time slots held by an operation with the operation's
 * completion time.  Each slot is detached from the operation, returned to
 * its queue's free-allocation list, and credited with the elapsed time.  If
 * crediting a slot gives every slot of an adaptive queue at least one
 * sample, adapt_parallelism() is invoked for that queue.  Failed operations
 * additionally bump the feedback context's failure counter and charge their
 * resource consumption to the queue's overload tally.
 *
 * @param op the completed operation whose slots are to be updated
 */
static void
update_tslots (struct GNUNET_TESTBED_Operation *op)
{
  struct OperationQueue *queue;
  struct GNUNET_TIME_Relative t;
  struct TimeSlot *tslot;
  struct FeedbackCtx *fctx;
  unsigned int i;

  /* Elapsed wall-clock time since the operation was started. */
  t = GNUNET_TIME_absolute_get_duration (op->tstart);
  while (NULL != (tslot = op->tslots_head)) /* update time slots */
  {
    queue = tslot->queue;
    fctx = queue->fctx;
    /* Detach the slot from the operation and hand it back to the queue's
       allocation list so a future operation can reuse it. */
    GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
    tslot->op = NULL;
    GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
                                      tslot);
    if (op->failed)
    {
      fctx->nfailed++;
      /* Locate this queue's index in the operation's queue array so the
         overload can be charged with the resources consumed there. */
      for (i = 0; i < op->nqueues; i++)
        if (queue == op->queues[i])
          break;
      GNUNET_assert (i != op->nqueues);
      op->queues[i]->overload += op->nres[i];
    }
    tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
    /* Post-increment: only the FIRST sample stored in a slot marks it as
       newly filled; later samples just accumulate into tsum above. */
    if (0 != tslot->nvals++)
      continue;
    fctx->tslots_filled++;
    /* Once all of the queue's slots have a sample, re-evaluate parallelism.
       NOTE(review): adapt_parallelism() frees and reallocates the queue's
       slot array; its cleanup unlinks any slots still held by operations,
       so op->tslots_head stays consistent for the next loop iteration. */
    if (queue->max_active == fctx->tslots_filled)
      adapt_parallelism (queue);
  }
}
+
+