This file is part of GNUnet.
Copyright (C) 2009, 2010 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
+ Affero General Public License for more details.
*/
/**
* @file fs/fs_publish.c
GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO);
pc->dsh = NULL;
}
- if (NULL != pc->client)
+ if (NULL != pc->mq)
{
- GNUNET_CLIENT_disconnect (pc->client);
- pc->client = NULL;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
}
GNUNET_assert (NULL == pc->upload_task);
GNUNET_free (pc);
{
pt_size = GNUNET_MIN (max, p->data.dir.dir_size - offset);
dd = p->data.dir.dir_data;
- memcpy (buf, &dd[offset], pt_size);
+ GNUNET_memcpy (buf, &dd[offset], pt_size);
}
else
{
p = pc->fi_pos;
if (NULL == pc->dsh)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Waiting for datastore connection\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Waiting for datastore connection\n");
GNUNET_assert (NULL == pc->upload_task);
pc->upload_task =
GNUNET_SCHEDULER_add_with_priority
/**
- * Process the response (or lack thereof) from
- * the "fs" service to our 'start index' request.
+ * Check the response from the "fs" service to our 'start index'
+ * request.
+ *
+ * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
+ * @param msg the response we got
+ */
+static int
+check_index_start_failed (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ size_t msize = ntohs (msg->size) - sizeof (*msg);
+ const char *emsg = (const char *) &msg[1];
+
+ if (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0')
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process the response from the "fs" service to our 'start index'
+ * request.
*
* @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
* @param msg the response we got
*/
static void
-process_index_start_response (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_index_start_failed (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_FS_PublishContext *pc = cls;
struct GNUNET_FS_FileInformation *p;
- const char *emsg;
- uint16_t msize;
+ const char *emsg = (const char *) &msg[1];
- GNUNET_CLIENT_disconnect (pc->client);
- pc->client = NULL;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
p = pc->fi_pos;
- if (NULL == msg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Can not index file `%s': %s. Will try to insert instead.\n"),
- p->filename,
- _("timeout on index-start request to `fs' service"));
- p->data.file.do_index = GNUNET_NO;
- GNUNET_FS_file_information_sync_ (p);
- publish_content (pc);
- return;
- }
- if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not index file `%s': %s. Will try to insert instead.\n"),
+ p->filename,
+ gettext (emsg));
+ p->data.file.do_index = GNUNET_NO;
+ GNUNET_FS_file_information_sync_ (p);
+ publish_content (pc);
+}
+
+
+/**
+ * Process the response from the "fs" service to our 'start index'
+ * request.
+ *
+ * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
+ * @param msg the response we got
+ */
+static void
+handle_index_start_ok (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_FS_FileInformation *p;
+
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
+ p = pc->fi_pos;
+ p->data.file.index_start_confirmed = GNUNET_YES;
+ GNUNET_FS_file_information_sync_ (p);
+ publish_content (pc);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_FS_PublishContext *`
+ * @param error error code
+ */
+static void
+index_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_FS_FileInformation *p;
+
+ if (NULL != pc->mq)
{
- msize = ntohs (msg->size);
- emsg = (const char *) &msg[1];
- if ((msize <= sizeof (struct GNUNET_MessageHeader)) ||
- (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0'))
- emsg = gettext_noop ("unknown error");
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _
- ("Can not index file `%s': %s. Will try to insert instead.\n"),
- p->filename, gettext (emsg));
- p->data.file.do_index = GNUNET_NO;
- GNUNET_FS_file_information_sync_ (p);
- publish_content (pc);
- return;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
}
- p->data.file.index_start_confirmed = GNUNET_YES;
- /* success! continue with indexing */
+ p = pc->fi_pos;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not index file `%s': %s. Will try to insert instead.\n"),
+ p->filename,
+ _("error on index-start request to `fs' service"));
+ p->data.file.do_index = GNUNET_NO;
GNUNET_FS_file_information_sync_ (p);
publish_content (pc);
}
const struct GNUNET_HashCode *res)
{
struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_fixed_size (index_start_ok,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK,
+ struct GNUNET_MessageHeader,
+ pc),
+ GNUNET_MQ_hd_var_size (index_start_failed,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED,
+ struct GNUNET_MessageHeader,
+ pc),
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_FS_FileInformation *p;
+ struct GNUNET_MQ_Envelope *env;
struct IndexStartMessage *ism;
size_t slen;
- struct GNUNET_CLIENT_Connection *client;
uint64_t dev;
uint64_t ino;
char *fn;
GNUNET_assert (fn != NULL);
slen = strlen (fn) + 1;
if (slen >=
- GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct IndexStartMessage))
+ GNUNET_MAX_MESSAGE_SIZE - sizeof (struct IndexStartMessage))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_
publish_content (pc);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Hash of indexed file `%s' is `%s'\n",
- p->filename, GNUNET_h2s (res));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Hash of indexed file `%s' is `%s'\n",
+ p->filename,
+ GNUNET_h2s (res));
if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY))
{
p->data.file.file_id = *res;
GNUNET_free (fn);
return;
}
- client = GNUNET_CLIENT_connect ("fs", pc->h->cfg);
- if (NULL == client)
+ pc->mq = GNUNET_CLIENT_connect (pc->h->cfg,
+ "fs",
+ handlers,
+ &index_mq_error_handler,
+ pc);
+ if (NULL == pc->mq)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Can not index file `%s': %s. Will try to insert instead.\n"),
p->data.file.have_hash = GNUNET_YES;
GNUNET_FS_file_information_sync_ (p);
}
- ism = GNUNET_malloc (sizeof (struct IndexStartMessage) + slen);
- ism->header.size = htons (sizeof (struct IndexStartMessage) + slen);
- ism->header.type = htons (GNUNET_MESSAGE_TYPE_FS_INDEX_START);
- if (GNUNET_OK == GNUNET_DISK_file_get_identifiers (p->filename, &dev, &ino))
+ env = GNUNET_MQ_msg_extra (ism,
+ slen,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START);
+ if (GNUNET_OK ==
+ GNUNET_DISK_file_get_identifiers (p->filename,
+ &dev,
+ &ino))
{
ism->device = GNUNET_htonll (dev);
ism->inode = GNUNET_htonll (ino);
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- _("Failed to get file identifiers for `%s'\n"), p->filename);
+ _("Failed to get file identifiers for `%s'\n"),
+ p->filename);
}
ism->file_id = *res;
- memcpy (&ism[1], fn, slen);
+ GNUNET_memcpy (&ism[1],
+ fn,
+ slen);
GNUNET_free (fn);
- pc->client = client;
- GNUNET_break (GNUNET_YES ==
- GNUNET_CLIENT_transmit_and_get_response (client, &ism->header,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &process_index_start_response,
- pc));
- GNUNET_free (ism);
+ GNUNET_MQ_send (pc->mq,
+ env);
}
p->chk_uri,
&p->bo,
pc->options,
- &publish_kblocks_cont, pc);
+ &publish_kblocks_cont,
+ pc);
}
else
{
/**
- * Process the response (or lack thereof) from
- * the "fs" service to our LOC sign request.
+ * Process the response from the "fs" service to our LOC sign request.
*
* @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
- * @param msg the response we got
+ * @param sig the response we got
*/
static void
-process_signature_response (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_signature_response (void *cls,
+ const struct ResponseLocSignatureMessage *sig)
{
struct GNUNET_FS_PublishContext *pc = cls;
- const struct ResponseLocSignatureMessage *sig;
struct GNUNET_FS_FileInformation *p;
p = pc->fi_pos;
- if (NULL == msg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Can not create LOC URI. Will continue with CHK instead.\n"));
- publish_kblocks (pc);
- return;
- }
- if (sizeof (struct ResponseLocSignatureMessage) !=
- ntohs (msg->size))
- {
- GNUNET_break (0);
- publish_kblocks (pc);
- return;
- }
- sig = (const struct ResponseLocSignatureMessage *) msg;
p->chk_uri->type = GNUNET_FS_URI_LOC;
/* p->data.loc.fi kept from CHK before */
p->chk_uri->data.loc.peer = sig->peer;
- p->chk_uri->data.loc.expirationTime = GNUNET_TIME_absolute_ntoh (sig->expiration_time);
+ p->chk_uri->data.loc.expirationTime
+ = GNUNET_TIME_absolute_ntoh (sig->expiration_time);
p->chk_uri->data.loc.contentSignature = sig->signature;
GNUNET_FS_file_information_sync_ (p);
GNUNET_FS_publish_sync_ (pc);
}
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_FS_PublishContext *`
+ * @param error error code
+ */
+static void
+loc_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+
+ if (NULL != pc->mq)
+ {
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not create LOC URI. Will continue with CHK instead.\n"));
+ publish_kblocks (pc);
+}
+
+
/**
* We're publishing without anonymity. Contact the FS service
* to create a signed LOC URI for further processing, then
static void
create_loc_uri (struct GNUNET_FS_PublishContext *pc)
{
- struct RequestLocSignatureMessage req;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_fixed_size (signature_response,
+ GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGNATURE,
+ struct ResponseLocSignatureMessage,
+ pc),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *env;
+ struct RequestLocSignatureMessage *req;
struct GNUNET_FS_FileInformation *p;
- if (NULL == pc->client)
- pc->client = GNUNET_CLIENT_connect ("fs", pc->h->cfg);
- if (NULL == pc->client)
+ if (NULL != pc->mq)
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = GNUNET_CLIENT_connect (pc->h->cfg,
+ "fs",
+ handlers,
+ &loc_mq_error_handler,
+ pc);
+ if (NULL == pc->mq)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Can not create LOC URI. Will continue with CHK instead.\n"));
return;
}
p = pc->fi_pos;
- req.header.size = htons (sizeof (struct RequestLocSignatureMessage));
- req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
- req.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
- req.expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
- req.chk = p->chk_uri->data.chk.chk;
- req.file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CLIENT_transmit_and_get_response (pc->client,
- &req.header,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &process_signature_response,
- pc));
+ env = GNUNET_MQ_msg (req,
+ GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
+ req->purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
+ req->expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
+ req->chk = p->chk_uri->data.chk.chk;
+ req->file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
+ GNUNET_MQ_send (pc->mq,
+ env);
}
{
kc = GNUNET_FS_uri_ksk_get_keyword_count (*uri);
pc->reserve_entries += kc;
- pc->reserve_space += GNUNET_SERVER_MAX_MESSAGE_SIZE * kc;
+ pc->reserve_space += GNUNET_MAX_MESSAGE_SIZE * kc;
}
pi.status = GNUNET_FS_STATUS_PUBLISH_START;
*client_info = GNUNET_FS_publish_make_status_ (&pi, pc, fi, 0);