eliminate DHT PUT OK message by using MQ feature of calling continuation when transmi...
authorChristian Grothoff <christian@grothoff.org>
Mon, 30 Apr 2018 09:31:22 +0000 (11:31 +0200)
committerChristian Grothoff <christian@grothoff.org>
Mon, 30 Apr 2018 09:31:22 +0000 (11:31 +0200)
13 files changed:
src/dht/dht.h
src/dht/dht_api.c
src/dht/dht_test_lib.c
src/dht/gnunet-dht-put.c
src/dht/gnunet-service-dht_clients.c
src/dht/test_dht_api.c
src/dht/test_dht_topo.c
src/exit/gnunet-daemon-exit.c
src/fs/gnunet-service-fs_put.c
src/include/gnunet_dht_service.h
src/include/gnunet_protocols.h
src/util/mq.c
src/zonemaster/gnunet-service-zonemaster.c

index 4c994f93a43225781c71302406625f5a8aedd2da..95ffa33ca502a2e017994d418515187c9443fd64 100644 (file)
@@ -213,11 +213,6 @@ struct GNUNET_DHT_ClientPutMessage
    */
   uint32_t desired_replication_level GNUNET_PACKED;
 
-  /**
-   * Unique ID for the PUT message.
-   */
-  uint64_t unique_id GNUNET_PACKED;
-
   /**
    * How long should this data persist?
    */
@@ -233,30 +228,6 @@ struct GNUNET_DHT_ClientPutMessage
 };
 
 
-/**
- * Message to confirming receipt of PUT, sent from DHT service to clients.
- */
-struct GNUNET_DHT_ClientPutConfirmationMessage
-{
-  /**
-   * Type: #GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Always zero.
-   */
-  uint32_t reserved GNUNET_PACKED;
-
-  /**
-   * Unique ID from the PUT message that is being confirmed.
-   */
-  uint64_t unique_id GNUNET_PACKED;
-
-};
-
-
-
 /**
  * Message to monitor put requests going through peer, DHT service -> clients.
  */
index 7a0771de0d33c813ad88dfcfd0f1f33a93f14286..af0dafbf3867e2b00149c944d7ba3dd91adcb835 100644 (file)
@@ -55,7 +55,7 @@ struct GNUNET_DHT_PutHandle
   /**
    * Continuation to call when done.
    */
-  GNUNET_DHT_PutContinuation cont;
+  GNUNET_SCHEDULER_TaskCallback cont;
 
   /**
    * Main handle to this DHT api
@@ -68,9 +68,9 @@ struct GNUNET_DHT_PutHandle
   void *cont_cls;
 
   /**
-   * Unique ID for the PUT operation.
+   * Envelope from the PUT operation.
    */
-  uint64_t unique_id;
+  struct GNUNET_MQ_Envelope *env;
 
 };
 
