-options to play with
[oweals/gnunet.git] / src / fs / fs_api.c
index 6d9af1187992493d2bb38d9989683de2d9091c7e..678187a27124ba60208c7a3221ca705607eb2e8b 100644 (file)
@@ -59,7 +59,12 @@ start_job (struct GNUNET_FS_QueueEntry *qe)
   qe->start (qe->cls, qe->client);
   qe->start_times++;
   qe->h->active_blocks += qe->blocks;
+  qe->h->active_downloads++;
   qe->start_time = GNUNET_TIME_absolute_get ();
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Starting job %p (%u active)\n",
+             qe,
+             qe->h->active_downloads);
   GNUNET_CONTAINER_DLL_remove (qe->h->pending_head, qe->h->pending_tail, qe);
   GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head, qe->h->running_tail,
                                      qe->h->running_tail, qe);
@@ -77,12 +82,17 @@ stop_job (struct GNUNET_FS_QueueEntry *qe)
 {
   qe->client = NULL;
   qe->stop (qe->cls);
+  GNUNET_assert (0 < qe->h->active_downloads);
   qe->h->active_downloads--;
   qe->h->active_blocks -= qe->blocks;
   qe->run_time =
       GNUNET_TIME_relative_add (qe->run_time,
                                 GNUNET_TIME_absolute_get_duration
                                 (qe->start_time));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Stopping job %p (%u active)\n",
+             qe,
+             qe->h->active_downloads);
   GNUNET_CONTAINER_DLL_remove (qe->h->running_head, qe->h->running_tail, qe);
   GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head, qe->h->pending_tail,
                                      qe->h->pending_tail, qe);
@@ -106,20 +116,24 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   struct GNUNET_TIME_Relative restart_at;
   struct GNUNET_TIME_Relative rst;
   struct GNUNET_TIME_Absolute end_time;
-  unsigned int num_download_waiting;
-  unsigned int num_download_active;
-  unsigned int num_download_expired;
+  unsigned int num_downloads_waiting;
+  unsigned int num_downloads_active;
+  unsigned int num_downloads_expired;
   unsigned int num_probes_active;
   unsigned int num_probes_waiting;
   unsigned int num_probes_expired;
   int num_probes_change;
-  int num_download_change;
+  int num_downloads_change;
+  int block_limit_hit;
 
   h->queue_job = GNUNET_SCHEDULER_NO_TASK;
+  /* restart_at will be set to the time when it makes sense to
+     re-evaluate the job queue (unless, of course, jobs complete
+     or are added, then we'll be triggered immediately */
   restart_at = GNUNET_TIME_UNIT_FOREVER_REL;
-  /* first, see if we can start all the jobs */
+  /* first, calculate some basic statistics on pending jobs */
   num_probes_waiting = 0;
-  num_download_waiting = 0;
+  num_downloads_waiting = 0;
   for (qe = h->pending_head; NULL != qe; qe = qe->next)
   {
     switch (qe->priority)
@@ -128,190 +142,154 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       num_probes_waiting++;
       break;
     case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
-      num_download_waiting++;
+      num_downloads_waiting++;
       break;
     default:
       GNUNET_break (0);
       break;
     }
   }
+  /* now, calculate some basic statistics on running jobs */
   num_probes_active = 0;
   num_probes_expired = 0;
