Use more-or-equal (some machines are fast enough)
[oweals/gnunet.git] / src / testbed / testbed_api_operations.c
index c09ec366ac388a9ee43b4500ed94a8c6022663d5..ce2ef5715f4624cde8e9f5cb1467342b0e0d2960 100644 (file)
@@ -115,7 +115,7 @@ struct FeedbackCtx
    * Head for DLL of time slots which are free to be allocated to operations
    */
   struct TimeSlot *alloc_head;
-    
+
   /**
    * Tail for DLL of time slots which are free to be allocated to operations
    */
@@ -131,12 +131,17 @@ struct FeedbackCtx
    * Number of time slots filled so far
    */
   unsigned int tslots_filled;
-  
+
   /**
    * Bound on the maximum number of operations which can be active
    */
   unsigned int max_active_bound;
 
+  /**
+   * Number of operations that have failed
+   */
+  unsigned int nfailed;
+
 };
 
 
@@ -262,7 +267,7 @@ struct ReadyQueueEntry
    * next ptr for DLL
    */
   struct ReadyQueueEntry *next;
-  
+
   /**
    * prev ptr for DLL
    */
@@ -402,7 +407,7 @@ remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
 {
   struct OperationQueue *opq;
   struct QueueEntry *entry;
-  
+
   opq = op->queues[index];
   entry = op->qentries[index];
   switch (op->state)
@@ -440,7 +445,7 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
   struct OperationQueue *opq;
   unsigned int cnt;
   unsigned int s;
-  
+
   GNUNET_assert (OP_STATE_INIT != state);
   GNUNET_assert (NULL != op->queues);
   GNUNET_assert (NULL != op->nres);
@@ -454,7 +459,7 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
       entry->op = op;
       entry->nres = op->nres[cnt];
       s = cnt;
-      GNUNET_array_append (op->qentries, s, entry);      
+      GNUNET_array_append (op->qentries, s, entry);
     }
     else
     {
@@ -493,7 +498,7 @@ change_state (struct GNUNET_TESTBED_Operation *op, enum OperationState state)
  */
 static void
 rq_remove (struct GNUNET_TESTBED_Operation *op)
-{  
+{
   GNUNET_assert (NULL != op->rq_entry);
   GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
   GNUNET_free (op->rq_entry);
@@ -645,7 +650,7 @@ decide_capacity (struct OperationQueue *opq,
   }
 
  ret:
-  GNUNET_free_non_null (evict_entries);  
+  GNUNET_free_non_null (evict_entries);
   if (NULL != ops_)
     *ops_ = ops;
   else
@@ -675,12 +680,12 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old,
   unsigned int i;
   unsigned int j;
   unsigned int n_cur;
+
   GNUNET_assert (NULL != old);
   n_cur = *n_old;
   cur = *old;
   for (i = 0; i < n_new; i++)
-  {    
+  {
     for (j = 0; j < *n_old; j++)
     {
       if (new[i] == cur[j])
@@ -693,7 +698,7 @@ merge_ops (struct GNUNET_TESTBED_Operation ***old,
   *old = cur;
   *n_old = n_cur;
 }
-           
+
 
 
 /**
@@ -727,12 +732,12 @@ check_readiness (struct GNUNET_TESTBED_Operation *op)
     if (NULL == ops)
       continue;
     merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
-    GNUNET_free (ops);    
+    GNUNET_free (ops);
   }
   if (NULL != evict_ops)
   {
     for (i = 0; i < n_evict_ops; i++)
-      GNUNET_TESTBED_operation_release_ (evict_ops[i]); 
+      GNUNET_TESTBED_operation_release_ (evict_ops[i]);
     GNUNET_free (evict_ops);
     evict_ops = NULL;
     /* Evicting the operations should schedule this operation */
@@ -762,7 +767,7 @@ defer (struct GNUNET_TESTBED_Operation *op)
   for (i = 0; i < op->nqueues; i++)
   {
     GNUNET_assert (op->queues[i]->active >= op->nres[i]);
-    op->queues[i]->active -= op->nres[i];    
+    op->queues[i]->active -= op->nres[i];
   }
   change_state (op, OP_STATE_WAITING);
 }
@@ -801,10 +806,13 @@ cleanup_tslots (struct OperationQueue *queue)
 
 
 /**
- * Initializes the operation queue for parallel overlay connects
+ * Cleansup the existing timing slots and sets new timing slots in the given
+ * queue to accommodate given number of max active operations.
  *
- * @param h the host handle
- * @param npoc the number of parallel overlay connects - the queue size
+ * @param queue the queue
+ * @param n the number of maximum active operations.  If n is greater than the
+ *   maximum limit set while creating the queue, then the minimum of these two
+ *   will be selected as n
  */
 static void
 adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
@@ -812,10 +820,11 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
   struct FeedbackCtx *fctx = queue->fctx;
   struct TimeSlot *tslot;
   unsigned int cnt;
-  
+
   cleanup_tslots (queue);
   n = GNUNET_MIN (n ,fctx->max_active_bound);
   fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
+  fctx->nfailed = 0;
   for (cnt = 0; cnt < n; cnt++)
   {
     tslot = &fctx->tslots_freeptr[cnt];
@@ -831,10 +840,9 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
  * the feedback context.
  *
  * @param queue the queue
- * @param fail GNUNET_YES if the last operation failed; GNUNET_NO if not;
  */
 static void
-adapt_parallelism (struct OperationQueue *queue, int fail)
+adapt_parallelism (struct OperationQueue *queue)
 {
   struct GNUNET_TIME_Relative avg;
   struct FeedbackCtx *fctx;
@@ -853,8 +861,18 @@ adapt_parallelism (struct OperationQueue *queue, int fail)
     nvals += tslot->nvals;
   }
   GNUNET_assert (nvals >= queue->max_active);
+  GNUNET_assert (fctx->nfailed <= nvals);
+  nvals -= fctx->nfailed;
+  if (0 == nvals)
+  {
+    if (1 == queue->max_active)
+      adaptive_queue_set_max_active (queue, 1);
+    else
+      adaptive_queue_set_max_active (queue, queue->max_active / 2);
+    return;
+  }
   avg = GNUNET_TIME_relative_divide (avg, nvals);
-  if (GNUNET_SYSERR == 
+  if (GNUNET_SYSERR ==
       GNUNET_TESTBED_SD_deviation_factor_ (fctx->sd,
                                            (unsigned int) avg.rel_value_us,
                                            &sd))
@@ -863,17 +881,26 @@ adapt_parallelism (struct OperationQueue *queue, int fail)
     adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
     return;
   }
-  if ((0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, queue->max_active)))
-    GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+  if (1 == sd)
+    adaptive_queue_set_max_active (queue, queue->max_active - 1);
+  if (2 <= sd)
+    adaptive_queue_set_max_active (queue, queue->max_active / 2);
+  if (-1 == sd)
+    adaptive_queue_set_max_active (queue, queue->max_active + 1);
+  if (sd <= -2)
+    adaptive_queue_set_max_active (queue, queue->max_active * 2);
+
+#if 0                           /* old algorithm */
   if (sd < 0)
     sd = 0;
   GNUNET_assert (0 <= sd);
-  if ((0 == sd) && (! fail))
+  GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
+  if (0 == sd)
   {
     adaptive_queue_set_max_active (queue, queue->max_active * 2);
     return;
   }
-  if ((1 == sd) && (! fail))
+  if (1 == sd)
   {
     adaptive_queue_set_max_active (queue, queue->max_active + 1);
     return;
@@ -883,12 +910,13 @@ adapt_parallelism (struct OperationQueue *queue, int fail)
     adaptive_queue_set_max_active (queue, 1);
     return;
   }
-  if (((sd < 2) && (fail)) || (2 == sd))
+  if (2 == sd)
   {
     adaptive_queue_set_max_active (queue, queue->max_active - 1);
     return;
   }
   adaptive_queue_set_max_active (queue, queue->max_active / 2);
+#endif
 }
 
 
@@ -906,22 +934,24 @@ update_tslots (struct GNUNET_TESTBED_Operation *op)
   struct GNUNET_TIME_Relative t;
   struct TimeSlot *tslot;
   struct FeedbackCtx *fctx;
-  
+
   t = GNUNET_TIME_absolute_get_duration (op->tstart);
   while (NULL != (tslot = op->tslots_head)) /* update time slots */
   {
     queue = tslot->queue;
     fctx = queue->fctx;
-    tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
     GNUNET_CONTAINER_DLL_remove (op->tslots_head, op->tslots_tail, tslot);
-    tslot->op = NULL;    
+    tslot->op = NULL;
     GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
-  tslot);
+                                      tslot);
+    if (op->failed)
+      fctx->nfailed++;
+    tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
     if (0 != tslot->nvals++)
       continue;
     fctx->tslots_filled++;
     if (queue->max_active == fctx->tslots_filled)
-      adapt_parallelism (queue, op->failed);
+      adapt_parallelism (queue);
   }
 }
 
@@ -976,7 +1006,7 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
     fctx->max_active_bound = max_active;
     fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
     queue->fctx = fctx;
-    adaptive_queue_set_max_active (queue, 1); /* start with 1 */
+    adaptive_queue_set_max_active (queue, 4); /* start with 4 */
   }
   return queue;
 }
