X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs.c;h=06ac91c73d5f1a21f9b79d59f6454801106b0df7;hb=1f09f4f7716db5939ec1c9a278b5661616dd72d6;hp=20a98e6f24721285a357c1ac51882c5d6b1e2264;hpb=7351d04517a2a1ad48880d4fa46e780068929d6e;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 20a98e6f2..06ac91c73 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -24,9 +24,7 @@ * @author Christian Grothoff * * To use: - * - consider re-issue GSF_dht_lookup_ after non-DHT reply received - * - implement 'SUPPORT_DELAYS' - * + * - consider re-issue GSF_dht_lookup_ after non-DHT reply received */ #include "platform.h" #include @@ -59,7 +57,7 @@ /** - * How quickly do we age cover traffic? At the given + * How quickly do we age cover traffic? At the given * time interval, remaining cover traffic counters are * decremented by 1/16th. */ @@ -93,6 +91,12 @@ struct GNUNET_DHT_Handle *GSF_dht; */ struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime; +/** + * Running average of the observed latency to other peers (round trip). + * Initialized to 5s as the initial default. + */ +struct GNUNET_TIME_Relative GSF_avg_latency = { 500 }; + /** * Typical priorities we're seeing from other peers right now. Since * most priorities will be zero, this value is the weighted average of @@ -106,13 +110,13 @@ struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime; double GSF_current_priorities; /** - * How many query messages have we received 'recently' that + * How many query messages have we received 'recently' that * have not yet been claimed as cover traffic? */ unsigned int GSF_cover_query_count; /** - * How many content messages have we received 'recently' that + * How many content messages have we received 'recently' that * have not yet been claimed as cover traffic? */ unsigned int GSF_cover_content_count; @@ -128,6 +132,10 @@ struct GNUNET_BLOCK_Context *GSF_block_ctx; */ struct GNUNET_CORE_Handle *GSF_core; +/** + * Are we introducing randomized delays for better anonymity? + */ +int GSF_enable_randomized_delays; /* ***************************** locals ******************************* */ @@ -158,18 +166,16 @@ static struct GNUNET_PeerIdentity my_id; * @param tc task context */ static void -age_cover_counters (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +age_cover_counters (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GSF_cover_content_count = (GSF_cover_content_count * 15) / 16; GSF_cover_query_count = (GSF_cover_query_count * 15) / 16; - cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, - &age_cover_counters, - NULL); + cover_age_task = + GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, + NULL); } - /** * We've just now completed a datastore request. Update our * datastore load calculations. @@ -182,16 +188,15 @@ GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start) struct GNUNET_TIME_Relative delay; delay = GNUNET_TIME_absolute_get_duration (start); - GNUNET_LOAD_update (datastore_get_load, - delay.rel_value); + GNUNET_LOAD_update (datastore_get_load, delay.rel_value); } /** * Test if the DATABASE (GET) load on this peer is too high * to even consider processing the query at - * all. - * + * all. + * * @return GNUNET_YES if the load is too high to do anything (load high) * GNUNET_NO to process normally (load normal) * GNUNET_SYSERR to process for free (load low) @@ -203,13 +208,43 @@ GSF_test_get_load_too_high_ (uint32_t priority) ld = GNUNET_LOAD_get_load (datastore_get_load); if (ld < 1) - return GNUNET_SYSERR; - if (ld <= priority) - return GNUNET_NO; + return GNUNET_SYSERR; + if (ld <= priority) + return GNUNET_NO; return GNUNET_YES; } +/** + * We've received peer performance information. Update + * our running average for the P2P latency. + * + * @param atsi performance information + * @param atsi_count number of 'atsi' records + */ +static void +update_latencies (const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + unsigned int i; + + for (i = 0; i < atsi_count; i++) + { + if (ntohl (atsi[i].type) == GNUNET_ATS_QUALITY_NET_DELAY) + { + GSF_avg_latency.rel_value = + (GSF_avg_latency.rel_value * 31 + + GNUNET_MIN (5000, ntohl (atsi[i].value))) / 32; + GNUNET_STATISTICS_set (GSF_stats, + gettext_noop + ("# running average P2P latency (ms)"), + GSF_avg_latency.rel_value, GNUNET_NO); + break; + } + } +} + + /** * Handle P2P "PUT" message. * @@ -218,23 +253,26 @@ GSF_test_get_load_too_high_ (uint32_t priority) * for loopback messages where we are both sender and receiver) * @param message the actual message * @param atsi performance information + * @param atsi_count number of records in 'atsi' * @return GNUNET_OK to keep the connection open, * GNUNET_SYSERR to close it (signal serious error) */ static int -handle_p2p_put (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message, - const struct GNUNET_TRANSPORT_ATS_Information *atsi) +handle_p2p_put (void *cls, const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) { struct GSF_ConnectedPeer *cp; cp = GSF_peer_get_ (other); if (NULL == cp) - { - GNUNET_break (0); - return GNUNET_OK; - } + { + GNUNET_break (0); + return GNUNET_OK; + } + GSF_cover_content_count++; + update_latencies (atsi, atsi_count); return GSF_handle_p2p_content_ (cp, message); } @@ -250,12 +288,19 @@ handle_p2p_put (void *cls, */ static void consider_request_for_forwarding (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct GSF_ConnectedPeer *cp, - const struct GSF_PeerPerformanceData *ppd) + const struct GNUNET_PeerIdentity *peer, + struct GSF_ConnectedPeer *cp, + const struct GSF_PeerPerformanceData *ppd) { struct GSF_PendingRequest *pr = cls; + if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Loopback routes suppressed"), 1, + GNUNET_NO); + return; + } GSF_plan_add_ (cp, pr); } @@ -271,14 +316,12 @@ consider_request_for_forwarding (void *cls, * @param result final datastore lookup result */ static void -consider_forwarding (void *cls, - struct GSF_PendingRequest *pr, - enum GNUNET_BLOCK_EvaluationResult result) +consider_forwarding (void *cls, struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result) { if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) - return; /* we're done... */ - GSF_iterate_connected_peers_ (&consider_request_for_forwarding, - pr); + return; /* we're done... */ + GSF_iterate_connected_peers_ (&consider_request_for_forwarding, pr); } @@ -290,23 +333,24 @@ consider_forwarding (void *cls, * for loopback messages where we are both sender and receiver) * @param message the actual message * @param atsi performance information + * @param atsi_count number of records in 'atsi' * @return GNUNET_OK to keep the connection open, * GNUNET_SYSERR to close it (signal serious error) */ static int -handle_p2p_get (void *cls, - const struct GNUNET_PeerIdentity *other, - const struct GNUNET_MessageHeader *message, - const struct GNUNET_TRANSPORT_ATS_Information *atsi) +handle_p2p_get (void *cls, const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) { struct GSF_PendingRequest *pr; pr = GSF_handle_p2p_query_ (other, message); if (NULL == pr) return GNUNET_SYSERR; - GSF_local_lookup_ (pr, - &consider_forwarding, - NULL); + GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; + GSF_local_lookup_ (pr, &consider_forwarding, NULL); + update_latencies (atsi, atsi_count); return GNUNET_OK; } @@ -314,7 +358,7 @@ handle_p2p_get (void *cls, /** * We're done with the local lookup, now consider * P2P processing (depending on request options and - * result status). Also signal that we can now + * result status). Also signal that we can now * receive more request information from the client. * * @param cls the client doing the request ('struct GNUNET_SERVER_Client') @@ -322,29 +366,24 @@ handle_p2p_get (void *cls, * @param result final datastore lookup result */ static void -start_p2p_processing (void *cls, - struct GSF_PendingRequest *pr, - enum GNUNET_BLOCK_EvaluationResult result) +start_p2p_processing (void *cls, struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result) { struct GNUNET_SERVER_Client *client = cls; struct GSF_PendingRequestData *prd; prd = GSF_pending_request_get_data_ (pr); -#if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Finished database lookup for local request `%s' with result %d\n", - GNUNET_h2s (&prd->query), - result); -#endif - GNUNET_SERVER_receive_done (client, - GNUNET_OK); + "Finished database lookup for local request `%s' with result %d\n", + GNUNET_h2s (&prd->query), result); + GNUNET_SERVER_receive_done (client, GNUNET_OK); if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) - return; /* we're done, 'pr' was already destroyed... */ - if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) ) - { - GSF_pending_request_cancel_ (pr); - return; - } + return; /* we're done, 'pr' was already destroyed... */ + if (0 != (GSF_PRO_LOCAL_ONLY & prd->options)) + { + GSF_pending_request_cancel_ (pr, GNUNET_YES); + return; + } GSF_dht_lookup_ (pr); consider_forwarding (NULL, pr, result); } @@ -358,21 +397,29 @@ start_p2p_processing (void *cls, * @param message the actual message */ static void -handle_start_search (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_start_search (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { struct GSF_PendingRequest *pr; - - pr = GSF_local_client_start_search_handler_ (client, message); - if (NULL == pr) - { - /* 'GNUNET_SERVER_receive_done was already called! */ - return; - } - GSF_local_lookup_ (pr, - &start_p2p_processing, - client); + int ret; + + pr = NULL; + ret = GSF_local_client_start_search_handler_ (client, message, &pr); + switch (ret) + { + case GNUNET_SYSERR: + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + break; + case GNUNET_NO: + GNUNET_SERVER_receive_done (client, GNUNET_OK); + break; + case GNUNET_YES: + GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; + GSF_local_lookup_ (pr, &start_p2p_processing, client); + break; + default: + GNUNET_assert (0); + } } @@ -383,14 +430,13 @@ handle_start_search (void *cls, * @param tc unused */ static void -shutdown_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { if (NULL != GSF_core) - { - GNUNET_CORE_disconnect (GSF_core); - GSF_core = NULL; - } + { + GNUNET_CORE_disconnect (GSF_core); + GSF_core = NULL; + } GSF_put_done_ (); GSF_push_done_ (); GSF_pending_request_done_ (); @@ -405,11 +451,12 @@ shutdown_task (void *cls, GNUNET_CONFIGURATION_destroy (block_cfg); block_cfg = NULL; GNUNET_STATISTICS_destroy (GSF_stats, GNUNET_NO); + GSF_stats = NULL; if (GNUNET_SCHEDULER_NO_TASK != cover_age_task) - { - GNUNET_SCHEDULER_cancel (cover_age_task); - cover_age_task = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (cover_age_task); + cover_age_task = GNUNET_SCHEDULER_NO_TASK; + } GNUNET_FS_indexing_done (); GNUNET_LOAD_value_free (datastore_get_load); datastore_get_load = NULL; @@ -429,12 +476,20 @@ shutdown_task (void *cls, * @return GNUNET_YES to continue to iterate */ static int -consider_peer_for_forwarding (void *cls, - const GNUNET_HashCode *key, - struct GSF_PendingRequest *pr) +consider_peer_for_forwarding (void *cls, const GNUNET_HashCode * key, + struct GSF_PendingRequest *pr) { struct GSF_ConnectedPeer *cp = cls; - + struct GNUNET_PeerIdentity pid; + + GSF_connected_peer_get_identity_ (cp, &pid); + if (GNUNET_YES != GSF_pending_request_test_target_ (pr, &pid)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Loopback routes suppressed"), 1, + GNUNET_NO); + return GNUNET_YES; + } GSF_plan_add_ (cp, pr); return GNUNET_YES; } @@ -446,21 +501,21 @@ consider_peer_for_forwarding (void *cls, * @param cls closure, not used * @param peer peer identity this notification is about * @param atsi performance information + * @param atsi_count number of records in 'atsi' */ -static void -peer_connect_handler (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_TRANSPORT_ATS_Information *atsi) +static void +peer_connect_handler (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) { struct GSF_ConnectedPeer *cp; if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity))) return; - cp = GSF_peer_connect_handler_ (peer, atsi); + cp = GSF_peer_connect_handler_ (peer, atsi, atsi_count); if (NULL == cp) return; - GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, - cp); + GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, cp); } @@ -474,16 +529,10 @@ peer_connect_handler (void *cls, * @param cls closure * @param server handle to the server, NULL if we failed * @param my_identity ID of this peer, NULL if we failed - * @param publicKey public key of this peer, NULL if we failed */ static void -peer_init_handler (void *cls, - struct GNUNET_CORE_Handle * server, - const struct GNUNET_PeerIdentity * - my_identity, - const struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded * - publicKey) +peer_init_handler (void *cls, struct GNUNET_CORE_Handle *server, + const struct GNUNET_PeerIdentity *my_identity) { my_id = *my_identity; } @@ -497,59 +546,50 @@ peer_init_handler (void *cls, */ static int main_init (struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) + const struct GNUNET_CONFIGURATION_Handle *c) { - static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = - { - { &handle_p2p_get, - GNUNET_MESSAGE_TYPE_FS_GET, 0 }, - { &handle_p2p_put, - GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, - { &GSF_handle_p2p_migration_stop_, - GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, - sizeof (struct MigrationStopMessage) }, - { NULL, 0, 0 } - }; + static const struct GNUNET_CORE_MessageHandler p2p_handlers[] = { + {&handle_p2p_get, + GNUNET_MESSAGE_TYPE_FS_GET, 0}, + {&handle_p2p_put, + GNUNET_MESSAGE_TYPE_FS_PUT, 0}, + {&GSF_handle_p2p_migration_stop_, + GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, + sizeof (struct MigrationStopMessage)}, + {NULL, 0, 0} + }; static const struct GNUNET_SERVER_MessageHandler handlers[] = { - {&GNUNET_FS_handle_index_start, NULL, + {&GNUNET_FS_handle_index_start, NULL, GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0}, - {&GNUNET_FS_handle_index_list_get, NULL, - GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) }, - {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, - sizeof (struct UnindexMessage) }, - {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, - 0 }, + {&GNUNET_FS_handle_index_list_get, NULL, + GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, + sizeof (struct GNUNET_MessageHeader)}, + {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, + sizeof (struct UnindexMessage)}, + {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, + 0}, {NULL, NULL, 0, 0} }; - GSF_core = GNUNET_CORE_connect (GSF_cfg, - 2, /* larger? */ - NULL, - &peer_init_handler, - &peer_connect_handler, - &GSF_peer_disconnect_handler_, - &GSF_peer_status_handler_, - NULL, GNUNET_NO, - NULL, GNUNET_NO, - p2p_handlers); + GSF_core = + GNUNET_CORE_connect (GSF_cfg, 1, NULL, &peer_init_handler, + &peer_connect_handler, &GSF_peer_disconnect_handler_, + NULL, GNUNET_NO, NULL, GNUNET_NO, p2p_handlers); if (NULL == GSF_core) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to connect to `%s' service.\n"), - "core"); - return GNUNET_SYSERR; - } - GNUNET_SERVER_disconnect_notify (server, - &GSF_client_disconnect_handler_, - NULL); + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to connect to `%s' service.\n"), "core"); + return GNUNET_SYSERR; + } + GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_, + NULL); GNUNET_SERVER_add_handlers (server, handlers); - cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, - &age_cover_counters, - NULL); + cover_age_task = + GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, + NULL); datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, - NULL); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, + NULL); return GNUNET_OK; } @@ -562,41 +602,37 @@ main_init (struct GNUNET_SERVER_Handle *server, * @param cfg configuration to use */ static void -run (void *cls, - struct GNUNET_SERVER_Handle *server, +run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) { GSF_cfg = cfg; + GSF_enable_randomized_delays = + GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY"); GSF_dsh = GNUNET_DATASTORE_connect (cfg); if (NULL == GSF_dsh) - { - GNUNET_SCHEDULER_shutdown (); - return; - } + { + GNUNET_SCHEDULER_shutdown (); + return; + } GSF_rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL); GSF_stats = GNUNET_STATISTICS_create ("fs", cfg); block_cfg = GNUNET_CONFIGURATION_create (); - GNUNET_CONFIGURATION_set_value_string (block_cfg, - "block", - "PLUGINS", - "fs"); + GNUNET_CONFIGURATION_set_value_string (block_cfg, "block", "PLUGINS", "fs"); GSF_block_ctx = GNUNET_BLOCK_context_create (block_cfg); GNUNET_assert (NULL != GSF_block_ctx); - GSF_dht = GNUNET_DHT_connect (cfg, - FS_DHT_HT_SIZE); + GSF_dht = GNUNET_DHT_connect (cfg, FS_DHT_HT_SIZE); GSF_plan_init (); GSF_pending_request_init_ (); GSF_connected_peer_init_ (); GSF_push_init_ (); GSF_put_init_ (); - if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, GSF_dsh)) || - - (GNUNET_OK != main_init (server, cfg)) ) - { - GNUNET_SCHEDULER_shutdown (); - shutdown_task (NULL, NULL); - return; - } + if ((GNUNET_OK != GNUNET_FS_indexing_init (cfg, GSF_dsh)) || + (GNUNET_OK != main_init (server, cfg))) + { + GNUNET_SCHEDULER_shutdown (); + shutdown_task (NULL, NULL); + return; + } } @@ -611,11 +647,8 @@ int main (int argc, char *const *argv) { return (GNUNET_OK == - GNUNET_SERVICE_run (argc, - argv, - "fs", - GNUNET_SERVICE_OPTION_NONE, - &run, NULL)) ? 0 : 1; + GNUNET_SERVICE_run (argc, argv, "fs", GNUNET_SERVICE_OPTION_NONE, + &run, NULL)) ? 0 : 1; } /* end of gnunet-service-fs.c */