-  num_download_active = 0;
-  num_download_expired = 0;
-  for (qe = h->running_head; NULL != qe; qe = qe->next)
+  num_downloads_active = 0;
+  num_downloads_expired = 0;
+  next = h->running_head;
+  while (NULL != (qe = next))
   {
-    run_time =
-        GNUNET_TIME_relative_multiply (h->avg_block_latency,
-                                       qe->blocks * qe->start_times);
+    next = qe->next;
     switch (qe->priority)
     {
-      case GNUNET_FS_QUEUE_PRIORITY_PROBE:
+    case GNUNET_FS_QUEUE_PRIORITY_PROBE:
+      run_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2);
+      end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
+      rst = GNUNET_TIME_absolute_get_remaining (end_time);
+      if (0 == rst.rel_value)
+      {
+       num_probes_expired++;
+       stop_job (qe);
+      }
+      else
+      {
        num_probes_active++;
-       /* run probes for at most 1s * number-of-restarts; note that
-          as the total runtime of a probe is limited to 2m, we don't
-          need to additionally limit the total time of a probe to 
-          strictly limit its lifetime. */
-       run_time = GNUNET_TIME_relative_min (run_time,
-                                            GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
-                                                                           1 + qe->start_times));
-       end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
-       rst = GNUNET_TIME_absolute_get_remaining (end_time);
        restart_at = GNUNET_TIME_relative_min (rst, restart_at);
-       if (0 == rst.rel_value)
-         num_probes_expired++;
-       break;
-      case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
-       num_download_active++;
-       end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
-       rst = GNUNET_TIME_absolute_get_remaining (end_time);
-       restart_at = GNUNET_TIME_relative_min (rst, restart_at);
-       if (0 == rst.rel_value)
-         num_download_expired++;
-       break;
-      default:
-       GNUNET_break (0);
-       break;
-    }
-  }
-  
-  /* calculate stop decisions */
-  num_probes_change = 0;
-  num_download_change = 0;
-  if (h->active_downloads + num_download_waiting > h->max_parallel_requests)
-  {
-    if (num_probes_active > 0)
-      num_probes_change = - GNUNET_MIN (num_probes_active,
-                                       h->max_parallel_requests - (h->active_downloads + num_download_waiting));
-    else if (h->active_downloads + num_download_waiting > h->max_parallel_requests)
-      num_download_change = - GNUNET_MIN (num_download_expired,
-                                         h->max_parallel_requests - (h->active_downloads + num_download_waiting));
-  }
-
-  /* then, check if we should stop some jobs */
-  next = h->running_head;
-  while (NULL != (qe = next))
-  {
-    next = qe->next;
-    run_time =
+      }
+      break;
+    case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
+      run_time =
         GNUNET_TIME_relative_multiply (h->avg_block_latency,
                                        qe->blocks * qe->start_times);
-    switch (qe->priority)
+      end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
+      rst = GNUNET_TIME_absolute_get_remaining (end_time);
+      if (0 == rst.rel_value)
       {
-      case GNUNET_FS_QUEUE_PRIORITY_PROBE:
-       /* run probes for at most 1s * number-of-restarts; note that
-          as the total runtime of a probe is limited to 2m, we don't
-          need to additionally limit the total time of a probe to 
-          strictly limit its lifetime. */
-       run_time = GNUNET_TIME_relative_min (run_time,
-                                            GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
-                                                                           1 + qe->start_times));
-       end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
-       rst = GNUNET_TIME_absolute_get_remaining (end_time);
-       restart_at = GNUNET_TIME_relative_min (rst, restart_at);
-       if ( (num_probes_change < 0) &&
-            ( (num_probes_expired < - num_probes_change) ||
-              (0 == rst.rel_value) ) )
-       {
-         stop_job (qe);
-         num_probes_change++;
-         if (0 == rst.rel_value)
-           num_probes_expired--;
-       }
-       break;
-      case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
-       end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
-       rst = GNUNET_TIME_absolute_get_remaining (end_time);
+       num_downloads_expired++;
+       stop_job (qe);
+      }
+      else
+      {
+       num_downloads_active++;
        restart_at = GNUNET_TIME_relative_min (rst, restart_at);
-       if ( (num_download_change < 0) &&
-            ( (num_download_expired < - num_download_change) ||
-              (0 == rst.rel_value) ) )
-       {
-         stop_job (qe);
-         num_download_change++;
-         if (0 == rst.rel_value)
-           num_download_expired--;
-       }
-       break;
-      default:
-       GNUNET_break (0);
-       break;
       }
