/*
This file is part of GNUnet
- (C) 2008--2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2008--2013 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
#include "testbed_api_operations.h"
#include "testbed_api_sd.h"
+/**
+ * The number of readings containing past operation's timing information that we
+ * keep track of for adaptive queues
+ */
+#define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40
+
+/**
+ * The number of parallel operations we start with by default for adaptive
+ * queues
+ */
+#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
/**
* An entry in the operation queue
* Head for DLL of time slots which are free to be allocated to operations
*/
struct TimeSlot *alloc_head;
-
+
/**
* Tail for DLL of time slots which are free to be allocated to operations
*/
* Number of time slots filled so far
*/
unsigned int tslots_filled;
-
+
/**
* Bound on the maximum number of operations which can be active
*/
* Number of operations that have failed
*/
unsigned int nfailed;
-
};
* Max number of operations which can be active at any time in this queue.
* This value can be changed either by calling
* GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
- * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
+ * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE
*/
unsigned int max_active;
+ /**
+ * The number of resources occupied by failed operations in the current shot.
+ * This is only relevant if the operation queue is of type
+ * #OPERATION_QUEUE_TYPE_ADAPTIVE
+ */
+ unsigned int overload;
+
+ /**
+ * Is this queue marked for expiry?
+ */
+ unsigned int expired;
};
* next ptr for DLL
*/
struct ReadyQueueEntry *next;
-
+
/**
* prev ptr for DLL
*/
/**
* DLL head for the ready queue
*/
-struct ReadyQueueEntry *rq_head;
+static struct ReadyQueueEntry *rq_head;
/**
* DLL tail for the ready queue
*/
-struct ReadyQueueEntry *rq_tail;
+static struct ReadyQueueEntry *rq_tail;
+
+/**
+ * Array of operation queues which are to be destroyed
+ */
+static struct OperationQueue **expired_opqs;
+
+/**
+ * Number of expired operation queues in the above array
+ */
+static unsigned int n_expired_opqs;
/**
* The id of the task to process the ready queue
*/
-GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id;
+struct GNUNET_SCHEDULER_Task *process_rq_task_id;
/**
{
struct OperationQueue *opq;
struct QueueEntry *entry;
-
+
opq = op->queues[index];
entry = op->qentries[index];
switch (op->state)
struct OperationQueue *opq;
unsigned int cnt;
unsigned int s;
-
+
GNUNET_assert (OP_STATE_INIT != state);
GNUNET_assert (NULL != op->queues);
GNUNET_assert (NULL != op->nres);
{
if (OP_STATE_INIT == op->state)
{
- entry = GNUNET_malloc (sizeof (struct QueueEntry));
+ entry = GNUNET_new (struct QueueEntry);
entry->op = op;
entry->nres = op->nres[cnt];
s = cnt;
- GNUNET_array_append (op->qentries, s, entry);
+ GNUNET_array_append (op->qentries, s, entry);
}
else
{
*/
static void
rq_remove (struct GNUNET_TESTBED_Operation *op)
-{
+{
GNUNET_assert (NULL != op->rq_entry);
GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
GNUNET_free (op->rq_entry);
op->rq_entry = NULL;
- if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) )
+ if ( (NULL == rq_head) && (NULL != process_rq_task_id) )
{
GNUNET_SCHEDULER_cancel (process_rq_task_id);
- process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
+ process_rq_task_id = NULL;
}
}
* the ready queue.
*
* @param cls NULL
- * @param tc scheduler task context. Not used.
*/
static void
-process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+process_rq_task (void *cls)
{
struct GNUNET_TESTBED_Operation *op;
struct OperationQueue *queue;
unsigned int cnt;
- process_rq_task_id = GNUNET_SCHEDULER_NO_TASK;
+ process_rq_task_id = NULL;
GNUNET_assert (NULL != rq_head);
GNUNET_assert (NULL != (op = rq_head->op));
rq_remove (op);
struct ReadyQueueEntry *rq_entry;
GNUNET_assert (NULL == op->rq_entry);
- rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry));
+ rq_entry = GNUNET_new (struct ReadyQueueEntry);
rq_entry->op = op;
GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
op->rq_entry = rq_entry;
- if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id)
+ if (NULL == process_rq_task_id)
process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
}
unsigned int n_ops;
unsigned int n_evict_entries;
unsigned int need;
+ unsigned int max;
int deficit;
int rval;
evict_entries = NULL;
n_evict_entries = 0;
rval = GNUNET_YES;
- if (opq->active > opq->max_active)
+ if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
+ {
+ GNUNET_assert (NULL != opq->fctx);
+ GNUNET_assert (opq->max_active >= opq->overload);
+ max = opq->max_active - opq->overload;
+ }
+ else
+ max = opq->max_active;
+ if (opq->active > max)
{
rval = GNUNET_NO;
goto ret;
}
- if ((opq->active + need) <= opq->max_active)
+ if ((opq->active + need) <= max)
goto ret;
- deficit = need - (opq->max_active - opq->active);
+ deficit = need - (max - opq->active);
for (entry = opq->nq_head;
(0 < deficit) && (NULL != entry);
entry = entry->next)
}
ret:
- GNUNET_free_non_null (evict_entries);
+ GNUNET_free_non_null (evict_entries);
if (NULL != ops_)
*ops_ = ops;
else
unsigned int i;
unsigned int j;
unsigned int n_cur;
-
+
GNUNET_assert (NULL != old);
n_cur = *n_old;
cur = *old;
for (i = 0; i < n_new; i++)
- {
+ {
for (j = 0; j < *n_old; j++)
{
if (new[i] == cur[j])
*old = cur;
*n_old = n_cur;
}
-
+
/**
if (NULL == ops)
continue;
merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
- GNUNET_free (ops);
+ GNUNET_free (ops);
}
if (NULL != evict_ops)
{
for (i = 0; i < n_evict_ops; i++)
- GNUNET_TESTBED_operation_release_ (evict_ops[i]);
+ GNUNET_TESTBED_operation_release_ (evict_ops[i]);
GNUNET_free (evict_ops);
evict_ops = NULL;
/* Evicting the operations should schedule this operation */
for (i = 0; i < op->nqueues; i++)
{
GNUNET_assert (op->queues[i]->active >= op->nres[i]);
- op->queues[i]->active -= op->nres[i];
+ op->queues[i]->active -= op->nres[i];
}
change_state (op, OP_STATE_WAITING);
}
/**
- * Initializes the operation queue for parallel overlay connects
+ * Cleans up the existing timing slots and sets new timing slots in the given
+ * queue to accommodate given number of max active operations.
*
- * @param h the host handle
- * @param npoc the number of parallel overlay connects - the queue size
+ * @param queue the queue
+ * @param n the number of maximum active operations. If n is greater than the
+ * maximum limit set while creating the queue, then the minimum of these two
+ * will be selected as n
*/
static void
adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
struct FeedbackCtx *fctx = queue->fctx;
struct TimeSlot *tslot;
unsigned int cnt;
-
+
cleanup_tslots (queue);
n = GNUNET_MIN (n ,fctx->max_active_bound);
fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
int sd;
unsigned int nvals;
unsigned int cnt;
+ unsigned int parallelism;
avg = GNUNET_TIME_UNIT_ZERO;
nvals = 0;
return;
}
avg = GNUNET_TIME_relative_divide (avg, nvals);
- if (GNUNET_SYSERR ==
+ GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ if (GNUNET_SYSERR ==
GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
(unsigned int) avg.rel_value_us,
&sd))
{
- GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
return;
}
+
+ parallelism = 0;
+ if (-1 == sd)
+ parallelism = queue->max_active + 1;
+ if (sd <= -2)
+ parallelism = queue->max_active * 2;
+ if (1 == sd)
+ parallelism = queue->max_active - 1;
+ if (2 <= sd)
+ parallelism = queue->max_active / 2;
+ parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
+ adaptive_queue_set_max_active (queue, parallelism);
+
+#if 0
+ /* old algorithm */
if (sd < 0)
sd = 0;
GNUNET_assert (0 <= sd);
- GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+ //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
if (0 == sd)
{
adaptive_queue_set_max_active (queue, queue->max_active * 2);
return;
}
adaptive_queue_set_max_active (queue, queue->max_active / 2);
+#endif
}
struct GNUNET_TIME_Relative t;
struct TimeSlot *tslot;
struct FeedbackCtx *fctx;
-
+ unsigned int i;
+
t = GNUNET_TIME_absolute_get_duration (op->tstart);
while (NULL != (tslot = op->tslots_head)) /* update time slots */
{
queue = tslot->queue;
fctx = queue->fctx;
GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
- tslot->op = NULL;
+ tslot->op = NULL;
GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
tslot);
if (op->failed)
+ {
fctx->nfailed++;
+ for (i = 0; i < op->nqueues; i++)
+ if (queue == op->queues[i])
+ break;
+ GNUNET_assert (i != op->nqueues);
+ op->queues[i]->overload += op->nres[i];
+ }
tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
if (0 != tslot->nvals++)
continue;
{
struct GNUNET_TESTBED_Operation *op;
- op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation));
+ op = GNUNET_new (struct GNUNET_TESTBED_Operation);
op->start = start;
op->state = OP_STATE_INIT;
op->release = release;
struct OperationQueue *queue;
struct FeedbackCtx *fctx;
- queue = GNUNET_malloc (sizeof (struct OperationQueue));
+ queue = GNUNET_new (struct OperationQueue);
queue->type = type;
if (OPERATION_QUEUE_TYPE_FIXED == type)
{
}
else
{
- fctx = GNUNET_malloc (sizeof (struct FeedbackCtx));
+ fctx = GNUNET_new (struct FeedbackCtx);
fctx->max_active_bound = max_active;
- fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
+ fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
queue->fctx = fctx;
- adaptive_queue_set_max_active (queue, 1); /* start with 1 */
+ adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
}
return queue;
}
/**
- * Destroy an operation queue. The queue MUST be empty
- * at this time.
+ * Cleanup the given operation queue.
*
- * @param queue queue to destroy
+ * @param queue the operation queue to destroy
*/
-void
-GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
+static void
+queue_destroy (struct OperationQueue *queue)
{
struct FeedbackCtx *fctx;
-
- GNUNET_break (GNUNET_YES == is_queue_empty (queue));
+
if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
{
cleanup_tslots (queue);
}
+/**
+ * Destroys an operation queue. If the queue is still in use by operations it
+ * is marked as expired and its resources are released in the destructor
+ * GNUNET_TESTBED_operations_fini().
+ *
+ * @param queue queue to destroy
+ */
+void
+GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
+{
+ if (GNUNET_YES != is_queue_empty (queue))
+ {
+ GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */
+ queue->expired = 1;
+ GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
+ return;
+ }
+ queue_destroy (queue);
+}
+
+
/**
* Destroys the operation queue if it is empty. If not empty return GNUNET_NO.
*
struct QueueEntry *entry;
queue->max_active = max_active;
+ queue->overload = 0;
while ( (queue->active > queue->max_active)
&& (NULL != (entry = queue->rq_head)) )
defer (entry->op);
void
GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
{
- struct QueueEntry *entry;
+ struct QueueEntry *entry;
struct OperationQueue *opq;
unsigned int i;
if (OP_STATE_ACTIVE == op->state)
update_tslots (op);
GNUNET_assert (NULL != op->queues);
- GNUNET_assert (NULL != op->qentries);
+ GNUNET_assert (NULL != op->qentries);
for (i = 0; i < op->nqueues; i++)
{
entry = op->qentries[i];
remove_queue_entry (op, i);
opq = op->queues[i];
switch (op->state)
- {
+ {
case OP_STATE_INIT:
case OP_STATE_INACTIVE:
GNUNET_assert (0);
break;
- case OP_STATE_WAITING:
+ case OP_STATE_WAITING:
break;
case OP_STATE_ACTIVE:
case OP_STATE_READY:
opq->active -= entry->nres;
recheck_waiting (opq);
break;
- }
+ }
GNUNET_free (entry);
}
GNUNET_free_non_null (op->qentries);
}
+/**
+ * Cleanup expired operation queues. While doing so, also check for any
+ * operations which are not completed and warn about them.
+ */
+void __attribute__ ((destructor))
+GNUNET_TESTBED_operations_fini ()
+{
+ struct OperationQueue *queue;
+ unsigned int i;
+ int warn = 0;
+
+ for (i=0; i < n_expired_opqs; i++)
+ {
+ queue = expired_opqs[i];
+ if (GNUNET_NO == is_queue_empty (queue))
+ warn = 1;
+ queue_destroy (queue);
+ }
+ GNUNET_free_non_null (expired_opqs);
+ n_expired_opqs = 0;
+ if (warn)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Be disciplined. Some operations were not marked as done.\n");
+
+}
/* end of testbed_api_operations.c */