+ * Checks if the given operation queue has enough resources to provide for the
+ * operation of the given queue entry. It also checks if any inactive
+ * operations are to be released in order to accommodate the needed resources
+ * and returns them as an array.
+ *
+ * @param opq the operation queue to check for resource accommodation
+ * @param entry the operation queue entry whose operation's resources are to be
+ * accommodated
+ * @param ops_ pointer to return the array of operations which are to be released
+ * in order to accommodate the new operation. Can be NULL
+ * @param n_ops_ the number of operations in ops_
+ * @return GNUNET_YES if the given entry's operation can be accommodated in this
+ * queue. GNUNET_NO if it cannot be accommodated; ops_ and n_ops_ will
+ * be set to NULL and 0 respectively.
+ */
+static int
+decide_capacity (struct OperationQueue *opq,
+ struct QueueEntry *entry,
+ struct GNUNET_TESTBED_Operation ***ops_,
+ unsigned int *n_ops_)
+{
+ struct QueueEntry **evict_entries;
+ struct GNUNET_TESTBED_Operation **ops;
+ struct GNUNET_TESTBED_Operation *op;
+ unsigned int n_ops;
+ unsigned int n_evict_entries;
+ unsigned int need;
+ unsigned int max;
+ int deficit;
+ int rval;
+
+ GNUNET_assert (NULL != (op = entry->op));
+ GNUNET_assert (0 < (need = entry->nres));
+ ops = NULL;
+ n_ops = 0;
+ evict_entries = NULL;
+ n_evict_entries = 0;
+ rval = GNUNET_YES;
+ /* Adaptive queues reserve 'overload' resource units (penalty for
+ previously failed operations); they do not count as usable capacity. */
+ if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
+ {
+ GNUNET_assert (NULL != opq->fctx);
+ GNUNET_assert (opq->max_active >= opq->overload);
+ max = opq->max_active - opq->overload;
+ }
+ else
+ max = opq->max_active;
+ /* Queue already beyond capacity (possible after max_active was lowered);
+ nothing further can be accommodated. */
+ if (opq->active > max)
+ {
+ rval = GNUNET_NO;
+ goto ret;
+ }
+ /* Enough free capacity without evicting anything; ops stays NULL. */
+ if ((opq->active + need) <= max)
+ goto ret;
+ /* Not enough free capacity: walk the list starting at nq_head
+ (presumably the inactive operations of this queue -- see the function
+ documentation) collecting entries to evict until the deficit is covered. */
+ deficit = need - (max - opq->active);
+ for (entry = opq->nq_head;
+ (0 < deficit) && (NULL != entry);
+ entry = entry->next)
+ {
+ GNUNET_array_append (evict_entries, n_evict_entries, entry);
+ deficit -= entry->nres;
+ }
+ /* Even evicting every inactive operation is not enough. */
+ if (0 < deficit)
+ {
+ rval = GNUNET_NO;
+ goto ret;
+ }
+ /* Collect the operations of the entries selected for eviction. */
+ for (n_ops = 0; n_ops < n_evict_entries;)
+ {
+ op = evict_entries[n_ops]->op;
+ GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
+ }
+
+ ret:
+ GNUNET_free_non_null (evict_entries);
+ if (NULL != ops_)
+ *ops_ = ops;
+ else
+ GNUNET_free (ops); /* caller does not want the array; avoid leaking it */
+ if (NULL != n_ops_)
+ *n_ops_ = n_ops;
+ return rval;
+}
+
+
+/**
+ * Merges an array of operations into another, eliminating duplicates. No
+ * ordering is guaranteed.
+ *
+ * @param old pointer to the array into which the merging is done; the array
+ *          is grown (reallocated) in place
+ * @param n_old the number of operations in the old array; updated to the
+ *          merged count upon return
+ * @param new the array from which operations are to be merged; not modified
+ * @param n_new the number of operations in the new array
+ */
+static void
+merge_ops (struct GNUNET_TESTBED_Operation ***old,
+           unsigned int *n_old,
+           struct GNUNET_TESTBED_Operation **new,
+           unsigned int n_new)
+{
+  struct GNUNET_TESTBED_Operation **cur;
+  unsigned int i;
+  unsigned int j;
+  unsigned int n_cur;
+
+  GNUNET_assert (NULL != old);
+  n_cur = *n_old;
+  cur = *old;
+  for (i = 0; i < n_new; i++)
+  {
+    /* Skip new[i] if it is already present in the original array. Only the
+       first *n_old entries are checked; entries appended during this merge
+       came from a single queue and are assumed distinct. */
+    for (j = 0; j < *n_old; j++)
+    {
+      if (new[i] == cur[j])
+        break;
+    }
+    if (j < *n_old)
+      continue;
+    /* FIX: append new[i], not new[j]; here j == *n_old, so new[j] read the
+       wrong element (or out of bounds) whenever no duplicate was found. */
+    GNUNET_array_append (cur, n_cur, new[i]);
+  }
+  *old = cur;
+  *n_old = n_cur;
+}
+
+
+
+/**
+ * Checks for the readiness of an operation and schedules an operation start
+ * task if all the queues the operation belongs to can accommodate it
+ * (evicting inactive operations where necessary).
+ *
+ * @param op the operation
+ * @return GNUNET_YES if the operation was made ready (or will become ready
+ *           as a consequence of evictions); GNUNET_NO if some queue cannot
+ *           accommodate it
+ */
+static int
+check_readiness (struct GNUNET_TESTBED_Operation *op)
+{
+ struct GNUNET_TESTBED_Operation **evict_ops;
+ struct GNUNET_TESTBED_Operation **ops;
+ unsigned int n_ops;
+ unsigned int n_evict_ops;
+ unsigned int i;
+
+ GNUNET_assert (NULL == op->rq_entry);
+ GNUNET_assert (OP_STATE_WAITING == op->state);
+ evict_ops = NULL;
+ n_evict_ops = 0;
+ /* Ask every queue the operation belongs to whether it has capacity;
+ accumulate (deduplicated) operations that would have to be evicted. */
+ for (i = 0; i < op->nqueues; i++)
+ {
+ ops = NULL;
+ n_ops = 0;
+ if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
+ &ops, &n_ops))
+ {
+ GNUNET_free_non_null (evict_ops);
+ return GNUNET_NO;
+ }
+ if (NULL == ops)
+ continue;
+ merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
+ GNUNET_free (ops);
+ }
+ if (NULL != evict_ops)
+ {
+ /* Release the inactive operations blocking our resources; releasing
+ them recursively re-checks waiting operations, including this one. */
+ for (i = 0; i < n_evict_ops; i++)
+ GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+ GNUNET_free (evict_ops);
+ evict_ops = NULL;
+ /* Evicting the operations should schedule this operation */
+ GNUNET_assert (OP_STATE_READY == op->state);
+ return GNUNET_YES;
+ }
+ /* No evictions needed: claim the resources in all queues directly. */
+ for (i = 0; i < op->nqueues; i++)
+ op->queues[i]->active += op->nres[i];
+ change_state (op, OP_STATE_READY);
+ rq_add (op);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Moves a ready-to-be-executed operation back into the waiting state,
+ * returning the resource units it was holding to each of its queues.
+ *
+ * @param op the operation to defer
+ */
+static void
+defer (struct GNUNET_TESTBED_Operation *op)
+{
+  unsigned int cnt;
+
+  GNUNET_assert (OP_STATE_READY == op->state);
+  rq_remove (op);
+  for (cnt = 0; cnt < op->nqueues; cnt++)
+  {
+    /* A queue can never have fewer active units than this op holds in it. */
+    GNUNET_assert (op->queues[cnt]->active >= op->nres[cnt]);
+    op->queues[cnt]->active -= op->nres[cnt];
+  }
+  change_state (op, OP_STATE_WAITING);
+}
+
+
+/**
+ * Cleans up the array of time slots of an operation queue. Any slot still
+ * allocated to an operation is unlinked from that operation's slot list
+ * before the backing array is freed.
+ *
+ * @param queue the operation queue
+ */
+static void
+cleanup_tslots (struct OperationQueue *queue)
+{
+  struct FeedbackCtx *fctx = queue->fctx;
+  struct TimeSlot *tslot;
+  unsigned int i;
+
+  GNUNET_assert (NULL != fctx);
+  for (i = 0; i < queue->max_active; i++)
+  {
+    tslot = &fctx->tslots_freeptr[i];
+    if (NULL != tslot->op)
+      GNUNET_CONTAINER_DLL_remove (tslot->op->tslots_head,
+                                   tslot->op->tslots_tail,
+                                   tslot);
+  }
+  GNUNET_free_non_null (fctx->tslots_freeptr);
+  fctx->tslots_freeptr = NULL;
+  fctx->alloc_head = NULL;
+  fctx->alloc_tail = NULL;
+  fctx->tslots_filled = 0;
+}
+
+
+/**
+ * Discards the existing time slots and installs fresh ones sized for the
+ * given number of maximum active operations.
+ *
+ * @param queue the queue
+ * @param n the desired number of maximum active operations; capped at the
+ *          bound given when the queue was created
+ */
+static void
+adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
+{
+  struct FeedbackCtx *fctx = queue->fctx;
+  unsigned int slot;
+
+  cleanup_tslots (queue);
+  if (n > fctx->max_active_bound)
+    n = fctx->max_active_bound;
+  fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
+  fctx->nfailed = 0;
+  for (slot = 0; slot < n; slot++)
+  {
+    fctx->tslots_freeptr[slot].queue = queue;
+    GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
+                                      &fctx->tslots_freeptr[slot]);
+  }
+  GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n);
+}
+
+
+/**
+ * Adapts parallelism in an adaptive queue by using the statistical data from
+ * the feedback context.
+ *
+ * @param queue the queue
+ */
+static void
+adapt_parallelism (struct OperationQueue *queue)
+{
+ struct GNUNET_TIME_Relative avg;
+ struct FeedbackCtx *fctx;
+ struct TimeSlot *tslot;
+ int sd;
+ unsigned int nvals;
+ unsigned int cnt;
+ unsigned int parallelism;
+
+ /* Sum completion times and sample counts over all time slots. */
+ avg = GNUNET_TIME_UNIT_ZERO;
+ nvals = 0;
+ fctx = queue->fctx;
+ for (cnt = 0; cnt < queue->max_active; cnt++)
+ {
+ tslot = &fctx->tslots_freeptr[cnt];
+ avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
+ nvals += tslot->nvals;
+ }
+ GNUNET_assert (nvals >= queue->max_active);
+ GNUNET_assert (fctx->nfailed <= nvals);
+ nvals -= fctx->nfailed;
+ /* Every sampled operation failed: halve the parallelism (floor of 1). */
+ if (0 == nvals)
+ {
+ if (1 == queue->max_active)
+ adaptive_queue_set_max_active (queue, 1);
+ else
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+ return;
+ }
+ avg = GNUNET_TIME_relative_divide (avg, nvals);
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ /* GNUNET_SYSERR presumably means not enough history to compute a
+ deviation factor yet -- keep the current parallelism. */
+ if (GNUNET_SYSERR ==
+ GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
+ (unsigned int) avg.rel_value_us,
+ &sd))
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
+ return;
+ }
+
+ /* sd < 0: faster than usual -> grow; sd > 0: slower -> shrink.
+ The conditions below are mutually exclusive; sd == 0 leaves
+ parallelism at 0 and the GNUNET_MAX below restores the default. */
+ parallelism = 0;
+ if (-1 == sd)
+ parallelism = queue->max_active + 1;
+ if (sd <= -2)
+ parallelism = queue->max_active * 2;
+ if (1 == sd)
+ parallelism = queue->max_active - 1;
+ if (2 <= sd)
+ parallelism = queue->max_active / 2;
+ parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
+ adaptive_queue_set_max_active (queue, parallelism);
+
+#if 0
+ /* old algorithm */
+ if (sd < 0)
+ sd = 0;
+ GNUNET_assert (0 <= sd);
+ //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ if (0 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active * 2);
+ return;
+ }
+ if (1 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active + 1);
+ return;
+ }
+ if (1 == queue->max_active)
+ {
+ adaptive_queue_set_max_active (queue, 1);
+ return;
+ }
+ if (2 == sd)
+ {
+ adaptive_queue_set_max_active (queue, queue->max_active - 1);
+ return;
+ }
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
+#endif
+}
+
+
+/**
+ * Updates the time slots of an operation with its completion time.
+ * Additionally, if updating a time slot fills all time slots of an adaptive
+ * operation queue, adapt_parallelism() is called for that queue.
+ *
+ * @param op the operation
+ */
+static void
+update_tslots (struct GNUNET_TESTBED_Operation *op)
+{
+ struct OperationQueue *queue;
+ struct GNUNET_TIME_Relative t;
+ struct TimeSlot *tslot;
+ struct FeedbackCtx *fctx;
+ unsigned int i;
+
+ /* Time the operation was active, from op->tstart until now. */
+ t = GNUNET_TIME_absolute_get_duration (op->tstart);
+ while (NULL != (tslot = op->tslots_head)) /* update time slots */
+ {
+ queue = tslot->queue;
+ fctx = queue->fctx;
+ /* Detach the slot from the operation and return it to the queue's
+ free (alloc) list for reuse. */
+ GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
+ tslot->op = NULL;
+ GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
+ tslot);
+ if (op->failed)
+ {
+ /* Penalize the queue: count the failure and add this operation's
+ resource share to the queue's overload. */
+ fctx->nfailed++;
+ for (i = 0; i < op->nqueues; i++)
+ if (queue == op->queues[i])
+ break;
+ GNUNET_assert (i != op->nqueues);
+ op->queues[i]->overload += op->nres[i];
+ }
+ tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
+ /* Only the first sample of a slot counts towards 'tslots_filled'. */
+ if (0 != tslot->nvals++)
+ continue;
+ fctx->tslots_filled++;
+ if (queue->max_active == fctx->tslots_filled)
+ adapt_parallelism (queue);
+ }
+}
+
+
+/**
+ * Create an 'operation' to be performed.
+ *
+ * @param cls closure passed to the start and release callbacks
+ * @param start function to call to start the operation
+ * @param release function to call to close down the operation
+ * @return handle to the newly created operation, in state OP_STATE_INIT
+ */
+struct GNUNET_TESTBED_Operation *
+GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start,
+                                  OperationRelease release)
+{
+  struct GNUNET_TESTBED_Operation *op;
+
+  op = GNUNET_new (struct GNUNET_TESTBED_Operation);
+  op->cb_cls = cls;
+  op->start = start;
+  op->release = release;
+  op->state = OP_STATE_INIT;
+  return op;
+}
+
+
+/**
+ * Create an operation queue.
+ *
+ * @param type the type of operation queue
+ * @param max_active maximum number of operations in this queue that can be
+ *          active in parallel at the same time; for adaptive queues this is
+ *          the upper bound on the self-tuned parallelism
+ * @return handle to the queue
+ */
+struct OperationQueue *
+GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
+                                        unsigned int max_active)
+{
+  struct OperationQueue *queue;
+  struct FeedbackCtx *fctx;
+
+  queue = GNUNET_new (struct OperationQueue);
+  queue->type = type;
+  if (OPERATION_QUEUE_TYPE_FIXED != type)
+  {
+    /* Adaptive queue: start with the default parallelism and let the
+       feedback context tune it, capped at max_active. */
+    fctx = GNUNET_new (struct FeedbackCtx);
+    fctx->max_active_bound = max_active;
+    fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
+    queue->fctx = fctx;
+    adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
+    return queue;
+  }
+  queue->max_active = max_active;
+  return queue;
+}
+
+
+/**
+ * Releases all resources of the given operation queue, including its
+ * feedback context when the queue is adaptive.
+ *
+ * @param queue the operation queue to destroy
+ */
+static void
+queue_destroy (struct OperationQueue *queue)
+{
+  struct FeedbackCtx *fctx;
+
+  if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
+  {
+    fctx = queue->fctx;
+    cleanup_tslots (queue);
+    GNUNET_TESTBED_SD_destroy_ (fctx->sd);
+    GNUNET_free (fctx);
+  }
+  GNUNET_free (queue);
+}
+
+
+/**
+ * Destroys an operation queue. If the queue is still in use by operations
+ * it is only marked as expired; its resources are then released in the
+ * destructor GNUNET_TESTBED_operations_fini().
+ *
+ * @param queue queue to destroy
+ */
+void
+GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
+{
+  if (GNUNET_YES == is_queue_empty (queue))
+  {
+    queue_destroy (queue);
+    return;
+  }
+  /* Still in use: remember it for deferred destruction. */
+  GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */
+  queue->expired = 1;
+  GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
+}
+
+
+/**
+ * Destroys the operation queue if it is empty.
+ *
+ * @param queue the queue to destroy if empty
+ * @return GNUNET_YES if the queue was destroyed; GNUNET_NO if it was left
+ *           alone because it is not empty
+ */
+int
+GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue)
+{
+  if (GNUNET_NO != is_queue_empty (queue))
+  {
+    GNUNET_TESTBED_operation_queue_destroy_ (queue);
+    return GNUNET_YES;
+  }
+  return GNUNET_NO;
+}
+
+
+/**
+ * Rechecks whether any of the operations on the given operation queue's
+ * waiting list can be made active, stopping at the first one that cannot.
+ *
+ * @param opq the operation queue
+ */
+static void
+recheck_waiting (struct OperationQueue *opq)
+{
+  struct QueueEntry *current;
+  struct QueueEntry *next;
+
+  for (current = opq->wq_head; NULL != current; current = next)
+  {
+    /* Save the successor first: check_readiness() may unlink 'current'. */
+    next = current->next;
+    if (GNUNET_NO == check_readiness (current->op))
+      break;
+  }
+}
+
+
+/**
+ * Resets the maximum number of active operations of the given queue. If
+ * max_active is less than the number of currently active operations, ready
+ * (not yet started) operations are deferred; already running operations are
+ * not stopped.
+ *
+ * @param queue the operation queue which has to be modified
+ * @param max_active the new maximum number of active operations
+ */
+void
+GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
+                                                  unsigned int max_active)
+{
+  struct QueueEntry *head;
+
+  queue->max_active = max_active;
+  queue->overload = 0;
+  /* Push ready operations back to waiting until we fit the new limit. */
+  while ( (queue->active > queue->max_active) &&
+          (NULL != (head = queue->rq_head)) )
+    defer (head->op);
+  recheck_waiting (queue);
+}
+
+
+/**
+ * Add an operation to a queue. An operation can be in multiple queues at
+ * once. Once the operation is inserted into all the queues
+ * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
+ * waiting for the operation to become active.
+ *
+ * @param queue queue to add the operation to
+ * @param op operation to add to the queue
+ * @param nres the number of units of the resources of queue needed by the
+ * operation. Should be greater than 0.
+ */
+void
+GNUNET_TESTBED_operation_queue_insert2_ (struct OperationQueue *queue,
+ struct GNUNET_TESTBED_Operation *op,
+ unsigned int nres)
+{
+ unsigned int qsize;
+
+ GNUNET_assert (0 < nres);
+ /* GNUNET_array_append() increments its counter argument as a side effect:
+ the first call bumps op->nqueues, the second bumps the local 'qsize'
+ copy, keeping op->queues and op->nres the same length. */
+ qsize = op->nqueues;
+ GNUNET_array_append (op->queues, op->nqueues, queue);
+ GNUNET_array_append (op->nres, qsize, nres);
+ /* Both arrays must have grown in lockstep. */
+ GNUNET_assert (qsize == op->nqueues);
+}
+
+
+/**
+ * Add an operation to a queue. An operation can be in multiple queues at
+ * once. Once the operation is inserted into all the queues
+ * GNUNET_TESTBED_operation_begin_wait_() has to be called to actually start
+ * waiting for the operation to become active. The operation is assumed to
+ * take 1 queue resource. Use GNUNET_TESTBED_operation_queue_insert2_() if it
+ * requires more than 1.
+ *
+ * @param queue queue to add the operation to
+ * @param op operation to add to the queue
+ */
+void
+GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
+                                        struct GNUNET_TESTBED_Operation *op)
+{
+  /* Note: no 'return <expr>;' here -- returning an expression from a
+     void function violates ISO C (C11 6.8.6.4). */
+  GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
+}
+
+
+/**
+ * Marks the given operation as waiting on the queues. Once all queues permit
+ * the operation to become active, the operation will be activated. The actual
+ * activation will occur in a separate task (thus allowing multiple queue
+ * insertions to be made without having the first one instantly trigger the
+ * operation if the first queue has sufficient resources).
+ *
+ * @param op the operation to mark as waiting
+ */
+void
+GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op)
+{
+ GNUNET_assert (NULL == op->rq_entry);
+ change_state (op, OP_STATE_WAITING);
+ /* Result deliberately ignored: if not ready yet, the operation simply
+ remains waiting and is rechecked when resources free up. */
+ (void) check_readiness (op);
+}
+
+
+/**
+ * Marks an active operation as inactive - the operation will be kept in a
+ * ready-to-be-released state and continues to hold resources until another
+ * operation contends for them.
+ *
+ * @param op the operation to be marked as inactive. The operation start
+ * callback should have been called before for this operation to mark
+ * it as inactive.
+ */
+void
+GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op)
+{
+ struct OperationQueue **queues;
+ size_t ms;
+ unsigned int nqueues;
+ unsigned int i;
+
+ GNUNET_assert (OP_STATE_ACTIVE == op->state);
+ change_state (op, OP_STATE_INACTIVE);
+ nqueues = op->nqueues;
+ ms = sizeof (struct OperationQueue *) * nqueues;
+ queues = GNUNET_malloc (ms);
+ /* Cloning is needed as the operation may be released by waiting
+ operations (via recheck_waiting() below), which would free the
+ memory op->queues points to while we are still iterating. */
+ GNUNET_memcpy (queues, op->queues, ms);
+ for (i = 0; i < nqueues; i++)
+ recheck_waiting (queues[i]);
+ GNUNET_free (queues);
+}
+
+
+/**
+ * Marks and inactive operation as active. This fuction should be called to
+ * ensure that the oprelease callback will not be called until it is either
+ * marked as inactive or released.
+ *
+ * @param op the operation to be marked as active