@@ -440,7 +440,7 @@ static void
 do_disconnect (struct GNUNET_DHT_Handle *h)
 {
   struct GNUNET_DHT_PutHandle *ph;
-  GNUNET_DHT_PutContinuation cont;
+  GNUNET_SCHEDULER_TaskCallback cont;
   void *cont_cls;
 
   if (NULL == h->mq)
@@ -456,10 +456,10 @@ do_disconnect (struct GNUNET_DHT_Handle *h)
   {
     cont = ph->cont;
     cont_cls = ph->cont_cls;
+    ph->env = NULL;
     GNUNET_DHT_put_cancel (ph);
     if (NULL != cont)
-      cont (cont_cls,
-            GNUNET_SYSERR);
+      cont (cont_cls);
   }
   GNUNET_assert (NULL == h->reconnect_task);
   h->reconnect_task
@@ -818,31 +818,23 @@ handle_client_result (void *cls,
 
 
 /**
- * Process a put confirmation message from the service.
+ * Process a MQ PUT transmission notification.
  *
  * @param cls The DHT handle.
- * @param msg confirmation message from the service.
  */
 static void
-handle_put_confirmation (void *cls,
-                         const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
+handle_put_cont (void *cls)
 {
-  struct GNUNET_DHT_Handle *handle = cls;
-  struct GNUNET_DHT_PutHandle *ph;
-  GNUNET_DHT_PutContinuation cont;
+  struct GNUNET_DHT_PutHandle *ph = cls;
+  GNUNET_SCHEDULER_TaskCallback cont;
   void *cont_cls;
 
-  for (ph = handle->put_head; NULL != ph; ph = ph->next)
-    if (ph->unique_id == msg->unique_id)
-      break;
-  if (NULL == ph)
-    return;
   cont = ph->cont;
   cont_cls = ph->cont_cls;
+  ph->env = NULL;
   GNUNET_DHT_put_cancel (ph);
   if (NULL != cont)
-    cont (cont_cls,
-          GNUNET_OK);
+    cont (cont_cls);
 }
 
 
@@ -872,10 +864,6 @@ try_connect (struct GNUNET_DHT_Handle *h)
                            GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
                            struct GNUNET_DHT_ClientResultMessage,
                            h),
-    GNUNET_MQ_hd_fixed_size (put_confirmation,
-                             GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK,
-                             struct GNUNET_DHT_ClientPutConfirmationMessage,
-                             h),
     GNUNET_MQ_handler_end ()
   };
   if (NULL != h->mq)
@@ -941,8 +929,7 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
   while (NULL != (ph = handle->put_head))
   {
     if (NULL != ph->cont)
-      ph->cont (ph->cont_cls,
-                GNUNET_SYSERR);
+      ph->cont (ph->cont_cls);
     GNUNET_DHT_put_cancel (ph);
   }
   if (NULL != handle->mq)
@@ -989,7 +976,7 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
                 size_t size,
                 const void *data,
                 struct GNUNET_TIME_Absolute exp,
-                GNUNET_DHT_PutContinuation cont,
+                GNUNET_SCHEDULER_TaskCallback cont,
                 void *cont_cls)
 {
   struct GNUNET_MQ_Envelope *env;
@@ -1014,17 +1001,19 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
   ph->dht_handle = handle;
   ph->cont = cont;
   ph->cont_cls = cont_cls;
-  ph->unique_id = ++handle->uid_gen;
   GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
                                    handle->put_tail,
                                    ph);
   env = GNUNET_MQ_msg_extra (put_msg,
                              size,
                              GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
+  GNUNET_MQ_notify_sent (env,
+                         &handle_put_cont,
+                         ph);
+  ph->env = env;
   put_msg->type = htonl ((uint32_t) type);
   put_msg->options = htonl ((uint32_t) options);
   put_msg->desired_replication_level = htonl (desired_replication_level);
-  put_msg->unique_id = ph->unique_id;
   put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
   put_msg->key = *key;
   GNUNET_memcpy (&put_msg[1],
@@ -1052,6 +1041,10 @@ GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
 {
   struct GNUNET_DHT_Handle *handle = ph->dht_handle;
 
+  if (NULL != ph->env)
+    GNUNET_MQ_notify_sent (ph->env,
+                           NULL,
+                           NULL);
   GNUNET_CONTAINER_DLL_remove (handle->put_head,
                               handle->put_tail,
                               ph);
index 4c1bd3057e0e737de372f2f2d02896a4f0865d8c..52d5a3731a6aac89fd47f7be344915bf17e3834b 100644 (file)
@@ -114,7 +114,6 @@ dht_connect_cb (void *cls,
                const char *emsg)
 {
   struct GNUNET_DHT_TEST_Context *ctx = cls;
-  unsigned int i;
 
   if (NULL != emsg)
   {
@@ -124,10 +123,10 @@ dht_connect_cb (void *cls,
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  for (i=0;i<ctx->num_peers;i++)
+  for (unsigned int i=0;i<ctx->num_peers;i++)
     if (op == ctx->ops[i])
       ctx->dhts[i] = ca_result;
-  for (i=0;i<ctx->num_peers;i++)
+  for (unsigned int i=0;i<ctx->num_peers;i++)
     if (NULL == ctx->dhts[i])
       return; /* still some DHT connections missing */
   /* all DHT connections ready! */
@@ -147,9 +146,7 @@ dht_connect_cb (void *cls,
 void
 GNUNET_DHT_TEST_cleanup (struct GNUNET_DHT_TEST_Context *ctx)
 {
-  unsigned int i;
-
-  for (i=0;i<ctx->num_peers;i++)
+  for (unsigned int i=0;i<ctx->num_peers;i++)
     GNUNET_TESTBED_operation_done (ctx->ops[i]);
   GNUNET_free (ctx->ops);
   GNUNET_free (ctx->dhts);
@@ -160,18 +157,17 @@ GNUNET_DHT_TEST_cleanup (struct GNUNET_DHT_TEST_Context *ctx)
 
 static void
 dht_test_run (void *cls,
-             struct GNUNET_TESTBED_RunHandle *h,
+              struct GNUNET_TESTBED_RunHandle *h,
              unsigned int num_peers,
              struct GNUNET_TESTBED_Peer **peers,
               unsigned int links_succeeded,
               unsigned int links_failed)
 {
   struct GNUNET_DHT_TEST_Context *ctx = cls;
-  unsigned int i;
 
   GNUNET_assert (num_peers == ctx->num_peers);
   ctx->peers = peers;
-  for (i=0;i<num_peers;i++)
+  for (unsigned int i=0;i<num_peers;i++)
     ctx->ops[i] = GNUNET_TESTBED_service_connect (ctx,
                                                  peers[i],
                                                  "dht",
index f183fe588d62a910810147ccfca1193299e7fcff..db4d04681fdc88b9591278f77e6f5f1a95557459 100644 (file)
@@ -103,34 +103,12 @@ shutdown_task (void *cls)
  * Signature of the main function of a task.
  *
  * @param cls closure
- * @param success #GNUNET_OK if the PUT was transmitted,
- *                #GNUNET_NO on timeout,
- *                #GNUNET_SYSERR on disconnect from service
- *                after the PUT message was transmitted
- *                (so we don't know if it was received or not)
  */
 static void
-message_sent_cont (void *cls, int success)
+message_sent_cont (void *cls)
 {
-  if (verbose)
-  {
-    switch (success)
-    {
-    case GNUNET_OK:
-      FPRINTF (stderr, "%s `%s'!\n",  _("PUT request sent with key"), GNUNET_h2s_full(&key));
-      break;
-    case GNUNET_NO:
-      FPRINTF (stderr, "%s",  _("Timeout sending PUT request!\n"));
-      break;
-    case GNUNET_SYSERR:
-      FPRINTF (stderr, "%s",  _("PUT request not confirmed!\n"));
-      break;
-    default:
-      GNUNET_break (0);
-      break;
-    }
-  }
-  GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+  GNUNET_SCHEDULER_add_now (&shutdown_task,
+                            NULL);
 }
 
 
@@ -160,7 +138,8 @@ run (void *cls,
 
   if (NULL == (dht_handle = GNUNET_DHT_connect (cfg, 1)))
   {
-    FPRINTF (stderr, _("Could not connect to %s service!\n"), "DHT");
+    FPRINTF (stderr,
+             _("Could not connect to DHT service!\n"));
     ret = 1;
     return;
   }
@@ -203,55 +182,47 @@ main (int argc, char *const *argv)
 {
 
   struct GNUNET_GETOPT_CommandLineOption options[] = {
-  
     GNUNET_GETOPT_option_string ('d',
                                  "data",
                                  "DATA",
                                  gettext_noop ("the data to insert under the key"),
                                  &data),
-  
     GNUNET_GETOPT_option_relative_time ('e',
-                                            "expiration",
-                                            "EXPIRATION",
-                                            gettext_noop ("how long to store this entry in the dht (in seconds)"),
-                                            &expiration),
-  
+                                        "expiration",
+                                        "EXPIRATION",
+                                        gettext_noop ("how long to store this entry in the dht (in seconds)"),
+                                        &expiration),
     GNUNET_GETOPT_option_string ('k',
                                  "key",
                                  "KEY",
                                  gettext_noop ("the query key"),
                                  &query_key),
-  
     GNUNET_GETOPT_option_flag ('x',
-                                  "demultiplex",
-                                  gettext_noop ("use DHT's demultiplex everywhere option"),
-                                  &demultixplex_everywhere),
-  
+                               "demultiplex",
+                               gettext_noop ("use DHT's demultiplex everywhere option"),
+                               &demultixplex_everywhere),
     GNUNET_GETOPT_option_uint ('r',
-                                   "replication",
-                                   "LEVEL",
-                                   gettext_noop ("how many replicas to create"),
-                                   &replication),
-  
+                               "replication",
+                               "LEVEL",
+                               gettext_noop ("how many replicas to create"),
+                               &replication),
     GNUNET_GETOPT_option_flag ('R',
-                                  "record",
-                                  gettext_noop ("use DHT's record route option"),
-                                  &record_route),
-  
+                               "record",
+                               gettext_noop ("use DHT's record route option"),
+                               &record_route),
     GNUNET_GETOPT_option_uint ('t',
-                                   "type",
-                                   "TYPE",
-                                   gettext_noop ("the type to insert data as"),
-                                   &query_type),
-  
+                               "type",
+                               "TYPE",
+                               gettext_noop ("the type to insert data as"),
+                               &query_type),
     GNUNET_GETOPT_option_verbose (&verbose),
-  
     GNUNET_GETOPT_OPTION_END
   };
 
 
-  if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv,
-                                                 &argc, &argv))
+  if (GNUNET_OK !=
+      GNUNET_STRINGS_get_utf8_args (argc, argv,
+                                    &argc, &argv))
     return 2;
   expiration = GNUNET_TIME_UNIT_HOURS;
   return (GNUNET_OK ==
index cb155c4845bf89d4b6fc8d5bd024f9d339dee140..503d7867bbe793520d3362e297ae28fc3b8667da 100644 (file)
@@ -477,8 +477,6 @@ handle_dht_local_put (void *cls,
   struct ClientHandle *ch = cls;
   struct GNUNET_CONTAINER_BloomFilter *peer_bf;
   uint16_t size;
-  struct GNUNET_MQ_Envelope *env;
-  struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
 
   size = ntohs (dht_msg->header.size);
   GNUNET_STATISTICS_update (GDS_stats,
@@ -537,12 +535,6 @@ handle_dht_local_put (void *cls,
                            &dht_msg[1],
                            size - sizeof (struct GNUNET_DHT_ClientPutMessage));
   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
-  env = GNUNET_MQ_msg (conf,
-                       GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
-  conf->reserved = htonl (0);
-  conf->unique_id = dht_msg->unique_id;
-  GNUNET_MQ_send (ch->mq,
-                  env);
   GNUNET_SERVICE_client_continue (ch->client);
 }
 
index 8f4e0ed3132b22768c9723a09e0c8d02bb4cd728..62d12130627e6b8162f3c947a735e16ea2867480 100644 (file)
@@ -105,11 +105,9 @@ test_get_iterator (void *cls,
  * Signature of the main function of a task.
  *
  * @param cls closure
- * @param success result of PUT
  */
 static void
-test_get (void *cls,
-          int success)
+test_get (void *cls)
 {
   struct GNUNET_HashCode hash;
 
index 8be3064f722f36dd8f7c6021142c2aa5fe6f3899..79edb2b0ccfc157f57238c90f41ffa06b0375e0c 100644 (file)
@@ -332,19 +332,17 @@ dht_get_handler (void *cls,
              "Get successful\n");
 #if 0
   {
-    int i;
-
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "PATH: (get %u, put %u)\n",
                get_path_length,
                 put_path_length);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "  LOCAL\n");
-    for (i = get_path_length - 1; i >= 0; i--)
+    for (int i = get_path_length - 1; i >= 0; i--)
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "  %s\n",
                  GNUNET_i2s (&get_path[i]));
-    for (i = put_path_length - 1; i >= 0; i--)
+    for (int i = put_path_length - 1; i >= 0; i--)
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "  %s\n",
                  GNUNET_i2s (&put_path[i]));
@@ -384,12 +382,11 @@ do_puts (void *cls)
   struct GNUNET_DHT_Handle **hs = cls;
   struct GNUNET_HashCode key;
   struct GNUNET_HashCode value;
-  unsigned int i;
 
   put_task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Putting values into DHT\n");
-  for (i = 0; i < NUM_PEERS; i++)
+  for (unsigned int i = 0; i < NUM_PEERS; i++)
   {
     GNUNET_CRYPTO_hash (&i,
                         sizeof (i),
index 5cb1ebfd9928fcbb87543f7c5de38bcd8a4f8ba0..ae40feea0cd4e96d33d4a8d84c65688e3633baa8 100644 (file)
@@ -3426,16 +3426,11 @@ do_dht_put (void *cls);
  * Schedules the next PUT.
  *
  * @param cls closure, NULL
- * @param success #GNUNET_OK if the operation worked (unused)
  */
 static void
-dht_put_cont (void *cls,
-             int success)
+dht_put_cont (void *cls)
 {
   dht_put = NULL;
-  dht_task = GNUNET_SCHEDULER_add_delayed (DHT_PUT_FREQUENCY,
-                                          &do_dht_put,
-                                          NULL);
 }
 
 
@@ -3450,7 +3445,9 @@ do_dht_put (void *cls)
 {
   struct GNUNET_TIME_Absolute expiration;
 
-  dht_task = NULL;
+  dht_task = GNUNET_SCHEDULER_add_delayed (DHT_PUT_FREQUENCY,
+                                          &do_dht_put,
+                                          NULL);
   expiration = GNUNET_TIME_absolute_ntoh (dns_advertisement.expiration_time);
   if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
       GNUNET_TIME_UNIT_HOURS.rel_value_us)
@@ -3463,6 +3460,8 @@ do_dht_put (void *cls)
                                           &dns_advertisement.purpose,
                                           &dns_advertisement.signature));
   }
+  if (NULL != dht_put)
+    GNUNET_DHT_put_cancel (dht_put);
   dht_put = GNUNET_DHT_put (dht,
                            &dht_put_key,
                            1 /* replication */,
index e8c7f586d1992bd4ec981cd70524765737b3c2d4..bb8560fff10e352d50ce92f2b4a531d4d04aa1d2 100644 (file)
@@ -135,14 +135,9 @@ schedule_next_put (struct PutOperator *po)
  * Continuation called after DHT PUT operation has finished.
  *
  * @param cls type of blocks to gather
- * @param success GNUNET_OK if the PUT was transmitted,
- *                GNUNET_NO on timeout,
- *                GNUNET_SYSERR on disconnect from service
- *                after the PUT message was transmitted
- *                (so we don't know if it was received or not)
  */
 static void
-delay_dht_put_blocks (void *cls, int success)
+delay_dht_put_blocks (void *cls)
 {
   struct PutOperator *po = cls;
 
index d798482e3e56da111501cdea349180b05b1f52a4..a4c63889e3ba37016630c8ea08f7d115d92987fd 100644 (file)
@@ -141,22 +141,6 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle);
 struct GNUNET_DHT_PutHandle;
 
 
-/**
- * Type of a PUT continuation.  You must not call
- * #GNUNET_DHT_disconnect in this continuation.
- *
- * @param cls closure
- * @param success #GNUNET_OK if the PUT was transmitted,
- *                #GNUNET_NO on timeout,
- *                #GNUNET_SYSERR on disconnect from service
- *                after the PUT message was transmitted
- *                (so we don't know if it was received or not)
- */
-typedef void
-(*GNUNET_DHT_PutContinuation)(void *cls,
-                              int success);
-
-
 /**
  * Perform a PUT operation storing data in the DHT.
  *
@@ -184,7 +168,7 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
                 size_t size,
                 const void *data,
                 struct GNUNET_TIME_Absolute exp,
-                GNUNET_DHT_PutContinuation cont,
+                GNUNET_SCHEDULER_TaskCallback cont,
                 void *cont_cls);
 
 
index d692b28ffb333ac02abe126b91badf138c654dd7..bf1b48679c00a04813eafd39c48f1741c33a2fcd 100644 (file)
@@ -623,11 +623,6 @@ extern "C"
  */
 #define GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP             154
 
-/**
- * Acknowledge receiving PUT request
- */
-#define GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK             155
-
 /**
  * Certain results are already known to the client, filter those.
  */
index 0f9ad9a12b7acec2b8eec2380ecb3e046a00f3cf..dbcce704dca500676974fd588fb435258d240216 100644 (file)
@@ -780,7 +780,9 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev,
                        GNUNET_SCHEDULER_TaskCallback cb,
                        void *cb_cls)
 {
-  GNUNET_assert (NULL == ev->sent_cb);
+  /* allow setting *OR* clearing callback */
+  GNUNET_assert ( (NULL == ev->sent_cb) ||
+                  (NULL == cb) );
   ev->sent_cb = cb;
   ev->sent_cls = cb_cls;
 }
index 590e5aae20e54097232880f9f679bd4b4251b99c..c91c4b6794ba8d4b36a81fe051bcf50878355418 100644 (file)
@@ -359,11 +359,9 @@ publish_zone_dht_start (void *cls);
  * by a monitor is done.
  *
  * @param cls a `struct DhtPutActivity`
- * @param success #GNUNET_OK on success
  */
 static void
-dht_put_monitor_continuation (void *cls,
-                              int success)
+dht_put_monitor_continuation (void *cls)
 {
   struct DhtPutActivity *ma = cls;
 
@@ -512,29 +510,23 @@ update_velocity ()
  * Continuation called from DHT once the PUT operation is done.
  *
  * @param cls a `struct DhtPutActivity`
- * @param success #GNUNET_OK on success
  */
 static void
-dht_put_continuation (void *cls,
-                      int success)
+dht_put_continuation (void *cls)
 {
   struct DhtPutActivity *ma = cls;
 
   num_public_records++;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "PUT complete (%s)\n",
-              (GNUNET_OK == success) ? "success" : "failure");
+              "PUT complete\n");
   dht_queue_length--;
   GNUNET_CONTAINER_DLL_remove (it_head,
                                it_tail,
                                ma);
   GNUNET_free (ma);
-  if (GNUNET_OK == success)
-  {
-    put_cnt++;
-    if (0 == put_cnt % DELTA_INTERVAL)
-      update_velocity ();
-  }
+  put_cnt++;
+  if (0 == put_cnt % DELTA_INTERVAL)
+    update_velocity ();
 }
 
 
@@ -598,7 +590,7 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
                  const char *label,
                  const struct GNUNET_GNSRECORD_Data *rd_public,
                  unsigned int rd_public_count,
-                 GNUNET_DHT_PutContinuation cont,
+                 GNUNET_SCHEDULER_TaskCallback cont,
                  void *cont_cls)
 {
   struct GNUNET_GNSRECORD_Block *block;