convert fs publish to MQ
authorChristian Grothoff <christian@grothoff.org>
Sun, 3 Jul 2016 13:18:48 +0000 (13:18 +0000)
committerChristian Grothoff <christian@grothoff.org>
Sun, 3 Jul 2016 13:18:48 +0000 (13:18 +0000)
src/fs/fs_api.h
src/fs/fs_publish.c

index 126f5902e0438790cf4f387111d59a8bcd112479..e85de94a79557e38f039f532453bd9576c4c0660 100644 (file)
@@ -1202,11 +1202,6 @@ struct GNUNET_FS_PublishContext
    */
   struct GNUNET_FS_Handle *h;
 
    */
   struct GNUNET_FS_Handle *h;
 
-  /**
-   * Connection to FS service (only used for LOC URI signing).
-   */
-  struct GNUNET_CLIENT_Connection *fs_client;
-
   /**
    * Our top-level activity entry (if we are top-level, otherwise NULL).
    */
   /**
    * Our top-level activity entry (if we are top-level, otherwise NULL).
    */
@@ -1242,7 +1237,7 @@ struct GNUNET_FS_PublishContext
    * Our own message queue for the FS service; only briefly used when
    * we start to index a file, otherwise NULL.
    */
    * Our own message queue for the FS service; only briefly used when
    * we start to index a file, otherwise NULL.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Current position in the file-tree for the upload.
 
   /**
    * Current position in the file-tree for the upload.
index 89cc2714c99bed46885ce5ac9d10449bf5b21e6f..7cf8b481571d90b59a37b70b2fb634770f88f9cb 100644 (file)
@@ -92,10 +92,10 @@ publish_cleanup (struct GNUNET_FS_PublishContext *pc)
     GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO);
     pc->dsh = NULL;
   }
     GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO);
     pc->dsh = NULL;
   }
-  if (NULL != pc->client)
+  if (NULL != pc->mq)
   {
   {
-    GNUNET_CLIENT_disconnect (pc->client);
-    pc->client = NULL;
+    GNUNET_MQ_destroy (pc->mq);
+    pc->mq = NULL;
   }
   GNUNET_assert (NULL == pc->upload_task);
   GNUNET_free (pc);
   }
   GNUNET_assert (NULL == pc->upload_task);
   GNUNET_free (pc);
@@ -493,7 +493,8 @@ block_proc (void *cls,
   p = pc->fi_pos;
   if (NULL == pc->dsh)
   {
   p = pc->fi_pos;
   if (NULL == pc->dsh)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Waiting for datastore connection\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Waiting for datastore connection\n");
     GNUNET_assert (NULL == pc->upload_task);
     pc->upload_task =
         GNUNET_SCHEDULER_add_with_priority
     GNUNET_assert (NULL == pc->upload_task);
     pc->upload_task =
         GNUNET_SCHEDULER_add_with_priority
@@ -679,53 +680,105 @@ publish_content (struct GNUNET_FS_PublishContext *pc)
 
 
 /**
 
 
 /**
- * Process the response (or lack thereof) from
- * the "fs" service to our 'start index' request.
+ * Check the response from the "fs" service to our 'start index'
+ * request.
+ *
+ * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
+ * @param msg the response we got
+ */
+static int
+check_index_start_failed (void *cls,
+                          const struct GNUNET_MessageHeader *msg)
+{
+  size_t msize = ntohs (msg->size) - sizeof (*msg);
+  const char *emsg = (const char *) &msg[1];
+
+  if (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0')
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Process the response from the "fs" service to our 'start index'
+ * request.
  *
  * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
  * @param msg the response we got
  */
 static void
  *
  * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
  * @param msg the response we got
  */
 static void
-process_index_start_response (void *cls,
-                             const struct GNUNET_MessageHeader *msg)
+handle_index_start_failed (void *cls,
+                           const struct GNUNET_MessageHeader *msg)
 {
   struct GNUNET_FS_PublishContext *pc = cls;
   struct GNUNET_FS_FileInformation *p;
 {
   struct GNUNET_FS_PublishContext *pc = cls;
   struct GNUNET_FS_FileInformation *p;
-  const char *emsg;
-  uint16_t msize;
+  const char *emsg = (const char *) &msg[1];
 
 
-  GNUNET_CLIENT_disconnect (pc->client);
-  pc->client = NULL;
+  GNUNET_MQ_destroy (pc->mq);
+  pc->mq = NULL;
   p = pc->fi_pos;
   p = pc->fi_pos;
-  if (NULL == msg)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                _("Can not index file `%s': %s.  Will try to insert instead.\n"),
-                p->filename,
-                _("timeout on index-start request to `fs' service"));
-    p->data.file.do_index = GNUNET_NO;
-    GNUNET_FS_file_information_sync_ (p);
-    publish_content (pc);
-    return;
-  }
-  if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK)
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              _("Can not index file `%s': %s.  Will try to insert instead.\n"),
+              p->filename,
+              gettext (emsg));
+  p->data.file.do_index = GNUNET_NO;
+  GNUNET_FS_file_information_sync_ (p);
+  publish_content (pc);
+}
+
+
+/**
+ * Process the response from the "fs" service to our 'start index'
+ * request.
+ *
+ * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
+ * @param msg the response we got
+ */
+static void
+handle_index_start_ok (void *cls,
+                       const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_FS_PublishContext *pc = cls;
+  struct GNUNET_FS_FileInformation *p;
+
+  GNUNET_MQ_destroy (pc->mq);
+  pc->mq = NULL;
+  p = pc->fi_pos;
+  p->data.file.index_start_confirmed = GNUNET_YES;
+  GNUNET_FS_file_information_sync_ (p);
+  publish_content (pc);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_FS_PublishContext *`
+ * @param error error code
+ */
+static void
+index_mq_error_handler (void *cls,
+                        enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_FS_PublishContext *pc = cls;
+  struct GNUNET_FS_FileInformation *p;
+
+  if (NULL != pc->mq)
   {
   {
-    msize = ntohs (msg->size);
-    emsg = (const char *) &msg[1];
-    if ((msize <= sizeof (struct GNUNET_MessageHeader)) ||
-        (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0'))
-      emsg = gettext_noop ("unknown error");
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                _
-                ("Can not index file `%s': %s.  Will try to insert instead.\n"),
-                p->filename, gettext (emsg));
-    p->data.file.do_index = GNUNET_NO;
-    GNUNET_FS_file_information_sync_ (p);
-    publish_content (pc);
-    return;
+    GNUNET_MQ_destroy (pc->mq);
+    pc->mq = NULL;
   }
   }