+      break;
+    default:
+      GNUNET_break (0);
+      break;
+    }
   }
-
-  /* FIXME: calculate start decisions */
-  num_probes_change = 0;
-  num_download_change = 0;
-  if (h->active_downloads + num_download_waiting < h->max_parallel_requests)
-  {
-    num_download_change = num_download_waiting;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "PA: %u, PE: %u, PW: %u; DA: %u, DE: %u, DW: %u\n",
+             num_probes_active,
+             num_probes_expired,
+             num_probes_waiting,
+             num_downloads_active,
+             num_downloads_expired,
+             num_downloads_waiting);
+  /* calculate start/stop decisions */
+  if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests)
+  {
+    /* stop probes if possible */
+    num_probes_change = - num_probes_active;
+    num_downloads_change = h->max_parallel_requests - h->active_downloads;
+  } 
+  else 
+  {
+    /* start all downloads */
+    num_downloads_change = num_downloads_waiting;
+    /* start as many probes as we can */
     num_probes_change = GNUNET_MIN (num_probes_waiting,
-                                   h->max_parallel_requests - (h->active_downloads + num_download_waiting)); 
-  }
-
-
-  next = h->pending_head;
+                                   h->max_parallel_requests - (h->active_downloads + num_downloads_waiting));
+  }
+       
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Changing %d probes and %d downloads\n",
+             num_probes_change,
+             num_downloads_change);
+  /* actually stop probes */
+  next = h->running_head;
   while (NULL != (qe = next))
   {
     next = qe->next;
-    if (NULL == h->running_head)
-    {
-      start_job (qe);
+    if (GNUNET_FS_QUEUE_PRIORITY_PROBE != qe->priority)
       continue;
-    }
-    if ((qe->blocks + h->active_blocks <= h->max_parallel_requests) &&
-        (h->active_downloads < h->max_parallel_downloads))
+    if (num_probes_change < 0) 
     {
-      start_job (qe);
-      continue;
+      stop_job (qe);
+      num_probes_change++;
+      if (0 == num_probes_change)
+       break;
     }
   }
-  if (NULL == h->pending_head)
-    return;                     /* no need to stop anything */
-  /* then, check if we should stop some jobs */
-  next = h->running_head;
-  while (NULL != (qe = next))
+  GNUNET_break (0 <= num_probes_change);
+
+  /* start some more tasks if we now have empty slots */
+  block_limit_hit = GNUNET_NO;
+  next = h->pending_head;
+  while ( (NULL != (qe = next)) &&
+         ( (num_probes_change > 0) ||
+           (num_downloads_change > 0) ) )
   {
     next = qe->next;
-    run_time =
-        GNUNET_TIME_relative_multiply (h->avg_block_latency,
-                                       qe->blocks * qe->start_times);
     switch (qe->priority)
+    {
+    case GNUNET_FS_QUEUE_PRIORITY_PROBE:
+      if (num_probes_change > 0)
       {
-      case GNUNET_FS_QUEUE_PRIORITY_PROBE:
-       /* run probes for at most 1s * number-of-restarts; note that
-          as the total runtime of a probe is limited to 2m, we don't
-          need to additionally limit the total time of a probe to 
-          strictly limit its lifetime. */
-       run_time = GNUNET_TIME_relative_min (run_time,
-                                            GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
-                                                                           1 + qe->start_times));
-       break;
-      case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
-       break;
-      default:
-       GNUNET_break (0);
-       break;
+       start_job (qe);
+       num_probes_change--;
+       run_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2);
+       restart_at = GNUNET_TIME_relative_min (run_time, restart_at);
       }
