From: Christian Grothoff Date: Mon, 30 Apr 2018 09:31:22 +0000 (+0200) Subject: eliminate DHT PUT OK message by using MQ feature of calling continuation when transmi... X-Git-Tag: v0.11.0pre66~93^2~10^2~12 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=ccf723196e9ff7f31a56a8e8ebd8319d07fa17c8;p=oweals%2Fgnunet.git eliminate DHT PUT OK message by using MQ feature of calling continuation when transmission is complete --- diff --git a/src/dht/dht.h b/src/dht/dht.h index 4c994f93a..95ffa33ca 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -213,11 +213,6 @@ struct GNUNET_DHT_ClientPutMessage */ uint32_t desired_replication_level GNUNET_PACKED; - /** - * Unique ID for the PUT message. - */ - uint64_t unique_id GNUNET_PACKED; - /** * How long should this data persist? */ @@ -233,30 +228,6 @@ struct GNUNET_DHT_ClientPutMessage }; -/** - * Message to confirming receipt of PUT, sent from DHT service to clients. - */ -struct GNUNET_DHT_ClientPutConfirmationMessage -{ - /** - * Type: #GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK - */ - struct GNUNET_MessageHeader header; - - /** - * Always zero. - */ - uint32_t reserved GNUNET_PACKED; - - /** - * Unique ID from the PUT message that is being confirmed. - */ - uint64_t unique_id GNUNET_PACKED; - -}; - - - /** * Message to monitor put requests going through peer, DHT service -> clients. */ diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 7a0771de0..af0dafbf3 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -55,7 +55,7 @@ struct GNUNET_DHT_PutHandle /** * Continuation to call when done. */ - GNUNET_DHT_PutContinuation cont; + GNUNET_SCHEDULER_TaskCallback cont; /** * Main handle to this DHT api @@ -68,9 +68,9 @@ struct GNUNET_DHT_PutHandle void *cont_cls; /** - * Unique ID for the PUT operation. + * Envelope from the PUT operation. */ - uint64_t unique_id; + struct GNUNET_MQ_Envelope *env; }; @@ -440,7 +440,7 @@ static void do_disconnect (struct GNUNET_DHT_Handle *h) { struct GNUNET_DHT_PutHandle *ph; - GNUNET_DHT_PutContinuation cont; + GNUNET_SCHEDULER_TaskCallback cont; void *cont_cls; if (NULL == h->mq) @@ -456,10 +456,10 @@ do_disconnect (struct GNUNET_DHT_Handle *h) { cont = ph->cont; cont_cls = ph->cont_cls; + ph->env = NULL; GNUNET_DHT_put_cancel (ph); if (NULL != cont) - cont (cont_cls, - GNUNET_SYSERR); + cont (cont_cls); } GNUNET_assert (NULL == h->reconnect_task); h->reconnect_task @@ -818,31 +818,23 @@ handle_client_result (void *cls, /** - * Process a put confirmation message from the service. + * Process a MQ PUT transmission notification. * * @param cls The DHT handle. - * @param msg confirmation message from the service. */ static void -handle_put_confirmation (void *cls, - const struct GNUNET_DHT_ClientPutConfirmationMessage *msg) +handle_put_cont (void *cls) { - struct GNUNET_DHT_Handle *handle = cls; - struct GNUNET_DHT_PutHandle *ph; - GNUNET_DHT_PutContinuation cont; + struct GNUNET_DHT_PutHandle *ph = cls; + GNUNET_SCHEDULER_TaskCallback cont; void *cont_cls; - for (ph = handle->put_head; NULL != ph; ph = ph->next) - if (ph->unique_id == msg->unique_id) - break; - if (NULL == ph) - return; cont = ph->cont; cont_cls = ph->cont_cls; + ph->env = NULL; GNUNET_DHT_put_cancel (ph); if (NULL != cont) - cont (cont_cls, - GNUNET_OK); + cont (cont_cls); } @@ -872,10 +864,6 @@ try_connect (struct GNUNET_DHT_Handle *h) GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT, struct GNUNET_DHT_ClientResultMessage, h), - GNUNET_MQ_hd_fixed_size (put_confirmation, - GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK, - struct GNUNET_DHT_ClientPutConfirmationMessage, - h), GNUNET_MQ_handler_end () }; if (NULL != h->mq) @@ -941,8 +929,7 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) while (NULL != (ph = handle->put_head)) { if (NULL != ph->cont) - ph->cont (ph->cont_cls, - GNUNET_SYSERR); + ph->cont (ph->cont_cls); GNUNET_DHT_put_cancel (ph); } if (NULL != handle->mq) @@ -989,7 +976,7 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, size_t size, const void *data, struct GNUNET_TIME_Absolute exp, - GNUNET_DHT_PutContinuation cont, + GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls) { struct GNUNET_MQ_Envelope *env; @@ -1014,17 +1001,19 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, ph->dht_handle = handle; ph->cont = cont; ph->cont_cls = cont_cls; - ph->unique_id = ++handle->uid_gen; GNUNET_CONTAINER_DLL_insert_tail (handle->put_head, handle->put_tail, ph); env = GNUNET_MQ_msg_extra (put_msg, size, GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); + GNUNET_MQ_notify_sent (env, + &handle_put_cont, + ph); + ph->env = env; put_msg->type = htonl ((uint32_t) type); put_msg->options = htonl ((uint32_t) options); put_msg->desired_replication_level = htonl (desired_replication_level); - put_msg->unique_id = ph->unique_id; put_msg->expiration = GNUNET_TIME_absolute_hton (exp); put_msg->key = *key; GNUNET_memcpy (&put_msg[1], @@ -1052,6 +1041,10 @@ GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph) { struct GNUNET_DHT_Handle *handle = ph->dht_handle; + if (NULL != ph->env) + GNUNET_MQ_notify_sent (ph->env, + NULL, + NULL); GNUNET_CONTAINER_DLL_remove (handle->put_head, handle->put_tail, ph); diff --git a/src/dht/dht_test_lib.c b/src/dht/dht_test_lib.c index 4c1bd3057..52d5a3731 100644 --- a/src/dht/dht_test_lib.c +++ b/src/dht/dht_test_lib.c @@ -114,7 +114,6 @@ dht_connect_cb (void *cls, const char *emsg) { struct GNUNET_DHT_TEST_Context *ctx = cls; - unsigned int i; if (NULL != emsg) { @@ -124,10 +123,10 @@ dht_connect_cb (void *cls, GNUNET_SCHEDULER_shutdown (); return; } - for (i=0;inum_peers;i++) + for (unsigned int i=0;inum_peers;i++) if (op == ctx->ops[i]) ctx->dhts[i] = ca_result; - for (i=0;inum_peers;i++) + for (unsigned int i=0;inum_peers;i++) if (NULL == ctx->dhts[i]) return; /* still some DHT connections missing */ /* all DHT connections ready! */ @@ -147,9 +146,7 @@ dht_connect_cb (void *cls, void GNUNET_DHT_TEST_cleanup (struct GNUNET_DHT_TEST_Context *ctx) { - unsigned int i; - - for (i=0;inum_peers;i++) + for (unsigned int i=0;inum_peers;i++) GNUNET_TESTBED_operation_done (ctx->ops[i]); GNUNET_free (ctx->ops); GNUNET_free (ctx->dhts); @@ -160,18 +157,17 @@ GNUNET_DHT_TEST_cleanup (struct GNUNET_DHT_TEST_Context *ctx) static void dht_test_run (void *cls, - struct GNUNET_TESTBED_RunHandle *h, + struct GNUNET_TESTBED_RunHandle *h, unsigned int num_peers, struct GNUNET_TESTBED_Peer **peers, unsigned int links_succeeded, unsigned int links_failed) { struct GNUNET_DHT_TEST_Context *ctx = cls; - unsigned int i; GNUNET_assert (num_peers == ctx->num_peers); ctx->peers = peers; - for (i=0;iops[i] = GNUNET_TESTBED_service_connect (ctx, peers[i], "dht", diff --git a/src/dht/gnunet-dht-put.c b/src/dht/gnunet-dht-put.c index f183fe588..db4d04681 100644 --- a/src/dht/gnunet-dht-put.c +++ b/src/dht/gnunet-dht-put.c @@ -103,34 +103,12 @@ shutdown_task (void *cls) * Signature of the main function of a task. * * @param cls closure - * @param success #GNUNET_OK if the PUT was transmitted, - * #GNUNET_NO on timeout, - * #GNUNET_SYSERR on disconnect from service - * after the PUT message was transmitted - * (so we don't know if it was received or not) */ static void -message_sent_cont (void *cls, int success) +message_sent_cont (void *cls) { - if (verbose) - { - switch (success) - { - case GNUNET_OK: - FPRINTF (stderr, "%s `%s'!\n", _("PUT request sent with key"), GNUNET_h2s_full(&key)); - break; - case GNUNET_NO: - FPRINTF (stderr, "%s", _("Timeout sending PUT request!\n")); - break; - case GNUNET_SYSERR: - FPRINTF (stderr, "%s", _("PUT request not confirmed!\n")); - break; - default: - GNUNET_break (0); - break; - } - } - GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); + GNUNET_SCHEDULER_add_now (&shutdown_task, + NULL); } @@ -160,7 +138,8 @@ run (void *cls, if (NULL == (dht_handle = GNUNET_DHT_connect (cfg, 1))) { - FPRINTF (stderr, _("Could not connect to %s service!\n"), "DHT"); + FPRINTF (stderr, + _("Could not connect to DHT service!\n")); ret = 1; return; } @@ -203,55 +182,47 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { - GNUNET_GETOPT_option_string ('d', "data", "DATA", gettext_noop ("the data to insert under the key"), &data), - GNUNET_GETOPT_option_relative_time ('e', - "expiration", - "EXPIRATION", - gettext_noop ("how long to store this entry in the dht (in seconds)"), - &expiration), - + "expiration", + "EXPIRATION", + gettext_noop ("how long to store this entry in the dht (in seconds)"), + &expiration), GNUNET_GETOPT_option_string ('k', "key", "KEY", gettext_noop ("the query key"), &query_key), - GNUNET_GETOPT_option_flag ('x', - "demultiplex", - gettext_noop ("use DHT's demultiplex everywhere option"), - &demultixplex_everywhere), - + "demultiplex", + gettext_noop ("use DHT's demultiplex everywhere option"), + &demultixplex_everywhere), GNUNET_GETOPT_option_uint ('r', - "replication", - "LEVEL", - gettext_noop ("how many replicas to create"), - &replication), - + "replication", + "LEVEL", + gettext_noop ("how many replicas to create"), + &replication), GNUNET_GETOPT_option_flag ('R', - "record", - gettext_noop ("use DHT's record route option"), - &record_route), - + "record", + gettext_noop ("use DHT's record route option"), + &record_route), GNUNET_GETOPT_option_uint ('t', - "type", - "TYPE", - gettext_noop ("the type to insert data as"), - &query_type), - + "type", + "TYPE", + gettext_noop ("the type to insert data as"), + &query_type), GNUNET_GETOPT_option_verbose (&verbose), - GNUNET_GETOPT_OPTION_END }; - if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, - &argc, &argv)) + if (GNUNET_OK != + GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) return 2; expiration = GNUNET_TIME_UNIT_HOURS; return (GNUNET_OK == diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index cb155c484..503d7867b 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -477,8 +477,6 @@ handle_dht_local_put (void *cls, struct ClientHandle *ch = cls; struct GNUNET_CONTAINER_BloomFilter *peer_bf; uint16_t size; - struct GNUNET_MQ_Envelope *env; - struct GNUNET_DHT_ClientPutConfirmationMessage *conf; size = ntohs (dht_msg->header.size); GNUNET_STATISTICS_update (GDS_stats, @@ -537,12 +535,6 @@ handle_dht_local_put (void *cls, &dht_msg[1], size - sizeof (struct GNUNET_DHT_ClientPutMessage)); GNUNET_CONTAINER_bloomfilter_free (peer_bf); - env = GNUNET_MQ_msg (conf, - GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK); - conf->reserved = htonl (0); - conf->unique_id = dht_msg->unique_id; - GNUNET_MQ_send (ch->mq, - env); GNUNET_SERVICE_client_continue (ch->client); } diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c index 8f4e0ed31..62d121306 100644 --- a/src/dht/test_dht_api.c +++ b/src/dht/test_dht_api.c @@ -105,11 +105,9 @@ test_get_iterator (void *cls, * Signature of the main function of a task. * * @param cls closure - * @param success result of PUT */ static void -test_get (void *cls, - int success) +test_get (void *cls) { struct GNUNET_HashCode hash; diff --git a/src/dht/test_dht_topo.c b/src/dht/test_dht_topo.c index 8be3064f7..79edb2b0c 100644 --- a/src/dht/test_dht_topo.c +++ b/src/dht/test_dht_topo.c @@ -332,19 +332,17 @@ dht_get_handler (void *cls, "Get successful\n"); #if 0 { - int i; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PATH: (get %u, put %u)\n", get_path_length, put_path_length); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " LOCAL\n"); - for (i = get_path_length - 1; i >= 0; i--) + for (int i = get_path_length - 1; i >= 0; i--) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&get_path[i])); - for (i = put_path_length - 1; i >= 0; i--) + for (int i = put_path_length - 1; i >= 0; i--) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&put_path[i])); @@ -384,12 +382,11 @@ do_puts (void *cls) struct GNUNET_DHT_Handle **hs = cls; struct GNUNET_HashCode key; struct GNUNET_HashCode value; - unsigned int i; put_task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting values into DHT\n"); - for (i = 0; i < NUM_PEERS; i++) + for (unsigned int i = 0; i < NUM_PEERS; i++) { GNUNET_CRYPTO_hash (&i, sizeof (i), diff --git a/src/exit/gnunet-daemon-exit.c b/src/exit/gnunet-daemon-exit.c index 5cb1ebfd9..ae40feea0 100644 --- a/src/exit/gnunet-daemon-exit.c +++ b/src/exit/gnunet-daemon-exit.c @@ -3426,16 +3426,11 @@ do_dht_put (void *cls); * Schedules the next PUT. * * @param cls closure, NULL - * @param success #GNUNET_OK if the operation worked (unused) */ static void -dht_put_cont (void *cls, - int success) +dht_put_cont (void *cls) { dht_put = NULL; - dht_task = GNUNET_SCHEDULER_add_delayed (DHT_PUT_FREQUENCY, - &do_dht_put, - NULL); } @@ -3450,7 +3445,9 @@ do_dht_put (void *cls) { struct GNUNET_TIME_Absolute expiration; - dht_task = NULL; + dht_task = GNUNET_SCHEDULER_add_delayed (DHT_PUT_FREQUENCY, + &do_dht_put, + NULL); expiration = GNUNET_TIME_absolute_ntoh (dns_advertisement.expiration_time); if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us < GNUNET_TIME_UNIT_HOURS.rel_value_us) @@ -3463,6 +3460,8 @@ do_dht_put (void *cls) &dns_advertisement.purpose, &dns_advertisement.signature)); } + if (NULL != dht_put) + GNUNET_DHT_put_cancel (dht_put); dht_put = GNUNET_DHT_put (dht, &dht_put_key, 1 /* replication */, diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index e8c7f586d..bb8560fff 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c @@ -135,14 +135,9 @@ schedule_next_put (struct PutOperator *po) * Continuation called after DHT PUT operation has finished. * * @param cls type of blocks to gather - * @param success GNUNET_OK if the PUT was transmitted, - * GNUNET_NO on timeout, - * GNUNET_SYSERR on disconnect from service - * after the PUT message was transmitted - * (so we don't know if it was received or not) */ static void -delay_dht_put_blocks (void *cls, int success) +delay_dht_put_blocks (void *cls) { struct PutOperator *po = cls; diff --git a/src/include/gnunet_dht_service.h b/src/include/gnunet_dht_service.h index d798482e3..a4c63889e 100644 --- a/src/include/gnunet_dht_service.h +++ b/src/include/gnunet_dht_service.h @@ -141,22 +141,6 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle); struct GNUNET_DHT_PutHandle; -/** - * Type of a PUT continuation. You must not call - * #GNUNET_DHT_disconnect in this continuation. - * - * @param cls closure - * @param success #GNUNET_OK if the PUT was transmitted, - * #GNUNET_NO on timeout, - * #GNUNET_SYSERR on disconnect from service - * after the PUT message was transmitted - * (so we don't know if it was received or not) - */ -typedef void -(*GNUNET_DHT_PutContinuation)(void *cls, - int success); - - /** * Perform a PUT operation storing data in the DHT. * @@ -184,7 +168,7 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, size_t size, const void *data, struct GNUNET_TIME_Absolute exp, - GNUNET_DHT_PutContinuation cont, + GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls); diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index d692b28ff..bf1b48679 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -623,11 +623,6 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP 154 -/** - * Acknowledge receiving PUT request - */ -#define GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK 155 - /** * Certain results are already known to the client, filter those. */ diff --git a/src/util/mq.c b/src/util/mq.c index 0f9ad9a12..dbcce704d 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -780,7 +780,9 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls) { - GNUNET_assert (NULL == ev->sent_cb); + /* allow setting *OR* clearing callback */ + GNUNET_assert ( (NULL == ev->sent_cb) || + (NULL == cb) ); ev->sent_cb = cb; ev->sent_cls = cb_cls; } diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c index 590e5aae2..c91c4b679 100644 --- a/src/zonemaster/gnunet-service-zonemaster.c +++ b/src/zonemaster/gnunet-service-zonemaster.c @@ -359,11 +359,9 @@ publish_zone_dht_start (void *cls); * by a monitor is done. * * @param cls a `struct DhtPutActivity` - * @param success #GNUNET_OK on success */ static void -dht_put_monitor_continuation (void *cls, - int success) +dht_put_monitor_continuation (void *cls) { struct DhtPutActivity *ma = cls; @@ -512,29 +510,23 @@ update_velocity () * Continuation called from DHT once the PUT operation is done. * * @param cls a `struct DhtPutActivity` - * @param success #GNUNET_OK on success */ static void -dht_put_continuation (void *cls, - int success) +dht_put_continuation (void *cls) { struct DhtPutActivity *ma = cls; num_public_records++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "PUT complete (%s)\n", - (GNUNET_OK == success) ? "success" : "failure"); + "PUT complete\n"); dht_queue_length--; GNUNET_CONTAINER_DLL_remove (it_head, it_tail, ma); GNUNET_free (ma); - if (GNUNET_OK == success) - { - put_cnt++; - if (0 == put_cnt % DELTA_INTERVAL) - update_velocity (); - } + put_cnt++; + if (0 == put_cnt % DELTA_INTERVAL) + update_velocity (); } @@ -598,7 +590,7 @@ perform_dht_put (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key, const char *label, const struct GNUNET_GNSRECORD_Data *rd_public, unsigned int rd_public_count, - GNUNET_DHT_PutContinuation cont, + GNUNET_SCHEDULER_TaskCallback cont, void *cont_cls) { struct GNUNET_GNSRECORD_Block *block;