-  p->data.file.index_start_confirmed = GNUNET_YES;
-  /* success! continue with indexing */
+  p = pc->fi_pos;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              _("Can not index file `%s': %s.  Will try to insert instead.\n"),
+              p->filename,
+              _("error on index-start request to `fs' service"));
+  p->data.file.do_index = GNUNET_NO;
   GNUNET_FS_file_information_sync_ (p);
   publish_content (pc);
 }
   GNUNET_FS_file_information_sync_ (p);
   publish_content (pc);
 }
@@ -742,11 +795,22 @@ static void
 hash_for_index_cb (void *cls,
                   const struct GNUNET_HashCode *res)
 {
 hash_for_index_cb (void *cls,
                   const struct GNUNET_HashCode *res)
 {
+  GNUNET_MQ_hd_fixed_size (index_start_ok,
+                           GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK,
+                           struct GNUNET_MessageHeader);
+  GNUNET_MQ_hd_var_size (index_start_failed,
+                         GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED,
+                         struct GNUNET_MessageHeader);
   struct GNUNET_FS_PublishContext *pc = cls;
   struct GNUNET_FS_PublishContext *pc = cls;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_index_start_ok_handler (pc),
+    make_index_start_failed_handler (pc),
+    GNUNET_MQ_handler_end ()
+  };
   struct GNUNET_FS_FileInformation *p;
   struct GNUNET_FS_FileInformation *p;
+  struct GNUNET_MQ_Envelope *env;
   struct IndexStartMessage *ism;
   size_t slen;
   struct IndexStartMessage *ism;
   size_t slen;
-  struct GNUNET_CLIENT_Connection *client;
   uint64_t dev;
   uint64_t ino;
   char *fn;
   uint64_t dev;
   uint64_t ino;
   char *fn;
@@ -785,8 +849,10 @@ hash_for_index_cb (void *cls,
     publish_content (pc);
     return;
   }
     publish_content (pc);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Hash of indexed file `%s' is `%s'\n",
-              p->filename, GNUNET_h2s (res));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Hash of indexed file `%s' is `%s'\n",
+              p->filename,
+              GNUNET_h2s (res));
   if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY))
   {
     p->data.file.file_id = *res;
   if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY))
   {
     p->data.file.file_id = *res;
@@ -797,8 +863,12 @@ hash_for_index_cb (void *cls,
     GNUNET_free (fn);
     return;
   }
     GNUNET_free (fn);
     return;
   }
