X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ftestbed%2Ftestbed_api_operations.c;h=a406c0e0b834b01344cec29ac3034173eb85d9b1;hb=d17a17ea785f91c18b5694eab3372c4e4564d95e;hp=ce715785f867b28c1366efccb097e2a069f32e51;hpb=a8f1dacf3569a6428c7e0cb83e5e8309be1922b1;p=oweals%2Fgnunet.git diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index ce715785f..a406c0e0b 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c @@ -1,21 +1,21 @@ /* This file is part of GNUnet - (C) 2008--2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2008--2013 GNUnet e.V. - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. + GNUnet is free software: you can redistribute it and/or modify it + under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. + Affero General Public License for more details. - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + + SPDX-License-Identifier: AGPL3.0-or-later */ /** @@ -29,6 +29,17 @@ #include "testbed_api_operations.h" #include "testbed_api_sd.h" +/** + * The number of readings containing past operation's timing information that we + * keep track of for adaptive queues + */ +#define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40 + +/** + * The number of parallel opeartions we start with by default for adaptive + * queues + */ +#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4 /** * An entry in the operation queue @@ -115,7 +126,7 @@ struct FeedbackCtx * Head for DLL of time slots which are free to be allocated to operations */ struct TimeSlot *alloc_head; - + /** * Tail for DLL of time slots which are free to be allocated to operations */ @@ -131,7 +142,7 @@ struct FeedbackCtx * Number of time slots filled so far */ unsigned int tslots_filled; - + /** * Bound on the maximum number of operations which can be active */ @@ -141,7 +152,6 @@ struct FeedbackCtx * Number of operations that have failed */ unsigned int nfailed; - }; @@ -216,10 +226,21 @@ struct OperationQueue * Max number of operations which can be active at any time in this queue. * This value can be changed either by calling * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive - * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE + * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE */ unsigned int max_active; + /** + * The number of resources occupied by failed operations in the current shot. + * This is only relavant if the operation queue is of type + * #OPERATION_QUEUE_TYPE_ADAPTIVE + */ + unsigned int overload; + + /** + * Is this queue marked for expiry? + */ + unsigned int expired; }; @@ -267,7 +288,7 @@ struct ReadyQueueEntry * next ptr for DLL */ struct ReadyQueueEntry *next; - + /** * prev ptr for DLL */ @@ -353,23 +374,32 @@ struct GNUNET_TESTBED_Operation * Is this a failed operation? */ int failed; - }; /** * DLL head for the ready queue */ -struct ReadyQueueEntry *rq_head; +static struct ReadyQueueEntry *rq_head; /** * DLL tail for the ready queue */ -struct ReadyQueueEntry *rq_tail; +static struct ReadyQueueEntry *rq_tail; + +/** + * Array of operation queues which are to be destroyed + */ +static struct OperationQueue **expired_opqs; + +/** + * Number of expired operation queues in the above array + */ +static unsigned int n_expired_opqs; /** * The id of the task to process the ready queue */ -GNUNET_SCHEDULER_TaskIdentifier process_rq_task_id; +struct GNUNET_SCHEDULER_Task *process_rq_task_id; /** @@ -407,7 +437,7 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) { struct OperationQueue *opq; struct QueueEntry *entry; - + opq = op->queues[index]; entry = op->qentries[index]; switch (op->state) @@ -415,15 +445,19 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index) case OP_STATE_INIT: GNUNET_assert (0); break; + case OP_STATE_WAITING: GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry); break; + case OP_STATE_READY: GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry); break; + case OP_STATE_ACTIVE: GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry); break; + case OP_STATE_INACTIVE: GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry); break; @@ -445,7 +479,7 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) struct OperationQueue *opq; unsigned int cnt; unsigned int s; - + GNUNET_assert (OP_STATE_INIT != state); GNUNET_assert (NULL != op->queues); GNUNET_assert (NULL != op->nres); @@ -455,11 +489,11 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) { if (OP_STATE_INIT == op->state) { - entry = GNUNET_malloc (sizeof (struct QueueEntry)); + entry = GNUNET_new (struct QueueEntry); entry->op = op; entry->nres = op->nres[cnt]; s = cnt; - GNUNET_array_append (op->qentries, s, entry); + GNUNET_array_append (op->qentries, s, entry); } else { @@ -472,15 +506,19 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) case OP_STATE_INIT: GNUNET_assert (0); break; + case OP_STATE_WAITING: GNUNET_CONTAINER_DLL_insert_tail (opq->wq_head, opq->wq_tail, entry); break; + case OP_STATE_READY: GNUNET_CONTAINER_DLL_insert_tail (opq->rq_head, opq->rq_tail, entry); break; + case OP_STATE_ACTIVE: GNUNET_CONTAINER_DLL_insert_tail (opq->aq_head, opq->aq_tail, entry); break; + case OP_STATE_INACTIVE: GNUNET_CONTAINER_DLL_insert_tail (opq->nq_head, opq->nq_tail, entry); break; @@ -498,15 +536,15 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state) */ static void rq_remove (struct GNUNET_TESTBED_Operation *op) -{ +{ GNUNET_assert (NULL != op->rq_entry); GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry); GNUNET_free (op->rq_entry); op->rq_entry = NULL; - if ( (NULL == rq_head) && (GNUNET_SCHEDULER_NO_TASK != process_rq_task_id) ) + if ((NULL == rq_head) && (NULL != process_rq_task_id)) { GNUNET_SCHEDULER_cancel (process_rq_task_id); - process_rq_task_id = GNUNET_SCHEDULER_NO_TASK; + process_rq_task_id = NULL; } } @@ -518,16 +556,15 @@ rq_remove (struct GNUNET_TESTBED_Operation *op) * the ready queue. * * @param cls NULL - * @param tc scheduler task context. Not used. */ static void -process_rq_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +process_rq_task (void *cls) { struct GNUNET_TESTBED_Operation *op; struct OperationQueue *queue; unsigned int cnt; - process_rq_task_id = GNUNET_SCHEDULER_NO_TASK; + process_rq_task_id = NULL; GNUNET_assert (NULL != rq_head); GNUNET_assert (NULL != (op = rq_head->op)); rq_remove (op); @@ -557,11 +594,11 @@ rq_add (struct GNUNET_TESTBED_Operation *op) struct ReadyQueueEntry *rq_entry; GNUNET_assert (NULL == op->rq_entry); - rq_entry = GNUNET_malloc (sizeof (struct ReadyQueueEntry)); + rq_entry = GNUNET_new (struct ReadyQueueEntry); rq_entry->op = op; GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry); op->rq_entry = rq_entry; - if (GNUNET_SCHEDULER_NO_TASK == process_rq_task_id) + if (NULL == process_rq_task_id) process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL); } @@ -576,10 +613,10 @@ rq_add (struct GNUNET_TESTBED_Operation *op) static int is_queue_empty (struct OperationQueue *opq) { - if ( (NULL != opq->wq_head) - || (NULL != opq->rq_head) - || (NULL != opq->aq_head) - || (NULL != opq->nq_head) ) + if ((NULL != opq->wq_head) + || (NULL != opq->rq_head) + || (NULL != opq->aq_head) + || (NULL != opq->nq_head)) return GNUNET_NO; return GNUNET_YES; } @@ -613,6 +650,7 @@ decide_capacity (struct OperationQueue *opq, unsigned int n_ops; unsigned int n_evict_entries; unsigned int need; + unsigned int max; int deficit; int rval; @@ -623,14 +661,22 @@ decide_capacity (struct OperationQueue *opq, evict_entries = NULL; n_evict_entries = 0; rval = GNUNET_YES; - if (opq->active > opq->max_active) + if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) + { + GNUNET_assert (NULL != opq->fctx); + GNUNET_assert (opq->max_active >= opq->overload); + max = opq->max_active - opq->overload; + } + else + max = opq->max_active; + if (opq->active > max) { rval = GNUNET_NO; goto ret; } - if ((opq->active + need) <= opq->max_active) + if ((opq->active + need) <= max) goto ret; - deficit = need - (opq->max_active - opq->active); + deficit = need - (max - opq->active); for (entry = opq->nq_head; (0 < deficit) && (NULL != entry); entry = entry->next) @@ -646,11 +692,11 @@ decide_capacity (struct OperationQueue *opq, for (n_ops = 0; n_ops < n_evict_entries;) { op = evict_entries[n_ops]->op; - GNUNET_array_append (ops, n_ops, op); /* increments n-ops */ + GNUNET_array_append (ops, n_ops, op); /* increments n-ops */ } - ret: - GNUNET_free_non_null (evict_entries); +ret: + GNUNET_free_non_null (evict_entries); if (NULL != ops_) *ops_ = ops; else @@ -680,12 +726,12 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old, unsigned int i; unsigned int j; unsigned int n_cur; - + GNUNET_assert (NULL != old); n_cur = *n_old; cur = *old; for (i = 0; i < n_new; i++) - { + { for (j = 0; j < *n_old; j++) { if (new[i] == cur[j]) @@ -698,7 +744,6 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old, *old = cur; *n_old = n_cur; } - /** @@ -732,12 +777,12 @@ check_readiness (struct GNUNET_TESTBED_Operation *op) if (NULL == ops) continue; merge_ops (&evict_ops, &n_evict_ops, ops, n_ops); - GNUNET_free (ops); + GNUNET_free (ops); } if (NULL != evict_ops) { for (i = 0; i < n_evict_ops; i++) - GNUNET_TESTBED_operation_release_ (evict_ops[i]); + GNUNET_TESTBED_operation_release_ (evict_ops[i]); GNUNET_free (evict_ops); evict_ops = NULL; /* Evicting the operations should schedule this operation */ @@ -767,7 +812,7 @@ defer (struct GNUNET_TESTBED_Operation *op) for (i = 0; i < op->nqueues; i++) { GNUNET_assert (op->queues[i]->active >= op->nres[i]); - op->queues[i]->active -= op->nres[i]; + op->queues[i]->active -= op->nres[i]; } change_state (op, OP_STATE_WAITING); } @@ -809,7 +854,7 @@ cleanup_tslots (struct OperationQueue *queue) * Cleansup the existing timing slots and sets new timing slots in the given * queue to accommodate given number of max active operations. * - * @param queue the queue + * @param queue the queue * @param n the number of maximum active operations. If n is greater than the * maximum limit set while creating the queue, then the minimum of these two * will be selected as n @@ -820,16 +865,17 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n) struct FeedbackCtx *fctx = queue->fctx; struct TimeSlot *tslot; unsigned int cnt; - + cleanup_tslots (queue); - n = GNUNET_MIN (n ,fctx->max_active_bound); - fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot)); + n = GNUNET_MIN (n, fctx->max_active_bound); + fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot)); fctx->nfailed = 0; for (cnt = 0; cnt < n; cnt++) { tslot = &fctx->tslots_freeptr[cnt]; tslot->queue = queue; - GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot); + GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, + tslot); } GNUNET_TESTBED_operation_queue_reset_max_active_ (queue, n); } @@ -850,6 +896,7 @@ adapt_parallelism (struct OperationQueue *queue) int sd; unsigned int nvals; unsigned int cnt; + unsigned int parallelism; avg = GNUNET_TIME_UNIT_ZERO; nvals = 0; @@ -872,19 +919,34 @@ adapt_parallelism (struct OperationQueue *queue) return; } avg = GNUNET_TIME_relative_divide (avg, nvals); - if (GNUNET_SYSERR == + GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); + if (GNUNET_SYSERR == GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd, (unsigned int) avg.rel_value_us, &sd)) { - GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); - adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ + adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ return; } + + parallelism = 0; + if (-1 == sd) + parallelism = queue->max_active + 1; + if (sd <= -2) + parallelism = queue->max_active * 2; + if (1 == sd) + parallelism = queue->max_active - 1; + if (2 <= sd) + parallelism = queue->max_active / 2; + parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); + adaptive_queue_set_max_active (queue, parallelism); + +#if 0 + /* old algorithm */ if (sd < 0) sd = 0; GNUNET_assert (0 <= sd); - GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); + // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us); if (0 == sd) { adaptive_queue_set_max_active (queue, queue->max_active * 2); @@ -906,6 +968,7 @@ adapt_parallelism (struct OperationQueue *queue) return; } adaptive_queue_set_max_active (queue, queue->max_active / 2); +#endif } @@ -923,18 +986,26 @@ update_tslots (struct GNUNET_TESTBED_Operation *op) struct GNUNET_TIME_Relative t; struct TimeSlot *tslot; struct FeedbackCtx *fctx; - + unsigned int i; + t = GNUNET_TIME_absolute_get_duration (op->tstart); while (NULL != (tslot = op->tslots_head)) /* update time slots */ { queue = tslot->queue; fctx = queue->fctx; GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot); - tslot->op = NULL; + tslot->op = NULL; GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot); if (op->failed) + { fctx->nfailed++; + for (i = 0; i < op->nqueues; i++) + if (queue == op->queues[i]) + break; + GNUNET_assert (i != op->nqueues); + op->queues[i]->overload += op->nres[i]; + } tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); if (0 != tslot->nvals++) continue; @@ -959,7 +1030,7 @@ GNUNET_TESTBED_operation_create_ (void *cls, OperationStart start, { struct GNUNET_TESTBED_Operation *op; - op = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_Operation)); + op = GNUNET_new (struct GNUNET_TESTBED_Operation); op->start = start; op->state = OP_STATE_INIT; op->release = release; @@ -983,7 +1054,7 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, struct OperationQueue *queue; struct FeedbackCtx *fctx; - queue = GNUNET_malloc (sizeof (struct OperationQueue)); + queue = GNUNET_new (struct OperationQueue); queue->type = type; if (OPERATION_QUEUE_TYPE_FIXED == type) { @@ -991,28 +1062,26 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, } else { - fctx = GNUNET_malloc (sizeof (struct FeedbackCtx)); + fctx = GNUNET_new (struct FeedbackCtx); fctx->max_active_bound = max_active; - fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */ + fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY); queue->fctx = fctx; - adaptive_queue_set_max_active (queue, 4); /* start with 4 */ + adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); } return queue; } /** - * Destroy an operation queue. The queue MUST be empty - * at this time. + * Cleanup the given operation queue. * - * @param queue queue to destroy + * @param queue the operation queue to destroy */ -void -GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) +static void +queue_destroy (struct OperationQueue *queue) { struct FeedbackCtx *fctx; - - GNUNET_break (GNUNET_YES == is_queue_empty (queue)); + if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type) { cleanup_tslots (queue); @@ -1024,6 +1093,27 @@ GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) } +/** + * Destroys an operation queue. If the queue is still in use by operations it + * is marked as expired and its resources are released in the destructor + * GNUNET_TESTBED_operations_fini(). + * + * @param queue queue to destroy + */ +void +GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue) +{ + if (GNUNET_YES != is_queue_empty (queue)) + { + GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */ + queue->expired = 1; + GNUNET_array_append (expired_opqs, n_expired_opqs, queue); + return; + } + queue_destroy (queue); +} + + /** * Destroys the operation queue if it is empty. If not empty return GNUNET_NO. * @@ -1079,8 +1169,9 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, struct QueueEntry *entry; queue->max_active = max_active; - while ( (queue->active > queue->max_active) - && (NULL != (entry = queue->rq_head)) ) + queue->overload = 0; + while ((queue->active > queue->max_active) + && (NULL != (entry = queue->rq_head))) defer (entry->op); recheck_waiting (queue); } @@ -1169,11 +1260,11 @@ GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) GNUNET_assert (OP_STATE_ACTIVE == op->state); change_state (op, OP_STATE_INACTIVE); nqueues = op->nqueues; - ms = sizeof (struct OperationQueue *) * nqueues; + ms = sizeof(struct OperationQueue *) * nqueues; queues = GNUNET_malloc (ms); /* Cloning is needed as the operation be released by waiting operations and hence its nqueues memory ptr will be freed */ - GNUNET_assert (NULL != (queues = memcpy (queues, op->queues, ms))); + GNUNET_memcpy (queues, op->queues, ms); for (i = 0; i < nqueues; i++) recheck_waiting (queues[i]); GNUNET_free (queues); @@ -1190,7 +1281,6 @@ GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op) void GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) { - GNUNET_assert (OP_STATE_INACTIVE == op->state); change_state (op, OP_STATE_ACTIVE); } @@ -1205,7 +1295,7 @@ GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op) void GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) { - struct QueueEntry *entry; + struct QueueEntry *entry; struct OperationQueue *opq; unsigned int i; @@ -1221,20 +1311,22 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) if (OP_STATE_ACTIVE == op->state) update_tslots (op); GNUNET_assert (NULL != op->queues); - GNUNET_assert (NULL != op->qentries); + GNUNET_assert (NULL != op->qentries); for (i = 0; i < op->nqueues; i++) { entry = op->qentries[i]; remove_queue_entry (op, i); opq = op->queues[i]; switch (op->state) - { + { case OP_STATE_INIT: case OP_STATE_INACTIVE: GNUNET_assert (0); break; - case OP_STATE_WAITING: + + case OP_STATE_WAITING: break; + case OP_STATE_ACTIVE: case OP_STATE_READY: GNUNET_assert (0 != opq->active); @@ -1242,7 +1334,7 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op) opq->active -= entry->nres; recheck_waiting (opq); break; - } + } GNUNET_free (entry); } GNUNET_free_non_null (op->qentries); @@ -1266,4 +1358,30 @@ GNUNET_TESTBED_operation_mark_failed (struct GNUNET_TESTBED_Operation *op) } +/** + * Cleanup expired operation queues. While doing so, also check for any + * operations which are not completed and warn about them. + */ +void __attribute__ ((destructor)) +GNUNET_TESTBED_operations_fini () +{ + struct OperationQueue *queue; + unsigned int i; + int warn = 0; + + for (i = 0; i < n_expired_opqs; i++) + { + queue = expired_opqs[i]; + if (GNUNET_NO == is_queue_empty (queue)) + warn = 1; + queue_destroy (queue); + } + GNUNET_free_non_null (expired_opqs); + n_expired_opqs = 0; + if (warn) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Be disciplined. Some operations were not marked as done.\n"); +} + + /* end of testbed_api_operations.c */