From: Christian Grothoff Date: Tue, 7 Jun 2011 22:08:36 +0000 (+0000) Subject: add support for delays X-Git-Tag: initial-import-from-subversion-38251~18298 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=4cb7e23cef8a149ac1334519ff898cc05811ac66;p=oweals%2Fgnunet.git add support for delays --- diff --git a/TODO b/TODO index bd5bad598..22e39ae5e 100644 --- a/TODO +++ b/TODO @@ -1,6 +1,5 @@ 0.9.0pre3: -* FS [CG] - - implement 'SUPPORT_DELAYS' +* crashes in gnunet-service-transport [MW] 0.9.0: * new webpage: diff --git a/src/fs/fs_test_lib_data.conf b/src/fs/fs_test_lib_data.conf index 377348f02..d3e9922c3 100644 --- a/src/fs/fs_test_lib_data.conf +++ b/src/fs/fs_test_lib_data.conf @@ -56,6 +56,7 @@ HOSTNAME = localhost #OPTIONS = -L DEBUG CONTENT_CACHING = NO CONTENT_PUSHING = NO +DELAY = NO #DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/bin/gnunet-service-fs diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 87cdd8322..d95aec6bf 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -128,6 +128,10 @@ struct GNUNET_BLOCK_Context *GSF_block_ctx; */ struct GNUNET_CORE_Handle *GSF_core; +/** + * Are we introducing randomized delays for better anonymity? + */ +int GSF_enable_randomized_delays; /* ***************************** locals ******************************* */ @@ -235,6 +239,7 @@ handle_p2p_put (void *cls, GNUNET_break (0); return GNUNET_OK; } + GSF_cover_content_count++; return GSF_handle_p2p_content_ (cp, message); } @@ -568,6 +573,7 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg) { GSF_cfg = cfg; + GSF_enable_randomized_delays = GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY"); GSF_dsh = GNUNET_DATASTORE_connect (cfg); if (NULL == GSF_dsh) { diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index 792b43049..ed7cd2e7b 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h @@ -147,6 +147,10 @@ extern unsigned int GSF_cover_content_count; */ extern struct GNUNET_BLOCK_Context *GSF_block_ctx; +/** + * Are we introducing randomized delays for better anonymity? + */ +extern int GSF_enable_randomized_delays; /** * Test if the DATABASE (GET) load on this peer is too high diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 7123db73a..9686c7dfe 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -125,6 +125,45 @@ struct GSF_PeerTransmitHandle }; +/** + * Handle for an entry in our delay list. + */ +struct GSF_DelayedHandle +{ + + /** + * Kept in a doubly-linked list. + */ + struct GSF_DelayedHandle *next; + + /** + * Kept in a doubly-linked list. + */ + struct GSF_DelayedHandle *prev; + + /** + * Peer this transmission belongs to. + */ + struct GSF_ConnectedPeer *cp; + + /** + * The PUT that was delayed. + */ + struct PutMessage *pm; + + /** + * Task for the delay. + */ + GNUNET_SCHEDULER_TaskIdentifier delay_task; + + /** + * Size of the message. + */ + size_t msize; + +}; + + /** * Information per peer and request. */ @@ -183,6 +222,18 @@ struct GSF_ConnectedPeer */ struct GSF_PeerTransmitHandle *pth_tail; + /** + * Messages (replies, queries, content migration) we would like to + * send to this peer in the near future. Sorted by priority, head. + */ + struct GSF_DelayedHandle *delayed_head; + + /** + * Messages (replies, queries, content migration) we would like to + * send to this peer in the near future. Sorted by priority, tail. + */ + struct GSF_DelayedHandle *delayed_tail; + /** * Migration stop message in our queue, or NULL if we have none pending. */ @@ -756,6 +807,61 @@ peer_request_destroy (void *cls, } +/** + * The artificial delay is over, transmit the message now. + * + * @param cls the 'struct GSF_DelayedHandle' with the message + * @param tc scheduler context + */ +static void +transmit_delayed_now (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GSF_DelayedHandle *dh = cls; + struct GSF_ConnectedPeer *cp = dh->cp; + + GNUNET_CONTAINER_DLL_remove (cp->delayed_head, + cp->delayed_tail, + dh); + if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason)) + { + GNUNET_free (dh->pm); + GNUNET_free (dh); + return; + } + (void) GSF_peer_transmit_ (cp, GNUNET_NO, + UINT32_MAX, + REPLY_TIMEOUT, + dh->msize, + ©_reply, + dh->pm); + GNUNET_free (dh); +} + + +/** + * Get the randomized delay a response should be subjected to. + * + * @return desired delay + */ +static struct GNUNET_TIME_Relative +get_randomized_delay () +{ + struct GNUNET_TIME_Relative ret; + + /* FIXME: replace 5000 with something relating to current observed P2P message latency */ + ret = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + 5000)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# artificial delays introduced (ms)"), + ret.rel_value, + GNUNET_NO); + + return ret; +} + + /** * Handle a reply to a pending request. Also called if a request * expires (then with data == NULL). The handler may be called @@ -767,6 +873,7 @@ peer_request_destroy (void *cls, * @param cls 'struct PeerRequest' this is an answer for * @param eval evaluation of the result * @param pr handle to the original pending request + * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown" * @param expiration when does 'data' expire? * @param type type of the block * @param data response data, NULL on request expiration @@ -776,6 +883,7 @@ static void handle_p2p_reply (void *cls, enum GNUNET_BLOCK_EvaluationResult eval, struct GSF_PendingRequest *pr, + uint32_t reply_anonymity_level, struct GNUNET_TIME_Absolute expiration, enum GNUNET_BLOCK_Type type, const void *data, @@ -825,18 +933,52 @@ handle_p2p_reply (void *cls, GNUNET_break (0); return; } + if ( (reply_anonymity_level != UINT32_MAX) && + (reply_anonymity_level > 1) ) + { + if (reply_anonymity_level - 1 > GSF_cover_content_count) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies dropped due to insufficient cover traffic"), + 1, + GNUNET_NO); + return; + } + GSF_cover_content_count -= (reply_anonymity_level - 1); + } + pm = GNUNET_malloc (msize); pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); pm->header.size = htons (msize); pm->type = htonl (type); pm->expiration = GNUNET_TIME_absolute_hton (expiration); memcpy (&pm[1], data, data_len); - (void) GSF_peer_transmit_ (cp, GNUNET_NO, - UINT32_MAX, - REPLY_TIMEOUT, - msize, - ©_reply, - pm); + if ( (reply_anonymity_level != UINT32_MAX) && + (reply_anonymity_level != 0) && + (GSF_enable_randomized_delays == GNUNET_YES) ) + { + struct GSF_DelayedHandle *dh; + + dh = GNUNET_malloc (sizeof (struct GSF_DelayedHandle)); + dh->cp = cp; + dh->pm = pm; + dh->msize = msize; + GNUNET_CONTAINER_DLL_insert (cp->delayed_head, + cp->delayed_tail, + dh); + dh->delay_task = GNUNET_SCHEDULER_add_delayed (get_randomized_delay (), + &transmit_delayed_now, + dh); + } + else + { + (void) GSF_peer_transmit_ (cp, GNUNET_NO, + UINT32_MAX, + REPLY_TIMEOUT, + msize, + ©_reply, + pm); + } if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) return; if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task) @@ -1492,6 +1634,7 @@ GSF_peer_disconnect_handler_ (void *cls, { struct GSF_ConnectedPeer *cp; struct GSF_PeerTransmitHandle *pth; + struct GSF_DelayedHandle *dh; cp = GNUNET_CONTAINER_multihashmap_get (cp_map, &peer->hashPubKey); @@ -1542,6 +1685,15 @@ GSF_peer_disconnect_handler_ (void *cls, GNUNET_assert (0 == pth->cth_in_progress); GNUNET_free (pth); } + while (NULL != (dh = cp->delayed_head)) + { + GNUNET_CONTAINER_DLL_remove (cp->delayed_head, + cp->delayed_tail, + dh); + GNUNET_SCHEDULER_cancel (dh->delay_task); + GNUNET_free (dh->pm); + GNUNET_free (dh); + } GNUNET_PEER_change_rc (cp->ppd.pid, -1); if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task) { diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c index fe22bd099..c712f58d6 100644 --- a/src/fs/gnunet-service-fs_lc.c +++ b/src/fs/gnunet-service-fs_lc.c @@ -222,6 +222,7 @@ client_request_destroy (void *cls, * @param cls user-specified closure * @param eval evaluation of the result * @param pr handle to the original pending request + * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown" * @param expiration when does 'data' expire? * @param type type of the block * @param data response data, NULL on request expiration @@ -231,6 +232,7 @@ static void client_response_handler (void *cls, enum GNUNET_BLOCK_EvaluationResult eval, struct GSF_PendingRequest *pr, + uint32_t reply_anonymity_level, struct GNUNET_TIME_Absolute expiration, enum GNUNET_BLOCK_Type type, const void *data, diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index d4f14985b..55b256e7d 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -366,6 +366,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr, + UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_BLOCK_TYPE_ANY, NULL, 0); @@ -726,7 +727,7 @@ update_request_performance_data (struct ProcessReplyClosure *prq, */ static int process_reply (void *cls, - const GNUNET_HashCode * key, + const GNUNET_HashCode *key, void *value) { struct ProcessReplyClosure *prq = cls; @@ -766,6 +767,7 @@ process_reply (void *cls, pr->rh (pr->rh_cls, prq->eval, pr, + prq->anonymity_level, prq->expiration, prq->type, prq->data, prq->size); @@ -825,6 +827,7 @@ process_reply (void *cls, pr->rh (pr->rh_cls, prq->eval, pr, + prq->anonymity_level, prq->expiration, prq->type, prq->data, prq->size); @@ -1426,7 +1429,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, prq.type = type; prq.expiration = expiration; prq.priority = 0; - prq.anonymity_level = 1; + prq.anonymity_level = UINT32_MAX; prq.request_found = GNUNET_NO; GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, &query, diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index fa3c51ffd..2f828e281 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -160,6 +160,7 @@ struct GSF_PendingRequestData * @param cls user-specified closure * @param eval evaluation of the result * @param pr handle to the original pending request + * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown" * @param expiration when does 'data' expire? * @param type type of the block * @param data response data, NULL on request expiration @@ -168,6 +169,7 @@ struct GSF_PendingRequestData typedef void (*GSF_PendingRequestReplyHandler)(void *cls, enum GNUNET_BLOCK_EvaluationResult eval, struct GSF_PendingRequest *pr, + uint32_t reply_anonymity_level, struct GNUNET_TIME_Absolute expiration, enum GNUNET_BLOCK_Type type, const void *data, diff --git a/src/fs/perf_gnunet_service_fs_p2p.c b/src/fs/perf_gnunet_service_fs_p2p.c index 3a5a62756..572585520 100644 --- a/src/fs/perf_gnunet_service_fs_p2p.c +++ b/src/fs/perf_gnunet_service_fs_p2p.c @@ -81,6 +81,7 @@ struct StatValues */ static struct StatValues stats[] = { + { "fs", "# artificial delays introduced (ms)"}, { "fs", "# queries forwarded"}, { "fs", "# replies received and matched"}, { "fs", "# results found locally"}, @@ -93,7 +94,6 @@ static struct StatValues stats[] = { "fs", "# P2P searches discarded (queue length bound)"}, { "fs", "# replies received for local clients"}, { "fs", "# queries retransmitted to same target"}, - { "fs", "cummulative artificial delay introduced (ms)"}, { "core", "# bytes decrypted"}, { "core", "# bytes encrypted"}, { "core", "# discarded CORE_SEND requests"}, diff --git a/src/fs/perf_gnunet_service_fs_p2p_trust.c b/src/fs/perf_gnunet_service_fs_p2p_trust.c index 444e7e7c3..0128e281d 100644 --- a/src/fs/perf_gnunet_service_fs_p2p_trust.c +++ b/src/fs/perf_gnunet_service_fs_p2p_trust.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2010,2011 Christian Grothoff (and other contributing authors) + (C) 2010, 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -33,13 +33,15 @@ * @author Christian Grothoff * * Sample output: - * - 10 MB, 3 peers: - * Download speed of type `seeder 1' was 6 MiB/s - * Download speed of type `seeder 2' was 6 MiB/s - * Download speed of type `leach` was 1051 KiB/s + * - 10 MB, 3 peers, with delays: + * Download speed of type `seeder 1' was 757 KiB/s + * Download speed of type `seeder 2' was 613 KiB/s + * Download speed of type `leach` was 539 KiB/s * - * Analysis: leach squeezed out entirely early, then gets to - * take its turn once the main downloads complete + * - 10 MB, 3 peers, without delays: + * Download speed of type `seeder 1' was 1784 KiB/s + * Download speed of type `seeder 2' was 1604 KiB/s + * Download speed of type `leach` was 1384 KiB/s */ #include "platform.h" #include "fs_test_lib.h" @@ -114,6 +116,7 @@ struct StatValues */ static struct StatValues stats[] = { + { "fs", "# artificial delays introduced (ms)"}, { "fs", "# queries forwarded"}, { "fs", "# replies received and matched"}, { "fs", "# results found locally"}, @@ -126,13 +129,10 @@ static struct StatValues stats[] = { "fs", "# P2P searches discarded (queue length bound)"}, { "fs", "# replies received for local clients"}, { "fs", "# queries retransmitted to same target"}, - { "fs", "cummulative artificial delay introduced (ms)"}, { "core", "# bytes decrypted"}, { "core", "# bytes encrypted"}, { "core", "# discarded CORE_SEND requests"}, - { "core", "# discarded CORE_SEND request bytes"}, { "core", "# discarded lower priority CORE_SEND requests"}, - { "core", "# discarded lower priority CORE_SEND request bytes"}, { "transport", "# bytes received via TCP"}, { "transport", "# bytes transmitted via TCP"}, { "datacache", "# bytes stored"}, @@ -309,12 +309,12 @@ do_downloads (void *cls, VERBOSE, &do_report, "leach"); /* mutual downloads of (primary) sharing peers */ - GNUNET_FS_TEST_download (daemons[NUM_DAEMONS-1], + GNUNET_FS_TEST_download (daemons[NUM_DAEMONS-2], TIMEOUT, anonymity, SEED1, uri1, VERBOSE, &do_report, "seeder 2"); - GNUNET_FS_TEST_download (daemons[NUM_DAEMONS-2], + GNUNET_FS_TEST_download (daemons[NUM_DAEMONS-1], TIMEOUT, anonymity, SEED2, uri2, VERBOSE,