-  client = GNUNET_CLIENT_connect ("fs", pc->h->cfg);
-  if (NULL == client)
+  pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg,
+                                  "fs",
+                                  handlers,
+                                  &index_mq_error_handler,
+                                  pc);
+  if (NULL == pc->mq)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 _("Can not index file `%s': %s.  Will try to insert instead.\n"),
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 _("Can not index file `%s': %s.  Will try to insert instead.\n"),
@@ -815,10 +885,13 @@ hash_for_index_cb (void *cls,
     p->data.file.have_hash = GNUNET_YES;
     GNUNET_FS_file_information_sync_ (p);
   }
     p->data.file.have_hash = GNUNET_YES;
     GNUNET_FS_file_information_sync_ (p);
   }
-  ism = GNUNET_malloc (sizeof (struct IndexStartMessage) + slen);
-  ism->header.size = htons (sizeof (struct IndexStartMessage) + slen);
-  ism->header.type = htons (GNUNET_MESSAGE_TYPE_FS_INDEX_START);
-  if (GNUNET_OK == GNUNET_DISK_file_get_identifiers (p->filename, &dev, &ino))
+  env = GNUNET_MQ_msg_extra (ism,
+                             slen,
+                             GNUNET_MESSAGE_TYPE_FS_INDEX_START);
+  if (GNUNET_OK ==
+      GNUNET_DISK_file_get_identifiers (p->filename,
+                                        &dev,
+                                        &ino))
   {
     ism->device = GNUNET_htonll (dev);
     ism->inode = GNUNET_htonll (ino);
   {
     ism->device = GNUNET_htonll (dev);
     ism->inode = GNUNET_htonll (ino);
@@ -826,19 +899,16 @@ hash_for_index_cb (void *cls,
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                _("Failed to get file identifiers for `%s'\n"), p->filename);
+                _("Failed to get file identifiers for `%s'\n"),
+                p->filename);
   }
   ism->file_id = *res;
   }
   ism->file_id = *res;
-  memcpy (&ism[1], fn, slen);
+  memcpy (&ism[1],
+          fn,
+          slen);
   GNUNET_free (fn);
   GNUNET_free (fn);
-  pc->client = client;
-  GNUNET_break (GNUNET_YES ==
-                GNUNET_CLIENT_transmit_and_get_response (client, &ism->header,
-                                                         GNUNET_TIME_UNIT_FOREVER_REL,
-                                                         GNUNET_YES,
-                                                         &process_index_start_response,
-                                                         pc));
-  GNUNET_free (ism);
+  GNUNET_MQ_send (pc->mq,
+                  env);
 }
 
 
 }
 
 
@@ -862,7 +932,8 @@ publish_kblocks (struct GNUNET_FS_PublishContext *pc)
                                         p->chk_uri,
                                         &p->bo,
                                         pc->options,
                                         p->chk_uri,
                                         &p->bo,
                                         pc->options,
