GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO);
pc->dsh = NULL;
}
- if (NULL != pc->client)
+ if (NULL != pc->mq)
{
- GNUNET_CLIENT_disconnect (pc->client);
- pc->client = NULL;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
}
GNUNET_assert (NULL == pc->upload_task);
GNUNET_free (pc);
p = pc->fi_pos;
if (NULL == pc->dsh)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Waiting for datastore connection\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Waiting for datastore connection\n");
GNUNET_assert (NULL == pc->upload_task);
pc->upload_task =
GNUNET_SCHEDULER_add_with_priority
/**
- * Process the response (or lack thereof) from
- * the "fs" service to our 'start index' request.
+ * Check the response from the "fs" service to our 'start index'
+ * request.
+ *
+ * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
+ * @param msg the response we got
+ */
+static int
+check_index_start_failed (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ size_t msize = ntohs (msg->size) - sizeof (*msg);
+ const char *emsg = (const char *) &msg[1];
+
+ if (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0')
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process the response from the "fs" service to our 'start index'
+ * request.
*
* @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
* @param msg the response we got
*/
static void
-process_index_start_response (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_index_start_failed (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_FS_PublishContext *pc = cls;
struct GNUNET_FS_FileInformation *p;
- const char *emsg;
- uint16_t msize;
+ const char *emsg = (const char *) &msg[1];
- GNUNET_CLIENT_disconnect (pc->client);
- pc->client = NULL;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
p = pc->fi_pos;
- if (NULL == msg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Can not index file `%s': %s. Will try to insert instead.\n"),
- p->filename,
- _("timeout on index-start request to `fs' service"));
- p->data.file.do_index = GNUNET_NO;
- GNUNET_FS_file_information_sync_ (p);
- publish_content (pc);
- return;
- }
- if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not index file `%s': %s. Will try to insert instead.\n"),
+ p->filename,
+ gettext (emsg));
+ p->data.file.do_index = GNUNET_NO;
+ GNUNET_FS_file_information_sync_ (p);
+ publish_content (pc);
+}
+
+
+/**
+ * Process the response from the "fs" service to our 'start index'
+ * request.
+ *
+ * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
+ * @param msg the response we got
+ */
+static void
+handle_index_start_ok (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_FS_FileInformation *p;
+
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
+ p = pc->fi_pos;
+ p->data.file.index_start_confirmed = GNUNET_YES;
+ GNUNET_FS_file_information_sync_ (p);
+ publish_content (pc);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_FS_PublishContext *`
+ * @param error error code
+ */
+static void
+index_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_FS_FileInformation *p;
+
+ if (NULL != pc->mq)
{
- msize = ntohs (msg->size);
- emsg = (const char *) &msg[1];
- if ((msize <= sizeof (struct GNUNET_MessageHeader)) ||
- (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0'))
- emsg = gettext_noop ("unknown error");
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _
- ("Can not index file `%s': %s. Will try to insert instead.\n"),
- p->filename, gettext (emsg));
- p->data.file.do_index = GNUNET_NO;
- GNUNET_FS_file_information_sync_ (p);
- publish_content (pc);
- return;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
}
- p->data.file.index_start_confirmed = GNUNET_YES;
- /* success! continue with indexing */
+ p = pc->fi_pos;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not index file `%s': %s. Will try to insert instead.\n"),
+ p->filename,
+ _("error on index-start request to `fs' service"));
+ p->data.file.do_index = GNUNET_NO;
GNUNET_FS_file_information_sync_ (p);
publish_content (pc);
}
hash_for_index_cb (void *cls,
const struct GNUNET_HashCode *res)
{
+ GNUNET_MQ_hd_fixed_size (index_start_ok,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK,
+ struct GNUNET_MessageHeader);
+ GNUNET_MQ_hd_var_size (index_start_failed,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED,
+ struct GNUNET_MessageHeader);
struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_index_start_ok_handler (pc),
+ make_index_start_failed_handler (pc),
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_FS_FileInformation *p;
+ struct GNUNET_MQ_Envelope *env;
struct IndexStartMessage *ism;
size_t slen;
- struct GNUNET_CLIENT_Connection *client;
uint64_t dev;
uint64_t ino;
char *fn;
publish_content (pc);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Hash of indexed file `%s' is `%s'\n",
- p->filename, GNUNET_h2s (res));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Hash of indexed file `%s' is `%s'\n",
+ p->filename,
+ GNUNET_h2s (res));
if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY))
{
p->data.file.file_id = *res;
GNUNET_free (fn);
return;
}
- client = GNUNET_CLIENT_connect ("fs", pc->h->cfg);
- if (NULL == client)
+ pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg,
+ "fs",
+ handlers,
+ &index_mq_error_handler,
+ pc);
+ if (NULL == pc->mq)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Can not index file `%s': %s. Will try to insert instead.\n"),
p->data.file.have_hash = GNUNET_YES;
GNUNET_FS_file_information_sync_ (p);
}
- ism = GNUNET_malloc (sizeof (struct IndexStartMessage) + slen);
- ism->header.size = htons (sizeof (struct IndexStartMessage) + slen);
- ism->header.type = htons (GNUNET_MESSAGE_TYPE_FS_INDEX_START);
- if (GNUNET_OK == GNUNET_DISK_file_get_identifiers (p->filename, &dev, &ino))
+ env = GNUNET_MQ_msg_extra (ism,
+ slen,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START);
+ if (GNUNET_OK ==
+ GNUNET_DISK_file_get_identifiers (p->filename,
+ &dev,
+ &ino))
{
ism->device = GNUNET_htonll (dev);
ism->inode = GNUNET_htonll (ino);
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- _("Failed to get file identifiers for `%s'\n"), p->filename);
+ _("Failed to get file identifiers for `%s'\n"),
+ p->filename);
}
ism->file_id = *res;
- memcpy (&ism[1], fn, slen);
+ memcpy (&ism[1],
+ fn,
+ slen);
GNUNET_free (fn);
- pc->client = client;
- GNUNET_break (GNUNET_YES ==
- GNUNET_CLIENT_transmit_and_get_response (client, &ism->header,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &process_index_start_response,
- pc));
- GNUNET_free (ism);
+ GNUNET_MQ_send (pc->mq,
+ env);
}
p->chk_uri,
&p->bo,
pc->options,
- &publish_kblocks_cont, pc);
+ &publish_kblocks_cont,
+ pc);
}
else
{
/**
- * Process the response (or lack thereof) from
- * the "fs" service to our LOC sign request.
+ * Process the response from the "fs" service to our LOC sign request.
*
* @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
- * @param msg the response we got
+ * @param sig the response we got
*/
static void
-process_signature_response (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_signature_response (void *cls,
+ const struct ResponseLocSignatureMessage *sig)
{
struct GNUNET_FS_PublishContext *pc = cls;
- const struct ResponseLocSignatureMessage *sig;
struct GNUNET_FS_FileInformation *p;
p = pc->fi_pos;
- if (NULL == msg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Can not create LOC URI. Will continue with CHK instead.\n"));
- publish_kblocks (pc);
- return;
- }
- if (sizeof (struct ResponseLocSignatureMessage) !=
- ntohs (msg->size))
- {
- GNUNET_break (0);
- publish_kblocks (pc);
- return;
- }
- sig = (const struct ResponseLocSignatureMessage *) msg;
p->chk_uri->type = GNUNET_FS_URI_LOC;
/* p->data.loc.fi kept from CHK before */
p->chk_uri->data.loc.peer = sig->peer;
- p->chk_uri->data.loc.expirationTime = GNUNET_TIME_absolute_ntoh (sig->expiration_time);
+ p->chk_uri->data.loc.expirationTime
+ = GNUNET_TIME_absolute_ntoh (sig->expiration_time);
p->chk_uri->data.loc.contentSignature = sig->signature;
GNUNET_FS_file_information_sync_ (p);
GNUNET_FS_publish_sync_ (pc);
}
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_FS_PublishContext *`
+ * @param error error code
+ */
+static void
+loc_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+
+ if (NULL != pc->mq)
+ {
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not create LOC URI. Will continue with CHK instead.\n"));
+ publish_kblocks (pc);
+}
+
+
/**
* We're publishing without anonymity. Contact the FS service
* to create a signed LOC URI for further processing, then
static void
create_loc_uri (struct GNUNET_FS_PublishContext *pc)
{
- struct RequestLocSignatureMessage req;
+ GNUNET_MQ_hd_fixed_size (signature_response,
+ GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGNATURE,
+ struct ResponseLocSignatureMessage);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_signature_response_handler (pc),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *env;
+ struct RequestLocSignatureMessage *req;
struct GNUNET_FS_FileInformation *p;
- if (NULL == pc->client)
- pc->client = GNUNET_CLIENT_connect ("fs", pc->h->cfg);
- if (NULL == pc->client)
+ if (NULL != pc->mq)
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg,
+ "fs",
+ handlers,
+ &loc_mq_error_handler,
+ pc);
+ if (NULL == pc->mq)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Can not create LOC URI. Will continue with CHK instead.\n"));
return;
}
p = pc->fi_pos;
- req.header.size = htons (sizeof (struct RequestLocSignatureMessage));
- req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
- req.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
- req.expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
- req.chk = p->chk_uri->data.chk.chk;
- req.file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CLIENT_transmit_and_get_response (pc->client,
- &req.header,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &process_signature_response,
- pc));
+ env = GNUNET_MQ_msg (req,
+ GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
+ req->purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
+ req->expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
+ req->chk = p->chk_uri->data.chk.chk;
+ req->file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
+ GNUNET_MQ_send (pc->mq,
+ env);
}