-    end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
-    rst = GNUNET_TIME_absolute_get_remaining (end_time);
-    restart_at = GNUNET_TIME_relative_min (rst, restart_at);
-    if (rst.rel_value > 0)
-      continue;
-    stop_job (qe);
-  }
-  /* finally, start some more tasks if we now have empty slots */
-  next = h->pending_head;
-  while (NULL != (qe = next))
-  {
-    next = qe->next;
-    if ((qe->blocks + h->active_blocks <= h->max_parallel_requests) &&
-        (h->active_downloads < h->max_parallel_downloads))
-    {
-      start_job (qe);
-      continue;
+      break;
+    case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
+      if ( (num_downloads_change > 0) &&
+          ( (qe->blocks + h->active_blocks <= h->max_parallel_requests) ||
+            ( (qe->blocks > h->max_parallel_requests) &&
+              (0 == h->active_downloads) ) ) )
+      {    
+       start_job (qe);
+       num_downloads_change--;
+      }
+      else if (num_downloads_change > 0)
+       block_limit_hit = GNUNET_YES;
+      break;
+    default:
+      GNUNET_break (0);
+      break;
     }
   }
+  GNUNET_break ( (0 == num_downloads_change) || (GNUNET_YES == block_limit_hit) );
+  GNUNET_break (0 == num_probes_change);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "AD: %u, MP: %u; %d probes and %d downloads to start, will run again in %s\n",
+             h->active_downloads,
+             h->max_parallel_requests,       
+             num_probes_change,
+             num_downloads_change,
+             GNUNET_STRINGS_relative_time_to_string (restart_at, GNUNET_YES));
+
+  /* make sure we run again */
   h->queue_job =
       GNUNET_SCHEDULER_add_delayed (restart_at, &process_job_queue, h);
 }
@@ -348,24 +326,31 @@ GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, GNUNET_FS_QueueStart start,
   if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
     GNUNET_SCHEDULER_cancel (h->queue_job);
   h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue, h);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Queueing job %p\n",
+             qe);
   return qe;
 }
 
 
 /**
  * Dequeue a job from the queue.
- * @param qh handle for the job
+ *
+ * @param qe handle for the job
  */
 void
-GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh)
+GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qe)
 {
   struct GNUNET_FS_Handle *h;
 
-  h = qh->h;
-  if (NULL != qh->client)
-    stop_job (qh);
-  GNUNET_CONTAINER_DLL_remove (h->pending_head, h->pending_tail, qh);
-  GNUNET_free (qh);
+  h = qe->h;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Dequeueing job %p\n",
+             qe);
+  if (NULL != qe->client)
+    stop_job (qe);
+  GNUNET_CONTAINER_DLL_remove (h->pending_head, h->pending_tail, qe);
+  GNUNET_free (qe);
   if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
     GNUNET_SCHEDULER_cancel (h->queue_job);
   h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue, h);
@@ -482,9 +467,9 @@ GNUNET_FS_data_reader_file_ (void *cls, uint64_t offset, size_t max, void *buf,
       return 0;
     }
   }