-                                        &publish_kblocks_cont, pc);
+                                        &publish_kblocks_cont,
+                                        pc);
   }
   else
   {
   }
   else
   {
@@ -872,40 +943,24 @@ publish_kblocks (struct GNUNET_FS_PublishContext *pc)
 
 
 /**
 
 
 /**
- * Process the response (or lack thereof) from
- * the "fs" service to our LOC sign request.
+ * Process the response from the "fs" service to our LOC sign request.
  *
  * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
  *
  * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
- * @param msg the response we got
+ * @param sig the response we got
  */
 static void
  */
 static void
-process_signature_response (void *cls,
-                            const struct GNUNET_MessageHeader *msg)
+handle_signature_response (void *cls,
+                           const struct ResponseLocSignatureMessage *sig)
 {
   struct GNUNET_FS_PublishContext *pc = cls;
 {
   struct GNUNET_FS_PublishContext *pc = cls;
-  const struct ResponseLocSignatureMessage *sig;
   struct GNUNET_FS_FileInformation *p;
 
   p = pc->fi_pos;
   struct GNUNET_FS_FileInformation *p;
 
   p = pc->fi_pos;
-  if (NULL == msg)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                _("Can not create LOC URI. Will continue with CHK instead.\n"));
-    publish_kblocks (pc);
-    return;
-  }
-  if (sizeof (struct ResponseLocSignatureMessage) !=
-      ntohs (msg->size))
-  {
-    GNUNET_break (0);
-    publish_kblocks (pc);
-    return;
-  }
-  sig = (const struct ResponseLocSignatureMessage *) msg;
   p->chk_uri->type = GNUNET_FS_URI_LOC;
   /* p->data.loc.fi kept from CHK before */
   p->chk_uri->data.loc.peer = sig->peer;
   p->chk_uri->type = GNUNET_FS_URI_LOC;
   /* p->data.loc.fi kept from CHK before */
   p->chk_uri->data.loc.peer = sig->peer;
-  p->chk_uri->data.loc.expirationTime = GNUNET_TIME_absolute_ntoh (sig->expiration_time);
+  p->chk_uri->data.loc.expirationTime
+    = GNUNET_TIME_absolute_ntoh (sig->expiration_time);
   p->chk_uri->data.loc.contentSignature = sig->signature;
   GNUNET_FS_file_information_sync_ (p);
   GNUNET_FS_publish_sync_ (pc);
   p->chk_uri->data.loc.contentSignature = sig->signature;
   GNUNET_FS_file_information_sync_ (p);
   GNUNET_FS_publish_sync_ (pc);
@@ -913,6 +968,31 @@ process_signature_response (void *cls,
 }
 
 
 }
 
 
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_FS_PublishContext *`
+ * @param error error code
+ */
+static void
+loc_mq_error_handler (void *cls,
+                      enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_FS_PublishContext *pc = cls;
+
+  if (NULL != pc->mq)
+  {
+    GNUNET_MQ_destroy (pc->mq);
+    pc->mq = NULL;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              _("Can not create LOC URI. Will continue with CHK instead.\n"));
+  publish_kblocks (pc);
+}
+
+
 /**
  * We're publishing without anonymity. Contact the FS service
  * to create a signed LOC URI for further processing, then
 /**
  * We're publishing without anonymity. Contact the FS service
  * to create a signed LOC URI for further processing, then
@@ -923,12 +1003,25 @@ process_signature_response (void *cls,
 static void
 create_loc_uri (struct GNUNET_FS_PublishContext *pc)
 {
 static void
 create_loc_uri (struct GNUNET_FS_PublishContext *pc)
 {
-  struct RequestLocSignatureMessage req;
+  GNUNET_MQ_hd_fixed_size (signature_response,
+                           GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGNATURE,
+                           struct ResponseLocSignatureMessage);
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_signature_response_handler (pc),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MQ_Envelope *env;
+  struct RequestLocSignatureMessage *req;
   struct GNUNET_FS_FileInformation *p;
 
   struct GNUNET_FS_FileInformation *p;
 
-  if (NULL == pc->client)
-    pc->client = GNUNET_CLIENT_connect ("fs", pc->h->cfg);
-  if (NULL == pc->client)
+  if (NULL != pc->mq)
+    GNUNET_MQ_destroy (pc->mq);
+  pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg,
+                                  "fs",
+                                  handlers,
+                                  &loc_mq_error_handler,
+                                  pc);
+  if (NULL == pc->mq)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 _("Can not create LOC URI. Will continue with CHK instead.\n"));
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 _("Can not create LOC URI. Will continue with CHK instead.\n"));
@@ -936,19 +1029,14 @@ create_loc_uri (struct GNUNET_FS_PublishContext *pc)
     return;
   }
   p = pc->fi_pos;
     return;
   }
   p = pc->fi_pos;
-  req.header.size = htons (sizeof (struct RequestLocSignatureMessage));
-  req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
-  req.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
-  req.expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
-  req.chk = p->chk_uri->data.chk.chk;
-  req.file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
-  GNUNET_break (GNUNET_YES ==
-                GNUNET_CLIENT_transmit_and_get_response (pc->client,
-                                                         &req.header,
-                                                         GNUNET_TIME_UNIT_FOREVER_REL,
-                                                         GNUNET_YES,
-                                                         &process_signature_response,
-                                                         pc));
+  env = GNUNET_MQ_msg (req,
+                       GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
+  req->purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
+  req->expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
+  req->chk = p->chk_uri->data.chk.chk;
+  req->file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
+  GNUNET_MQ_send (pc->mq,
+                  env);
 }
 
 
 }