/*
This file is part of GNUnet.
- (C) 2001--2012 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2001--2012 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ SPDX-License-Identifier: AGPL3.0-or-later
*/
/**
static void
start_job (struct GNUNET_FS_QueueEntry *qe)
{
- GNUNET_assert (NULL == qe->client);
- qe->client = GNUNET_CLIENT_connect ("fs", qe->h->cfg);
- if (NULL == qe->client)
- {
- GNUNET_break (0);
- return;
- }
- qe->start (qe->cls, qe->client);
+ qe->active = GNUNET_YES;
+ qe->start (qe->cls);
qe->start_times++;
qe->h->active_blocks += qe->blocks;
qe->h->active_downloads++;
"Starting job %p (%u active)\n",
qe,
qe->h->active_downloads);
- GNUNET_CONTAINER_DLL_remove (qe->h->pending_head, qe->h->pending_tail, qe);
- GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head, qe->h->running_tail,
- qe->h->running_tail, qe);
+ GNUNET_CONTAINER_DLL_remove (qe->h->pending_head,
+ qe->h->pending_tail,
+ qe);
+ GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head,
+ qe->h->running_tail,
+ qe->h->running_tail,
+ qe);
}
static void
stop_job (struct GNUNET_FS_QueueEntry *qe)
{
- qe->client = NULL;
+ qe->active = GNUNET_NO;
qe->stop (qe->cls);
GNUNET_assert (0 < qe->h->active_downloads);
qe->h->active_downloads--;
"Stopping job %p (%u active)\n",
qe,
qe->h->active_downloads);
- GNUNET_CONTAINER_DLL_remove (qe->h->running_head, qe->h->running_tail, qe);
- GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head, qe->h->pending_tail,
- qe->h->pending_tail, qe);
+ GNUNET_CONTAINER_DLL_remove (qe->h->running_head,
+ qe->h->running_tail,
+ qe);
+ GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head,
+ qe->h->pending_tail,
+ qe->h->pending_tail,
+ qe);
}
* and stopping others.
*
* @param cls the `struct GNUNET_FS_Handle *`
- * @param tc scheduler context
*/
static void
-process_job_queue (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+process_job_queue (void *cls)
{
struct GNUNET_FS_Handle *h = cls;
struct GNUNET_FS_QueueEntry *qe;
int num_downloads_change;
int block_limit_hit;
- h->queue_job = GNUNET_SCHEDULER_NO_TASK;
+ h->queue_job = NULL;
/* restart_at will be set to the time when it makes sense to
re-evaluate the job queue (unless, of course, jobs complete
or are added, then we'll be triggered immediately */
break;
case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
run_time =
- GNUNET_TIME_relative_multiply (h->avg_block_latency,
- qe->blocks * qe->start_times);
+ GNUNET_TIME_relative_saturating_multiply (h->avg_block_latency,
+ qe->blocks * qe->start_times);
end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
rst = GNUNET_TIME_absolute_get_remaining (end_time);
if (0 == rst.rel_value_us)
break;
}
}
+ GNUNET_break (h->active_downloads ==
+ num_downloads_active + num_probes_active);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"PA: %u, PE: %u, PW: %u; DA: %u, DE: %u, DW: %u\n",
num_probes_active,
num_downloads_active,
num_downloads_expired,
num_downloads_waiting);
+ GNUNET_break (h->active_downloads + num_probes_active <=
+ h->max_parallel_downloads);
/* calculate start/stop decisions */
- if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests)
+ if (h->active_downloads + num_downloads_waiting > h->max_parallel_downloads)
{
- /* stop probes if possible */
- num_probes_change = - num_probes_active;
- num_downloads_change = h->max_parallel_requests - h->active_downloads;
+ /* stop as many probes as there are downloads and probes */
+ num_probes_change = - GNUNET_MIN (num_probes_active,
+ num_downloads_waiting);
+ /* start as many downloads as there are free slots, including those
+ we just opened up */
+ num_downloads_change = h->max_parallel_downloads - h->active_downloads - num_probes_change;
}
else
{
- /* start all downloads */
+ /* start all downloads (we can) */
num_downloads_change = num_downloads_waiting;
- /* start as many probes as we can */
- num_probes_change = GNUNET_MIN (num_probes_waiting,
- h->max_parallel_requests - (h->active_downloads + num_downloads_waiting));
- }
-
+ /* also start probes if there is room, but use a lower cap of (mpd/4) + 1 */
+ if (1 + h->max_parallel_downloads / 4 >= (h->active_downloads + num_downloads_change))
+ num_probes_change = GNUNET_MIN (num_probes_waiting,
+ (1 + h->max_parallel_downloads / 4) - (h->active_downloads + num_downloads_change));
+ else
+ num_probes_change = 0;
+ }
+ GNUNET_break (num_downloads_change <= num_downloads_waiting);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Changing %d probes and %d downloads\n",
+ "Changing %d probes and %d/%u/%u downloads\n",
num_probes_change,
- num_downloads_change);
+ num_downloads_change,
+ (unsigned int) h->active_downloads,
+ (unsigned int) h->max_parallel_downloads);
/* actually stop probes */
next = h->running_head;
while (NULL != (qe = next))
break;
}
}
- GNUNET_break ( (0 == num_downloads_change) || (GNUNET_YES == block_limit_hit) );
+ GNUNET_break ( (0 == num_downloads_change) ||
+ (GNUNET_YES == block_limit_hit) );
GNUNET_break (0 == num_probes_change);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
num_downloads_change,
GNUNET_STRINGS_relative_time_to_string (restart_at, GNUNET_YES));
- /* make sure we run again */
+ /* make sure we run again, callbacks might have
+ already re-scheduled the job, so cancel such
+ an operation (if it exists) */
+ if (NULL != h->queue_job)
+ GNUNET_SCHEDULER_cancel (h->queue_job);
h->queue_job =
GNUNET_SCHEDULER_add_delayed (restart_at, &process_job_queue, h);
}
*/
struct GNUNET_FS_QueueEntry *
GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h,
- GNUNET_FS_QueueStart start,
- GNUNET_FS_QueueStop stop, void *cls,
+ GNUNET_SCHEDULER_TaskCallback start,
+ GNUNET_SCHEDULER_TaskCallback stop,
+ void *cls,
unsigned int blocks,
enum GNUNET_FS_QueuePriority priority)
{
qe->priority = priority;
GNUNET_CONTAINER_DLL_insert_after (h->pending_head, h->pending_tail,
h->pending_tail, qe);
- if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+ if (NULL != h->queue_job)
GNUNET_SCHEDULER_cancel (h->queue_job);
h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue, h);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dequeueing job %p\n",
qe);
- if (NULL != qe->client)
+ if (GNUNET_YES == qe->active)
stop_job (qe);
- GNUNET_CONTAINER_DLL_remove (h->pending_head, h->pending_tail, qe);
+ GNUNET_CONTAINER_DLL_remove (h->pending_head,
+ h->pending_tail,
+ qe);
GNUNET_free (qe);
- if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+ if (NULL != h->queue_job)
GNUNET_SCHEDULER_cancel (h->queue_job);
- h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue, h);
+ h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue,
+ h);
}
ret = GNUNET_new (struct TopLevelActivity);
ret->ssf = ssf;
ret->ssf_cls = ssf_cls;
- GNUNET_CONTAINER_DLL_insert (h->top_head, h->top_tail, ret);
+ GNUNET_CONTAINER_DLL_insert (h->top_head,
+ h->top_tail,
+ ret);
return ret;
}
GNUNET_FS_end_top (struct GNUNET_FS_Handle *h,
struct TopLevelActivity *top)
{
- GNUNET_CONTAINER_DLL_remove (h->top_head, h->top_tail, top);
+ GNUNET_CONTAINER_DLL_remove (h->top_head,
+ h->top_tail,
+ top);
GNUNET_free (top);
}
GNUNET_free_non_null (data);
return 0;
}
- memcpy (buf, &data[offset], max);
+ GNUNET_memcpy (buf, &data[offset], max);
return max;
}
char b;
char *ksks;
char *chks;
+ char *skss;
char *filename;
uint32_t dsize;
ret->h = h;
ksks = NULL;
chks = NULL;
+ skss = NULL;
filename = NULL;
if ((GNUNET_OK != GNUNET_BIO_read_meta_data (rh, "metadata", &ret->meta)) ||
(GNUNET_OK != GNUNET_BIO_read_string (rh, "ksk-uri", &ksks, 32 * 1024)) ||
( (NULL != chks) &&
( (NULL == (ret->chk_uri = GNUNET_FS_uri_parse (chks, NULL))) ||
(GNUNET_YES != GNUNET_FS_uri_test_chk (ret->chk_uri))) ) ||
+ (GNUNET_OK != GNUNET_BIO_read_string (rh, "sks-uri", &skss, 1024)) ||
+ ( (NULL != skss) &&
+ ( (NULL == (ret->sks_uri = GNUNET_FS_uri_parse (skss, NULL))) ||
+ (GNUNET_YES != GNUNET_FS_uri_test_sks (ret->sks_uri))) ) ||
(GNUNET_OK != read_start_time (rh, &ret->start_time)) ||
(GNUNET_OK != GNUNET_BIO_read_string (rh, "emsg", &ret->emsg, 16 * 1024))
|| (GNUNET_OK !=
filename = NULL;
}
GNUNET_free_non_null (ksks);
+ GNUNET_free_non_null (skss);
GNUNET_free_non_null (chks);
return ret;
cleanup:
GNUNET_free_non_null (ksks);
GNUNET_free_non_null (chks);
+ GNUNET_free_non_null (skss);
GNUNET_free_non_null (filename);
GNUNET_FS_file_information_destroy (ret, NULL, NULL);
return NULL;
char b;
char *ksks;
char *chks;
+ char *skss;
if (NULL == fi->serialization)
fi->serialization =
chks = GNUNET_FS_uri_to_string (fi->chk_uri);
else
chks = NULL;
+ if (NULL != fi->sks_uri)
+ skss = GNUNET_FS_uri_to_string (fi->sks_uri);
+ else
+ skss = NULL;
if ((GNUNET_OK != GNUNET_BIO_write (wh, &b, sizeof (b))) ||
(GNUNET_OK != GNUNET_BIO_write_meta_data (wh, fi->meta)) ||
(GNUNET_OK != GNUNET_BIO_write_string (wh, ksks)) ||
(GNUNET_OK != GNUNET_BIO_write_string (wh, chks)) ||
+ (GNUNET_OK != GNUNET_BIO_write_string (wh, skss)) ||
(GNUNET_OK != write_start_time (wh, fi->start_time)) ||
(GNUNET_OK != GNUNET_BIO_write_string (wh, fi->emsg)) ||
(GNUNET_OK != GNUNET_BIO_write_string (wh, fi->filename)) ||
chks = NULL;
GNUNET_free_non_null (ksks);
ksks = NULL;
+ GNUNET_free_non_null (skss);
+ skss = NULL;
switch (b)
{
(void) GNUNET_BIO_write_close (wh);
GNUNET_free_non_null (chks);
GNUNET_free_non_null (ksks);
+ GNUNET_free_non_null (skss);
fn = get_serialization_file_name (fi->h, GNUNET_FS_SYNC_PATH_FILE_INFO,
fi->serialization);
if (NULL != fn)
/* re-start publishing (if needed)... */
if (GNUNET_YES != pc->all_done)
{
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pc->upload_task);
+ GNUNET_assert (NULL == pc->upload_task);
pc->upload_task =
GNUNET_SCHEDULER_add_with_priority
(GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
filename, emsg);
GNUNET_free (emsg);
}
- pc->top = GNUNET_FS_make_top (h, &GNUNET_FS_publish_signal_suspend_, pc);
+ pc->top = GNUNET_FS_make_top (h,
+ &GNUNET_FS_publish_signal_suspend_,
+ pc);
return GNUNET_OK;
cleanup:
GNUNET_free_non_null (pc->nid);
GNUNET_assert ((GNUNET_YES == GNUNET_FS_uri_test_ksk (sc->uri)) ||
(GNUNET_YES == GNUNET_FS_uri_test_sks (sc->uri)));
uris = GNUNET_FS_uri_to_string (sc->uri);
- in_pause = (sc->task != GNUNET_SCHEDULER_NO_TASK) ? 'r' : '\0';
+ in_pause = (sc->task != NULL) ? 'r' : '\0';
if ((GNUNET_OK != GNUNET_BIO_write_string (wh, uris)) ||
(GNUNET_OK != write_start_time (wh, sc->start_time)) ||
(GNUNET_OK != GNUNET_BIO_write_string (wh, sc->emsg)) ||
GNUNET_break (0);
goto cleanup;
}
- uc->top = GNUNET_FS_make_top (h, &GNUNET_FS_unindex_signal_suspend_, uc);
+ uc->top = GNUNET_FS_make_top (h,
+ &GNUNET_FS_unindex_signal_suspend_,
+ uc);
pi.status = GNUNET_FS_STATUS_UNINDEX_RESUME;
pi.value.unindex.specifics.resume.message = uc->emsg;
GNUNET_FS_unindex_make_status_ (&pi, uc,
signal_download_resume (dcc);
dcc = dcc->next;
}
- if (NULL != dc->pending_head)
- GNUNET_FS_download_start_downloading_ (dc);
}
GNUNET_FS_compute_depth (GNUNET_FS_uri_chk_get_file_size (dc->uri));
if (GNUNET_FS_uri_test_loc (dc->uri))
GNUNET_assert (GNUNET_OK ==
- GNUNET_FS_uri_loc_get_peer_identity (dc->uri, &dc->target));
+ GNUNET_FS_uri_loc_get_peer_identity (dc->uri,
+ &dc->target));
if (NULL == dc->emsg)
{
dc->top_request = read_download_request (rh);
goto cleanup;
}
}
- dn = get_download_sync_filename (dc, dc->serialization, ".dir");
+ dn = get_download_sync_filename (dc,
+ dc->serialization,
+ ".dir");
if (NULL != dn)
{
- if (GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES))
- GNUNET_DISK_directory_scan (dn, &deserialize_subdownload, dc);
+ if (GNUNET_YES ==
+ GNUNET_DISK_directory_test (dn,
+ GNUNET_YES))
+ GNUNET_DISK_directory_scan (dn,
+ &deserialize_subdownload,
+ dc);
GNUNET_free (dn);
}
if (NULL != parent)
{
- GNUNET_CONTAINER_DLL_insert (parent->child_head, parent->child_tail, dc);
+ GNUNET_CONTAINER_DLL_insert (parent->child_head,
+ parent->child_tail,
+ dc);
}
if (NULL != search)
{
dc->search = search;
search->download = dc;
}
- if ((NULL == parent) && (NULL == search))
+ if ( (NULL == parent) &&
+ (NULL == search) )
{
- dc->top =
- GNUNET_FS_make_top (dc->h, &GNUNET_FS_download_signal_suspend_, dc);
+ dc->top
+ = GNUNET_FS_make_top (dc->h,
+ &GNUNET_FS_download_signal_suspend_,
+ dc);
signal_download_resume (dc);
}
GNUNET_free (uris);
- dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc);
+ GNUNET_assert (NULL == dc->job_queue);
+ dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_,
+ dc);
return;
cleanup:
GNUNET_free_non_null (uris);
pi.status = GNUNET_FS_STATUS_SEARCH_RESUME;
pi.value.search.specifics.resume.message = sc->emsg;
pi.value.search.specifics.resume.is_paused =
- (NULL == sc->client) ? GNUNET_YES : GNUNET_NO;
+ (NULL == sc->mq) ? GNUNET_YES : GNUNET_NO;
sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc);
GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map,
&signal_result_resume, sc);
}
sc->h = h;
sc->serialization = GNUNET_strdup (serialization);
- if ((GNUNET_OK != GNUNET_BIO_read_string (rh, "search-uri", &uris, 10 * 1024))
- || (NULL == (sc->uri = GNUNET_FS_uri_parse (uris, &emsg))) ||
+ if ((GNUNET_OK !=
+ GNUNET_BIO_read_string (rh, "search-uri", &uris, 10 * 1024)) ||
+ (NULL == (sc->uri = GNUNET_FS_uri_parse (uris, &emsg))) ||
((GNUNET_YES != GNUNET_FS_uri_test_ksk (sc->uri)) &&
(GNUNET_YES != GNUNET_FS_uri_test_sks (sc->uri))) ||
(GNUNET_OK != read_start_time (rh, &sc->start_time)) ||
sc->options = (enum GNUNET_FS_SearchOptions) options;
sc->master_result_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO);
dn = get_serialization_file_name_in_dir (h,
- (sc->psearch_result ==
- NULL) ?
+ (NULL == sc->psearch_result) ?
GNUNET_FS_SYNC_PATH_MASTER_SEARCH :
GNUNET_FS_SYNC_PATH_CHILD_SEARCH,
sc->serialization, "");
* Deserialize informatin about pending operations.
*
* @param master_path which master directory should be scanned
- * @param proc function to call for each entry (will get 'h' for 'cls')
- * @param h the 'struct GNUNET_FS_Handle*'
+ * @param proc function to call for each entry (will get @a h for 'cls')
+ * @param h the `struct GNUNET_FS_Handle *`
*/
static void
-deserialization_master (const char *master_path, GNUNET_FileNameCallback proc,
+deserialization_master (const char *master_path,
+ GNUNET_FileNameCallback proc,
struct GNUNET_FS_Handle *h)
{
char *dn;
ret->max_parallel_requests = DEFAULT_MAX_PARALLEL_REQUESTS;
ret->avg_block_latency = GNUNET_TIME_UNIT_MINUTES; /* conservative starting point */
va_start (ap, flags);
- while (GNUNET_FS_OPTIONS_END != (opt = va_arg (ap, enum GNUNET_FS_OPTIONS)))
+ while (GNUNET_FS_OPTIONS_END != (opt = GNUNET_VA_ARG_ENUM (ap,GNUNET_FS_OPTIONS)))
{
switch (opt)
{
/**
* Close our connection with the file-sharing service.
- * The callback given to GNUNET_FS_start will no longer be
+ * The callback given to #GNUNET_FS_start() will no longer be
* called after this function returns.
+ * This function MUST NOT be called from within the
+ * callback itself.
*
* @param h handle that was returned from #GNUNET_FS_start()
*/
void
GNUNET_FS_stop (struct GNUNET_FS_Handle *h)
{
- while (h->top_head != NULL)
+ while (NULL != h->top_head)
h->top_head->ssf (h->top_head->ssf_cls);
- if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+ if (NULL != h->queue_job)
GNUNET_SCHEDULER_cancel (h->queue_job);
GNUNET_free (h->client_name);
GNUNET_free (h);