-  GNUNET_DISK_file_seek (fi->fd, offset, GNUNET_DISK_SEEK_SET);
-  ret = GNUNET_DISK_file_read (fi->fd, buf, max);
-  if (-1 == ret)
+  if ( (GNUNET_SYSERR == 
+       GNUNET_DISK_file_seek (fi->fd, offset, GNUNET_DISK_SEEK_SET)) ||
+       (-1 == (ret = GNUNET_DISK_file_read (fi->fd, buf, max))) )
   {
     GNUNET_asprintf (emsg, _("Could not read file `%s': %s"), fi->filename,
                      STRERROR (errno));
@@ -768,7 +753,7 @@ GNUNET_FS_remove_sync_dir_ (struct GNUNET_FS_Handle *h, const char *ext,
   dn = get_serialization_file_name_in_dir (h, ext, uni, "");
   if (NULL == dn)
     return;
-  if ((GNUNET_OK == GNUNET_DISK_directory_test (dn)) &&
+  if ((GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES)) &&
       (GNUNET_OK != GNUNET_DISK_directory_remove (dn)))
     GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, "rmdir", dn);
   GNUNET_free (dn);
@@ -1527,8 +1512,8 @@ deserialize_publish_file (void *cls, const char *filename)
   }
   if (NULL != ns)
   {
-    pc->namespace = GNUNET_FS_namespace_create (h, ns);
-    if (NULL == pc->namespace)
+    pc->ns = GNUNET_FS_namespace_create (h, ns);
+    if (NULL == pc->ns)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   _
@@ -1647,7 +1632,7 @@ GNUNET_FS_publish_sync_ (struct GNUNET_FS_PublishContext *pc)
                                 (NULL == pc->fi_pos) ? NULL : pc->fi_pos->serialization)) ||
       (GNUNET_OK !=
        GNUNET_BIO_write_string (wh,
-                                (NULL == pc->namespace) ? NULL : pc->namespace->name)))
+                                (NULL == pc->ns) ? NULL : pc->ns->name)))
   {
     GNUNET_break (0);
     goto cleanup;
@@ -1792,7 +1777,7 @@ read_download_request (struct GNUNET_BIO_ReadHandle *rh)
   }
   if (dr->num_children > 0)
     dr->children =
-        GNUNET_malloc (dr->num_children * sizeof (struct ContentHashKey));
+        GNUNET_malloc (dr->num_children * sizeof (struct DownloadRequest *));
   switch (dr->state)
   {
   case BRS_INIT:
@@ -2711,7 +2696,7 @@ deserialize_download (struct GNUNET_FS_Handle *h,
   }
   dc->options = (enum GNUNET_FS_DownloadOptions) options;
   dc->active =
-      GNUNET_CONTAINER_multihashmap_create (1 + 2 * (dc->length / DBLOCK_SIZE));
+    GNUNET_CONTAINER_multihashmap_create (1 + 2 * (dc->length / DBLOCK_SIZE), GNUNET_NO);
   dc->has_finished = (int) status;
   dc->treedepth =
       GNUNET_FS_compute_depth (GNUNET_FS_uri_chk_get_file_size (dc->uri));
@@ -2730,7 +2715,7 @@ deserialize_download (struct GNUNET_FS_Handle *h,
   dn = get_download_sync_filename (dc, dc->serialization, ".dir");
   if (NULL != dn)
   {
-    if (GNUNET_YES == GNUNET_DISK_directory_test (dn))
+    if (GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES))
       GNUNET_DISK_directory_scan (dn, &deserialize_subdownload, dc);
     GNUNET_free (dn);
   }
@@ -2834,7 +2819,7 @@ deserialize_search (struct GNUNET_FS_Handle *h,
     goto cleanup;
   }
   sc->options = (enum GNUNET_FS_SearchOptions) options;
-  sc->master_result_map = GNUNET_CONTAINER_multihashmap_create (16);
+  sc->master_result_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO);
   dn = get_serialization_file_name_in_dir (h,
                                            (sc->psearch_result ==
                                             NULL) ?
@@ -2843,7 +2828,7 @@ deserialize_search (struct GNUNET_FS_Handle *h,
                                            sc->serialization, "");
   if (NULL != dn)
   {
-    if (GNUNET_YES == GNUNET_DISK_directory_test (dn))
+    if (GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES))
       GNUNET_DISK_directory_scan (dn, &deserialize_search_result, sc);
     GNUNET_free (dn);
   }
@@ -2970,7 +2955,7 @@ deserialization_master (const char *master_path, GNUNET_FileNameCallback proc,
   dn = get_serialization_file_name (h, master_path, "");
   if (NULL == dn)
     return;
-  if (GNUNET_YES == GNUNET_DISK_directory_test (dn))
+  if (GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES))
     GNUNET_DISK_directory_scan (dn, proc, h);
   GNUNET_free (dn);
 }