/*
This file is part of GNUnet.
- (C) 2001-2012 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2001-2012 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @file fs/fs_download.c
is_recursive_download (struct GNUNET_FS_DownloadContext *dc)
{
return (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_RECURSIVE)) &&
- ((GNUNET_YES == GNUNET_FS_meta_data_test_for_directory (dc->meta)) ||
- ((NULL == dc->meta) &&
- ((NULL == dc->filename) ||
- ((strlen (dc->filename) >= strlen (GNUNET_FS_DIRECTORY_EXT)) &&
- (NULL !=
- strstr (dc->filename + strlen (dc->filename) -
- strlen (GNUNET_FS_DIRECTORY_EXT),
- GNUNET_FS_DIRECTORY_EXT))))));
+ ( (GNUNET_YES ==
+ GNUNET_FS_meta_data_test_for_directory (dc->meta)) ||
+ ( (NULL == dc->meta) &&
+ ( (NULL == dc->filename) ||
+ ( (strlen (dc->filename) >= strlen (GNUNET_FS_DIRECTORY_EXT)) &&
+ (NULL !=
+ strstr (dc->filename + strlen (dc->filename) -
+ strlen (GNUNET_FS_DIRECTORY_EXT),
+ GNUNET_FS_DIRECTORY_EXT)) ) ) ) );
}
pi->value.download.anonymity = dc->anonymity;
pi->value.download.eta =
GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length);
- pi->value.download.is_active = (NULL == dc->client) ? GNUNET_NO : GNUNET_YES;
+ pi->value.download.is_active = (NULL == dc->mq) ? GNUNET_NO : GNUNET_YES;
pi->fsh = dc->h;
if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE))
dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi);
}
-/**
- * We're ready to transmit a search request to the
- * file-sharing service. Do it. If there is
- * more than one request pending, try to send
- * multiple or request another transmission.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-transmit_download_request (void *cls, size_t size, void *buf);
-
-
/**
* Closure for iterator processing results.
*/
* @param cls closure (our 'struct ProcessResultClosure')
* @param key query for the given value / request
* @param value value in the hash map (a 'struct DownloadRequest')
- * @return GNUNET_YES (we should continue to iterate); unless serious error
+ * @return #GNUNET_YES (we should continue to iterate); unless serious error
*/
static int
-process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
+process_result_with_request (void *cls,
+ const struct GNUNET_HashCode * key,
void *value);
* @param data contents of the file (or NULL if they were not inlined)
*/
static void
-trigger_recursive_download (void *cls, const char *filename,
+trigger_recursive_download (void *cls,
+ const char *filename,
const struct GNUNET_FS_Uri *uri,
const struct GNUNET_CONTAINER_MetaData *meta,
- size_t length, const void *data);
+ size_t length,
+ const void *data);
/**
if (size64 != (uint64_t) size)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- _
- ("Recursive downloads of directories larger than 4 GB are not supported on 32-bit systems\n"));
+ _("Recursive downloads of directories larger than 4 GB are not supported on 32-bit systems\n"));
return;
}
if (NULL != dc->filename)
{
- h = GNUNET_DISK_file_open (dc->filename, GNUNET_DISK_OPEN_READ,
+ h = GNUNET_DISK_file_open (dc->filename,
+ GNUNET_DISK_OPEN_READ,
GNUNET_DISK_PERM_NONE);
}
else
{
GNUNET_assert (NULL != dc->temp_filename);
- h = GNUNET_DISK_file_open (dc->temp_filename, GNUNET_DISK_OPEN_READ,
+ h = GNUNET_DISK_file_open (dc->temp_filename,
+ GNUNET_DISK_OPEN_READ,
GNUNET_DISK_PERM_NONE);
}
if (NULL == h)
return; /* oops */
- data = GNUNET_DISK_file_map (h, &m, GNUNET_DISK_MAP_TYPE_READ, size);
+ data = GNUNET_DISK_file_map (h,
+ &m,
+ GNUNET_DISK_MAP_TYPE_READ,
+ size);
if (NULL == data)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
}
else
{
- GNUNET_FS_directory_list_contents (size, data, 0,
- &trigger_recursive_download, dc);
+ if (GNUNET_OK !=
+ GNUNET_FS_directory_list_contents (size,
+ data,
+ 0,
+ &trigger_recursive_download,
+ dc))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Failed to access full directroy contents of `%s' for recursive download\n"),
+ dc->filename);
+ }
GNUNET_DISK_file_unmap (m);
}
GNUNET_DISK_file_close (h);
if (NULL == dc->filename)
{
if (0 != UNLINK (dc->temp_filename))
- GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, "unlink",
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
+ "unlink",
dc->temp_filename);
GNUNET_free (dc->temp_filename);
dc->temp_filename = NULL;
struct GNUNET_FS_DownloadContext *pos;
/* first, check if we need to download children */
- if ((NULL == dc->child_head) && (is_recursive_download (dc)))
+ if (is_recursive_download (dc))
full_recursive_download (dc);
/* then, check if children are done already */
for (pos = dc->child_head; NULL != pos; pos = pos->next)
{
- if ((pos->emsg == NULL) && (pos->completed < pos->length))
+ if ( (NULL == pos->emsg) &&
+ (pos->completed < pos->length) )
return; /* not done yet */
- if ((pos->child_head != NULL) && (pos->has_finished != GNUNET_YES))
+ if ( (NULL != pos->child_head) &&
+ (pos->has_finished != GNUNET_YES) )
return; /* not transitively done yet */
}
/* All of our children are done, so mark this download done */
}
GNUNET_CRYPTO_hash (&data[dr->offset], dlen, &in_chk.key);
GNUNET_CRYPTO_hash_to_aes_key (&in_chk.key, &sk, &iv);
- if (-1 == GNUNET_CRYPTO_symmetric_encrypt (&data[dr->offset], dlen, &sk, &iv, enc))
+ if (-1 == GNUNET_CRYPTO_symmetric_encrypt (&data[dr->offset],
+ dlen,
+ &sk,
+ &iv,
+ enc))
{
GNUNET_break (0);
return;
dr->state = BRS_RECONSTRUCT_META_UP;
break;
case BRS_CHK_SET:
- if (0 != memcmp (&in_chk, &dr->chk, sizeof (struct ContentHashKey)))
+ if (0 != memcmp (&in_chk,
+ &dr->chk,
+ sizeof (struct ContentHashKey)))
{
/* other peer provided bogus meta data */
GNUNET_break_op (0);
GNUNET_break_op (0);
return 1; /* bogus meta data */
}
- try_match_block (dc, dc->top_request, data, data_len);
+ try_match_block (dc,
+ dc->top_request,
+ data,
+ data_len);
return 1;
}
}
+/**
+ * Add entries to the message queue.
+ *
+ * @param cls our download context
+ * @param key unused
+ * @param entry entry of type `struct DownloadRequest`
+ * @return #GNUNET_OK
+ */
+static int
+retry_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *entry)
+{
+ struct GNUNET_FS_DownloadContext *dc = cls;
+ struct DownloadRequest *dr = entry;
+ struct SearchMessage *sm;
+ struct GNUNET_MQ_Envelope *env;
+
+ env = GNUNET_MQ_msg (sm,
+ GNUNET_MESSAGE_TYPE_FS_START_SEARCH);
+ if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY))
+ sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY);
+ else
+ sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE);
+ if (0 == dr->depth)
+ sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK);
+ else
+ sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK);
+ sm->anonymity_level = htonl (dc->anonymity);
+ sm->target = dc->target;
+ sm->query = dr->chk.query;
+ GNUNET_MQ_send (dc->mq,
+ env);
+ return GNUNET_OK;
+}
+
+
/**
* Schedule the download of the specified block in the tree.
*
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Scheduling download at offset %llu and depth %u for `%s'\n",
- (unsigned long long) dr->offset, dr->depth,
+ (unsigned long long) dr->offset,
+ dr->depth,
GNUNET_h2s (&dr->chk.query));
if (GNUNET_NO !=
- GNUNET_CONTAINER_multihashmap_contains_value (dc->active, &dr->chk.query,
+ GNUNET_CONTAINER_multihashmap_contains_value (dc->active,
+ &dr->chk.query,
dr))
return; /* already active */
- GNUNET_CONTAINER_multihashmap_put (dc->active, &dr->chk.query, dr,
+ GNUNET_CONTAINER_multihashmap_put (dc->active,
+ &dr->chk.query,
+ dr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- if (NULL == dc->client)
+ if (NULL == dc->mq)
return; /* download not active */
- GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr);
- dr->is_pending = GNUNET_YES;
- if (NULL == dc->th)
- dc->th =
- GNUNET_CLIENT_notify_transmit_ready (dc->client,
- sizeof (struct SearchMessage),
- GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- GNUNET_NO,
- &transmit_download_request, dc);
+ retry_entry (dc,
+ &dr->chk.query,
+ dr);
}
* @param data contents of the file (or NULL if they were not inlined)
*/
static void
-trigger_recursive_download (void *cls, const char *filename,
+trigger_recursive_download (void *cls,
+ const char *filename,
const struct GNUNET_FS_Uri *uri,
const struct GNUNET_CONTAINER_MetaData *meta,
- size_t length, const void *data)
+ size_t length,
+ const void *data)
{
struct GNUNET_FS_DownloadContext *dc = cls;
struct GNUNET_FS_DownloadContext *cpos;
(unsigned long long) GNUNET_FS_uri_chk_get_file_size (uri),
(unsigned int)
GNUNET_CONTAINER_meta_data_get_serialized_size (meta));
- GNUNET_FS_download_start (dc->h, uri, meta, full_name, temp_name, 0,
+ GNUNET_FS_download_start (dc->h,
+ uri,
+ meta,
+ full_name,
+ temp_name,
+ 0,
GNUNET_FS_uri_chk_get_file_size (uri),
- dc->anonymity, dc->options, NULL, dc);
+ dc->anonymity,
+ dc->options,
+ NULL,
+ dc);
GNUNET_free_non_null (full_name);
GNUNET_free_non_null (temp_name);
GNUNET_free_non_null (fn);
void
GNUNET_FS_free_download_request_ (struct DownloadRequest *dr)
{
- unsigned int i;
-
if (NULL == dr)
return;
- for (i = 0; i < dr->num_children; i++)
+ for (unsigned int i = 0; i < dr->num_children; i++)
GNUNET_FS_free_download_request_ (dr->children[i]);
GNUNET_free_non_null (dr->children);
GNUNET_free (dr);
* Iterator over entries in the pending requests in the 'active' map for the
* reply that we just got.
*
- * @param cls closure (our 'struct ProcessResultClosure')
+ * @param cls closure (our `struct ProcessResultClosure`)
* @param key query for the given value / request
- * @param value value in the hash map (a 'struct DownloadRequest')
+ * @param value value in the hash map (a `struct DownloadRequest`)
* @return #GNUNET_YES (we should continue to iterate); unless serious error
*/
static int
-process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
+process_result_with_request (void *cls,
+ const struct GNUNET_HashCode *key,
void *value)
{
struct ProcessResultClosure *prc = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n",
(unsigned int) prc->size,
- GNUNET_h2s (key), dr->depth, (unsigned long long) dr->offset,
+ GNUNET_h2s (key),
+ dr->depth,
+ (unsigned long long) dr->offset,
(unsigned long long) GNUNET_ntohll (dc->uri->data.
chk.file_length));
bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll
goto signal_error;
}
- (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &prc->query, dr);
- if (GNUNET_YES == dr->is_pending)
- {
- GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr);
- dr->is_pending = GNUNET_NO;
- }
-
- GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, &skey, &iv);
- if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, prc->size, &skey, &iv, pt))
+ (void) GNUNET_CONTAINER_multihashmap_remove (dc->active,
+ &prc->query,
+ dr);
+ GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key,
+ &skey,
+ &iv);
+ if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data,
+ prc->size,
+ &skey,
+ &iv,
+ pt))
{
GNUNET_break (0);
dc->emsg = GNUNET_strdup (_("internal error decrypting content"));
}
off =
compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length),
- dr->offset, dr->depth);
+ dr->offset,
+ dr->depth);
/* save to disk */
if ((GNUNET_YES == prc->do_store) &&
((NULL != dc->filename) || (is_recursive_download (dc))) &&
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Saving decrypted block to disk at offset %llu\n",
(unsigned long long) off);
- if ((off != GNUNET_DISK_file_seek (fh, off, GNUNET_DISK_SEEK_SET)))
+ if ((off != GNUNET_DISK_file_seek (fh,
+ off,
+ GNUNET_DISK_SEEK_SET)))
{
GNUNET_asprintf (&dc->emsg,
_("Failed to seek to offset %llu in file `%s': %s"),
- (unsigned long long) off, dc->filename,
+ (unsigned long long) off,
+ dc->filename,
STRERROR (errno));
goto signal_error;
}
if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size))
{
GNUNET_asprintf (&dc->emsg,
- _
- ("Failed to write block of %u bytes at offset %llu in file `%s': %s"),
- (unsigned int) prc->size, (unsigned long long) off,
- dc->filename, STRERROR (errno));
+ _("Failed to write block of %u bytes at offset %llu in file `%s': %s"),
+ (unsigned int) prc->size,
+ (unsigned long long) off,
+ dc->filename,
+ STRERROR (errno));
goto signal_error;
}
GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh));
pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR;
pi.value.download.specifics.error.message = dc->emsg;
GNUNET_FS_download_make_status_ (&pi, dc);
- /* abort all pending requests */
- if (NULL != dc->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
- dc->th = NULL;
- }
- GNUNET_CLIENT_disconnect (dc->client);
- dc->in_receive = GNUNET_NO;
- dc->client = NULL;
+ GNUNET_MQ_destroy (dc->mq);
+ dc->mq = NULL;
GNUNET_FS_free_download_request_ (dc->top_request);
dc->top_request = NULL;
GNUNET_CONTAINER_multihashmap_destroy (dc->active);
GNUNET_FS_dequeue_ (dc->job_queue);
dc->job_queue = NULL;
}
- dc->pending_head = NULL;
- dc->pending_tail = NULL;
GNUNET_FS_download_sync_ (dc);
return GNUNET_NO;
}
/**
- * Process a download result.
+ * Type of a function to call when we check the PUT message
+ * from the service.
*
- * @param dc our download context
- * @param type type of the result
- * @param respect_offered how much respect did we offer to get this reply?
- * @param num_transmissions how often did we transmit the query?
- * @param last_transmission when was this block requested the last time? (FOREVER if unknown/not applicable)
- * @param data the (encrypted) response
- * @param size size of data
+ * @param cls closure
+ * @param msg message received
*/
-static void
-process_result (struct GNUNET_FS_DownloadContext *dc,
- enum GNUNET_BLOCK_Type type,
- uint32_t respect_offered,
- uint32_t num_transmissions,
- struct GNUNET_TIME_Absolute last_transmission,
- const void *data, size_t size)
+static int
+check_put (void *cls,
+ const struct ClientPutMessage *cm)
{
- struct ProcessResultClosure prc;
-
- prc.dc = dc;
- prc.data = data;
- prc.last_transmission = last_transmission;
- prc.size = size;
- prc.type = type;
- prc.do_store = GNUNET_YES;
- prc.respect_offered = respect_offered;
- prc.num_transmissions = num_transmissions;
- GNUNET_CRYPTO_hash (data, size, &prc.query);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received result for query `%s' from `%s'-service\n",
- GNUNET_h2s (&prc.query), "FS");
- GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, &prc.query,
- &process_result_with_request,
- &prc);
+ /* any varsize length is OK */
+ return GNUNET_OK;
}
* from the service.
*
* @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param msg message received
*/
static void
-receive_results (void *cls, const struct GNUNET_MessageHeader *msg)
+handle_put (void *cls,
+ const struct ClientPutMessage *cm)
{
struct GNUNET_FS_DownloadContext *dc = cls;
- const struct ClientPutMessage *cm;
- uint16_t msize;
+ uint16_t msize = ntohs (cm->header.size) - sizeof (*cm);
+ struct ProcessResultClosure prc;
- if ((NULL == msg) || (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_PUT) ||
- (sizeof (struct ClientPutMessage) > ntohs (msg->size)))
- {
- GNUNET_break (NULL == msg);
- try_reconnect (dc);
- return;
- }
- msize = ntohs (msg->size);
- cm = (const struct ClientPutMessage *) msg;
- process_result (dc, ntohl (cm->type),
- ntohl (cm->respect_offered),
- ntohl (cm->num_transmissions),
- GNUNET_TIME_absolute_ntoh (cm->last_transmission), &cm[1],
- msize - sizeof (struct ClientPutMessage));
- if (NULL == dc->client)
- return; /* fatal error */
- /* continue receiving */
- GNUNET_CLIENT_receive (dc->client, &receive_results, dc,
- GNUNET_TIME_UNIT_FOREVER_REL);
+ prc.dc = dc;
+ prc.data = &cm[1];
+ prc.last_transmission = GNUNET_TIME_absolute_ntoh (cm->last_transmission);
+ prc.size = msize;
+ prc.type = ntohl (cm->type);
+ prc.do_store = GNUNET_YES;
+ prc.respect_offered = ntohl (cm->respect_offered);
+ prc.num_transmissions = ntohl (cm->num_transmissions);
+ GNUNET_CRYPTO_hash (prc.data,
+ msize,
+ &prc.query);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received result for query `%s' from FS service\n",
+ GNUNET_h2s (&prc.query));
+ GNUNET_CONTAINER_multihashmap_get_multiple (dc->active,
+ &prc.query,
+ &process_result_with_request,
+ &prc);
}
/**
- * We're ready to transmit a search request to the
- * file-sharing service. Do it. If there is
- * more than one request pending, try to send
- * multiple or request another transmission.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
*
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param cls closure with the `struct GNUNET_FS_DownloadContext *`
+ * @param error error code
*/
-static size_t
-transmit_download_request (void *cls, size_t size, void *buf)
+static void
+download_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
{
struct GNUNET_FS_DownloadContext *dc = cls;
- size_t msize;
- struct SearchMessage *sm;
- struct DownloadRequest *dr;
- dc->th = NULL;
- if (NULL == buf)
+ if (NULL != dc->mq)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting download request failed, trying to reconnect\n");
- try_reconnect (dc);
- return 0;
+ GNUNET_MQ_destroy (dc->mq);
+ dc->mq = NULL;
}
- GNUNET_assert (size >= sizeof (struct SearchMessage));
- msize = 0;
- sm = buf;
- while ((NULL != (dr = dc->pending_head)) &&
- (size >= msize + sizeof (struct SearchMessage)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting download request for `%s' to `%s'-service\n",
- GNUNET_h2s (&dr->chk.query), "FS");
- memset (sm, 0, sizeof (struct SearchMessage));
- sm->header.size = htons (sizeof (struct SearchMessage));
- sm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_START_SEARCH);
- if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY))
- sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY);
- else
- sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE);
- if (0 == dr->depth)
- sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK);
- else
- sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK);
- sm->anonymity_level = htonl (dc->anonymity);
- sm->target = dc->target;
- sm->query = dr->chk.query;
- GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr);
- dr->is_pending = GNUNET_NO;
- msize += sizeof (struct SearchMessage);
- sm++;
- }
- if (NULL != dc->pending_head)
- {
- dc->th =
- GNUNET_CLIENT_notify_transmit_ready (dc->client,
- sizeof (struct SearchMessage),
- GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- GNUNET_NO,
- &transmit_download_request, dc);
- GNUNET_assert (NULL != dc->th);
- }
- if (GNUNET_NO == dc->in_receive)
- {
- dc->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (dc->client, &receive_results, dc,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
- return msize;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting download request failed, trying to reconnect\n");
+ try_reconnect (dc);
}
* Reconnect to the FS service and transmit our queries NOW.
*
* @param cls our download context
- * @param tc unused
*/
static void
-do_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+do_reconnect (void *cls)
{
struct GNUNET_FS_DownloadContext *dc = cls;
- struct GNUNET_CLIENT_Connection *client;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (put,
+ GNUNET_MESSAGE_TYPE_FS_PUT,
+ struct ClientPutMessage,
+ dc),
+ GNUNET_MQ_handler_end ()
+ };
dc->task = NULL;
- client = GNUNET_CLIENT_connect ("fs", dc->h->cfg);
- if (NULL == client)
+ dc->mq = GNUNET_CLIENT_connect (dc->h->cfg,
+ "fs",
+ handlers,
+ &download_mq_error_handler,
+ dc);
+ if (NULL == dc->mq)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Connecting to `%s'-service failed, will try again.\n", "FS");
try_reconnect (dc);
return;
}
- dc->client = client;
- if (NULL != dc->pending_head)
- {
- dc->th =
- GNUNET_CLIENT_notify_transmit_ready (client,
- sizeof (struct SearchMessage),
- GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- GNUNET_NO,
- &transmit_download_request, dc);
- GNUNET_assert (NULL != dc->th);
- }
-}
-
-
-/**
- * Add entries to the pending list.
- *
- * @param cls our download context
- * @param key unused
- * @param entry entry of type "struct DownloadRequest"
- * @return GNUNET_OK
- */
-static int
-retry_entry (void *cls, const struct GNUNET_HashCode * key, void *entry)
-{
- struct GNUNET_FS_DownloadContext *dc = cls;
- struct DownloadRequest *dr = entry;
-
- dr->next = NULL;
- dr->prev = NULL;
- GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr);
- dr->is_pending = GNUNET_YES;
- return GNUNET_OK;
+ GNUNET_CONTAINER_multihashmap_iterate (dc->active,
+ &retry_entry,
+ dc);
}
static void
try_reconnect (struct GNUNET_FS_DownloadContext *dc)
{
-
- if (NULL != dc->client)
+ if (NULL != dc->mq)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Moving all requests back to pending list\n");
- if (NULL != dc->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
- dc->th = NULL;
- }
- /* full reset of the pending list */
- dc->pending_head = NULL;
- dc->pending_tail = NULL;
- GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc);
- GNUNET_CLIENT_disconnect (dc->client);
- dc->in_receive = GNUNET_NO;
- dc->client = NULL;
+ GNUNET_MQ_destroy (dc->mq);
+ dc->mq = NULL;
}
if (0 == dc->reconnect_backoff.rel_value_us)
dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
else
dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Will try to reconnect in %s\n",
- GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, GNUNET_YES));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Will try to reconnect in %s\n",
+ GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff,
+ GNUNET_YES));
+ GNUNET_break (NULL != dc->job_queue);
dc->task =
GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff,
&do_reconnect,
* We're allowed to ask the FS service for our blocks. Start the download.
*
* @param cls the 'struct GNUNET_FS_DownloadContext'
- * @param client handle to use for communcation with FS (we must destroy it!)
+ * @param mq handle to use for communcation with FS (we must destroy it!)
*/
static void
-activate_fs_download (void *cls, struct GNUNET_CLIENT_Connection *client)
+activate_fs_download (void *cls)
{
struct GNUNET_FS_DownloadContext *dc = cls;
struct GNUNET_FS_ProgressInfo pi;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download activated\n");
- GNUNET_assert (NULL != client);
- GNUNET_assert (NULL == dc->client);
- GNUNET_assert (NULL == dc->th);
+ GNUNET_assert (NULL == dc->mq);
GNUNET_assert (NULL != dc->active);
- dc->client = client;
+ do_reconnect (dc);
+ if (NULL != dc->mq)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Download activated\n");
pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
- GNUNET_FS_download_make_status_ (&pi, dc);
- dc->pending_head = NULL;
- dc->pending_tail = NULL;
- GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asking for transmission to FS service\n");
- if (NULL != dc->pending_head)
- {
- dc->th =
- GNUNET_CLIENT_notify_transmit_ready (dc->client,
- sizeof (struct SearchMessage),
- GNUNET_CONSTANTS_SERVICE_TIMEOUT,
- GNUNET_NO,
- &transmit_download_request, dc);
- GNUNET_assert (NULL != dc->th);
- }
+ GNUNET_FS_download_make_status_ (&pi,
+ dc);
}
struct GNUNET_FS_DownloadContext *dc = cls;
struct GNUNET_FS_ProgressInfo pi;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download deactivated\n");
- if (NULL != dc->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
- dc->th = NULL;
- }
- if (NULL != dc->client)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Download deactivated\n");
+ if (NULL != dc->mq)
{
- GNUNET_CLIENT_disconnect (dc->client);
- dc->in_receive = GNUNET_NO;
- dc->client = NULL;
+ GNUNET_MQ_destroy (dc->mq);
+ dc->mq = NULL;
}
- dc->pending_head = NULL;
- dc->pending_tail = NULL;
pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE;
- GNUNET_FS_download_make_status_ (&pi, dc);
+ GNUNET_FS_download_make_status_ (&pi,
+ dc);
}
create_download_request (struct DownloadRequest *parent,
unsigned int chk_idx,
unsigned int depth,
- uint64_t dr_offset, uint64_t file_start_offset,
+ uint64_t dr_offset,
+ uint64_t file_start_offset,
uint64_t desired_length)
{
struct DownloadRequest *dr;
GNUNET_assert (dr->num_children > 0);
dr->children =
- GNUNET_malloc (dr->num_children * sizeof (struct DownloadRequest *));
+ GNUNET_new_array (dr->num_children,
+ struct DownloadRequest *);
for (i = 0; i < dr->num_children; i++)
{
dr->children[i] =
- create_download_request (dr, i + head_skip, depth - 1,
+ create_download_request (dr,
+ i + head_skip,
+ depth - 1,
dr_offset + (i + head_skip) * child_block_size,
- file_start_offset, desired_length);
+ file_start_offset,
+ desired_length);
}
return dr;
}
* the current IBlock from the existing file.
*
* @param cls the 'struct ReconstructContext'
- * @param tc scheduler context
*/
static void
-reconstruct_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+reconstruct_cont (void *cls)
{
struct GNUNET_FS_DownloadContext *dc = cls;
/* clean up state from tree encoder */
- if (dc->task != NULL)
+ if (NULL != dc->task)
{
GNUNET_SCHEDULER_cancel (dc->task);
dc->task = NULL;
* Task requesting the next block from the tree encoder.
*
* @param cls the 'struct GNUJNET_FS_DownloadContext' we're processing
- * @param tc task context
*/
static void
-get_next_block (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+get_next_block (void *cls)
{
struct GNUNET_FS_DownloadContext *dc = cls;
* @param block_size size of block (in bytes)
*/
static void
-reconstruct_cb (void *cls, const struct ContentHashKey *chk, uint64_t offset,
- unsigned int depth, enum GNUNET_BLOCK_Type type,
- const void *block, uint16_t block_size)
+reconstruct_cb (void *cls,
+ const struct ContentHashKey *chk,
+ uint64_t offset,
+ unsigned int depth,
+ enum GNUNET_BLOCK_Type type,
+ const void *block,
+ uint16_t block_size)
{
struct GNUNET_FS_DownloadContext *dc = cls;
struct GNUNET_FS_ProgressInfo pi;
"Block %u < %u irrelevant for our range\n",
chld,
dr->children[0]->chk_idx);
- dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc);
+ dc->task = GNUNET_SCHEDULER_add_now (&get_next_block,
+ dc);
return; /* irrelevant block */
}
if (chld > dr->children[dr->num_children-1]->chk_idx)
/* block matches, hence tree below matches;
* this request is done! */
dr->state = BRS_DOWNLOAD_UP;
- (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &dr->chk.query, dr);
- if (GNUNET_YES == dr->is_pending)
- {
- GNUNET_break (0); /* how did we get here? */
- GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr);
- dr->is_pending = GNUNET_NO;
- }
+ (void) GNUNET_CONTAINER_multihashmap_remove (dc->active,
+ &dr->chk.query,
+ dr);
/* calculate how many bytes of payload this block
* corresponds to */
blen = GNUNET_FS_tree_compute_tree_size (dr->depth);
GNUNET_assert (0);
break;
}
- dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc);
- if ((dr == dc->top_request) && (dr->state == BRS_DOWNLOAD_UP))
+ dc->task = GNUNET_SCHEDULER_add_now (&get_next_block,
+ dc);
+ if ( (dr == dc->top_request) &&
+ (dr->state == BRS_DOWNLOAD_UP) )
check_completed (dc);
}
* request for the file.
*
* @param cls the 'struct GNUNET_FS_DownloadContext'
- * @param tc scheduler context
*/
void
-GNUNET_FS_download_start_task_ (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+GNUNET_FS_download_start_task_ (void *cls)
{
struct GNUNET_FS_DownloadContext *dc = cls;
struct GNUNET_FS_ProgressInfo pi;
struct GNUNET_DISK_FileHandle *fh;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start task running...\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Start task running...\n");
dc->task = NULL;
if (0 == dc->length)
{
dc->te =
GNUNET_FS_tree_encoder_create (dc->h,
GNUNET_FS_uri_chk_get_file_size (dc->uri),
- dc, &fh_reader,
- &reconstruct_cb, NULL,
+ dc,
+ &fh_reader,
+ &reconstruct_cb,
+ NULL,
&reconstruct_cont);
- dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc);
+ dc->task = GNUNET_SCHEDULER_add_now (&get_next_block,
+ dc);
}
else
{
create_download_context (struct GNUNET_FS_Handle *h,
const struct GNUNET_FS_Uri *uri,
const struct GNUNET_CONTAINER_MetaData *meta,
- const char *filename, const char *tempname,
- uint64_t offset, uint64_t length, uint32_t anonymity,
- enum GNUNET_FS_DownloadOptions options, void *cctx)
+ const char *filename,
+ const char *tempname,
+ uint64_t offset,
+ uint64_t length,
+ uint32_t anonymity,
+ enum GNUNET_FS_DownloadOptions options,
+ void *cctx)
{
struct GNUNET_FS_DownloadContext *dc;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting download %p, %u bytes at offset %llu\n",
dc,
- (unsigned long long) length,
+ (unsigned int) length,
(unsigned long long) offset);
dc->h = h;
dc->uri = GNUNET_FS_uri_dup (uri);
filename,
(unsigned long long) length,
dc->treedepth);
- dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc);
+ GNUNET_assert (NULL == dc->job_queue);
+ dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_,
+ dc);
return dc;
}
GNUNET_FS_download_start (struct GNUNET_FS_Handle *h,
const struct GNUNET_FS_Uri *uri,
const struct GNUNET_CONTAINER_MetaData *meta,
- const char *filename, const char *tempname,
- uint64_t offset, uint64_t length, uint32_t anonymity,
- enum GNUNET_FS_DownloadOptions options, void *cctx,
+ const char *filename,
+ const char *tempname,
+ uint64_t offset,
+ uint64_t length,
+ uint32_t anonymity,
+ enum GNUNET_FS_DownloadOptions options,
+ void *cctx,
struct GNUNET_FS_DownloadContext *parent)
{
struct GNUNET_FS_DownloadContext *dc;
- dc = create_download_context (h, uri, meta, filename, tempname,
- offset, length, anonymity, options, cctx);
+ dc = create_download_context (h,
+ uri,
+ meta,
+ filename,
+ tempname,
+ offset,
+ length,
+ anonymity,
+ options,
+ cctx);
if (NULL == dc)
return NULL;
dc->parent = parent;
GNUNET_FS_download_start_from_search (struct GNUNET_FS_Handle *h,
struct GNUNET_FS_SearchResult *sr,
const char *filename,
- const char *tempname, uint64_t offset,
- uint64_t length, uint32_t anonymity,
+ const char *tempname,
+ uint64_t offset,
+ uint64_t length,
+ uint32_t anonymity,
enum GNUNET_FS_DownloadOptions options,
void *cctx)
{
GNUNET_break (0);
return NULL;
}
- dc = create_download_context (h, sr->uri, sr->meta, filename, tempname,
- offset, length, anonymity, options, cctx);
+ dc = create_download_context (h,
+ sr->uri,
+ sr->meta,
+ filename,
+ tempname,
+ offset,
+ length,
+ anonymity,
+ options,
+ cctx);
if (NULL == dc)
return NULL;
dc->search = sr;
{
if (dc->completed == dc->length)
return;
+ if (NULL != dc->mq)
+ return; /* already running */
GNUNET_assert (NULL == dc->job_queue);
+ GNUNET_assert (NULL == dc->task);
GNUNET_assert (NULL != dc->active);
- dc->job_queue =
- GNUNET_FS_queue_ (dc->h, &activate_fs_download, &deactivate_fs_download,
- dc, (dc->length + DBLOCK_SIZE - 1) / DBLOCK_SIZE,
+ dc->job_queue
+ = GNUNET_FS_queue_ (dc->h,
+ &activate_fs_download,
+ &deactivate_fs_download,
+ dc,
+ (dc->length + DBLOCK_SIZE - 1) / DBLOCK_SIZE,
(0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE))
? GNUNET_FS_QUEUE_PRIORITY_NORMAL
: GNUNET_FS_QUEUE_PRIORITY_PROBE);
void
GNUNET_FS_download_suspend (struct GNUNET_FS_DownloadContext *dc)
{
- deactivate_fs_download(dc);
+ deactivate_fs_download(dc);
}
+
/**
* Resume a suspended download.
*
void
GNUNET_FS_download_resume (struct GNUNET_FS_DownloadContext *dc)
{
- struct GNUNET_FS_ProgressInfo pi;
+ struct GNUNET_FS_ProgressInfo pi;
- pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
- GNUNET_FS_download_make_status_ (&pi, dc);
+ pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
+ GNUNET_FS_download_make_status_ (&pi, dc);
- dc->job_queue =
- GNUNET_FS_queue_ (dc->h, &activate_fs_download, &deactivate_fs_download,
+ GNUNET_assert (NULL == dc->task);
+ dc->job_queue
+ = GNUNET_FS_queue_ (dc->h,
+ &activate_fs_download,
+ &deactivate_fs_download,
dc, (dc->length + DBLOCK_SIZE - 1) / DBLOCK_SIZE,
- (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE))
- ? GNUNET_FS_QUEUE_PRIORITY_NORMAL
- : GNUNET_FS_QUEUE_PRIORITY_PROBE);
+ (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE))
+ ? GNUNET_FS_QUEUE_PRIORITY_NORMAL
+ : GNUNET_FS_QUEUE_PRIORITY_PROBE);
}
* @param do_delete delete files of incomplete downloads
*/
void
-GNUNET_FS_download_stop (struct GNUNET_FS_DownloadContext *dc, int do_delete)
+GNUNET_FS_download_stop (struct GNUNET_FS_DownloadContext *dc,
+ int do_delete)
{
struct GNUNET_FS_ProgressInfo pi;
int have_children;
while (NULL != dc->child_head)
GNUNET_FS_download_stop (dc->child_head, do_delete);
if (NULL != dc->parent)
- GNUNET_CONTAINER_DLL_remove (dc->parent->child_head, dc->parent->child_tail,
+ GNUNET_CONTAINER_DLL_remove (dc->parent->child_head,
+ dc->parent->child_tail,
dc);
if (NULL != dc->serialization)
GNUNET_FS_remove_sync_file_ (dc->h,
{
if ( (0 != UNLINK (dc->filename)) &&
(ENOENT != errno) )
- GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING, "unlink",
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
+ "unlink",
dc->filename);
}
GNUNET_free (dc->filename);
if (NULL != dc->temp_filename)
{
if (0 != UNLINK (dc->temp_filename))
- GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR, "unlink",
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
+ "unlink",
dc->temp_filename);
GNUNET_free (dc->temp_filename);
}