@@ -992,7 +1022,7 @@ void
 GNUNET_TESTBED_operation_queue_destroy_ (struct OperationQueue *queue)
 {
   struct FeedbackCtx *fctx;
-  
+
   GNUNET_break (GNUNET_YES == is_queue_empty (queue));
   if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
   {
@@ -1186,7 +1216,7 @@ GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op)
 void
 GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
 {
-  struct QueueEntry *entry;  
+  struct QueueEntry *entry;
   struct OperationQueue *opq;
   unsigned int i;
 
@@ -1202,19 +1232,19 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
   if (OP_STATE_ACTIVE == op->state)
     update_tslots (op);
   GNUNET_assert (NULL != op->queues);
-  GNUNET_assert (NULL != op->qentries);  
+  GNUNET_assert (NULL != op->qentries);
   for (i = 0; i < op->nqueues; i++)
   {
     entry = op->qentries[i];
     remove_queue_entry (op, i);
     opq = op->queues[i];
     switch (op->state)
-    {      
+    {
     case OP_STATE_INIT:
     case OP_STATE_INACTIVE:
       GNUNET_assert (0);
       break;
-    case OP_STATE_WAITING:      
+    case OP_STATE_WAITING:
       break;
     case OP_STATE_ACTIVE:
     case OP_STATE_READY:
@@ -1223,7 +1253,7 @@ GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op)
       opq->active -= entry->nres;
       recheck_waiting (opq);
       break;
-    }    
+    }
     GNUNET_free (entry);
   }
   GNUNET_free_non_null (op->qentries);