X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Ffs%2Fgnunet-service-fs.c;h=06ac91c73d5f1a21f9b79d59f6454801106b0df7;hb=1f09f4f7716db5939ec1c9a278b5661616dd72d6;hp=4d12b8bfd08fd2b468cef12e50dc0260d200928e;hpb=502af2167f7c218366666ca4944bd7cc54b5b19a;p=oweals%2Fgnunet.git diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 4d12b8bfd..06ac91c73 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -24,9 +24,7 @@ * @author Christian Grothoff * * To use: - * - consider re-issue GSF_dht_lookup_ after non-DHT reply received - * - implement 'SUPPORT_DELAYS' - * + * - consider re-issue GSF_dht_lookup_ after non-DHT reply received */ #include "platform.h" #include @@ -59,7 +57,7 @@ /** - * How quickly do we age cover traffic? At the given + * How quickly do we age cover traffic? At the given * time interval, remaining cover traffic counters are * decremented by 1/16th. */ @@ -93,6 +91,12 @@ struct GNUNET_DHT_Handle *GSF_dht; */ struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime; +/** + * Running average of the observed latency to other peers (round trip). + * Initialized to 5s as the initial default. + */ +struct GNUNET_TIME_Relative GSF_avg_latency = { 500 }; + /** * Typical priorities we're seeing from other peers right now. Since * most priorities will be zero, this value is the weighted average of @@ -106,13 +110,13 @@ struct GNUNET_LOAD_Value *GSF_rt_entry_lifetime; double GSF_current_priorities; /** - * How many query messages have we received 'recently' that + * How many query messages have we received 'recently' that * have not yet been claimed as cover traffic? */ unsigned int GSF_cover_query_count; /** - * How many content messages have we received 'recently' that + * How many content messages have we received 'recently' that * have not yet been claimed as cover traffic? */ unsigned int GSF_cover_content_count; @@ -166,12 +170,12 @@ age_cover_counters (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GSF_cover_content_count = (GSF_cover_content_count * 15) / 16; GSF_cover_query_count = (GSF_cover_query_count * 15) / 16; - cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, - &age_cover_counters, NULL); + cover_age_task = + GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, + NULL); } - /** * We've just now completed a datastore request. Update our * datastore load calculations. @@ -191,8 +195,8 @@ GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start) /** * Test if the DATABASE (GET) load on this peer is too high * to even consider processing the query at - * all. - * + * all. + * * @return GNUNET_YES if the load is too high to do anything (load high) * GNUNET_NO to process normally (load normal) * GNUNET_SYSERR to process for free (load low) @@ -211,6 +215,36 @@ GSF_test_get_load_too_high_ (uint32_t priority) } +/** + * We've received peer performance information. Update + * our running average for the P2P latency. + * + * @param atsi performance information + * @param atsi_count number of 'atsi' records + */ +static void +update_latencies (const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + unsigned int i; + + for (i = 0; i < atsi_count; i++) + { + if (ntohl (atsi[i].type) == GNUNET_ATS_QUALITY_NET_DELAY) + { + GSF_avg_latency.rel_value = + (GSF_avg_latency.rel_value * 31 + + GNUNET_MIN (5000, ntohl (atsi[i].value))) / 32; + GNUNET_STATISTICS_set (GSF_stats, + gettext_noop + ("# running average P2P latency (ms)"), + GSF_avg_latency.rel_value, GNUNET_NO); + break; + } + } +} + + /** * Handle P2P "PUT" message. * @@ -219,14 +253,15 @@ GSF_test_get_load_too_high_ (uint32_t priority) * for loopback messages where we are both sender and receiver) * @param message the actual message * @param atsi performance information + * @param atsi_count number of records in 'atsi' * @return GNUNET_OK to keep the connection open, * GNUNET_SYSERR to close it (signal serious error) */ static int -handle_p2p_put (void *cls, - const struct GNUNET_PeerIdentity *other, +handle_p2p_put (void *cls, const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message, - const struct GNUNET_TRANSPORT_ATS_Information *atsi) + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) { struct GSF_ConnectedPeer *cp; @@ -237,6 +272,7 @@ handle_p2p_put (void *cls, return GNUNET_OK; } GSF_cover_content_count++; + update_latencies (atsi, atsi_count); return GSF_handle_p2p_content_ (cp, message); } @@ -258,6 +294,13 @@ consider_request_for_forwarding (void *cls, { struct GSF_PendingRequest *pr = cls; + if (GNUNET_YES != GSF_pending_request_test_target_ (pr, peer)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Loopback routes suppressed"), 1, + GNUNET_NO); + return; + } GSF_plan_add_ (cp, pr); } @@ -273,8 +316,7 @@ consider_request_for_forwarding (void *cls, * @param result final datastore lookup result */ static void -consider_forwarding (void *cls, - struct GSF_PendingRequest *pr, +consider_forwarding (void *cls, struct GSF_PendingRequest *pr, enum GNUNET_BLOCK_EvaluationResult result) { if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) @@ -291,21 +333,24 @@ consider_forwarding (void *cls, * for loopback messages where we are both sender and receiver) * @param message the actual message * @param atsi performance information + * @param atsi_count number of records in 'atsi' * @return GNUNET_OK to keep the connection open, * GNUNET_SYSERR to close it (signal serious error) */ static int -handle_p2p_get (void *cls, - const struct GNUNET_PeerIdentity *other, +handle_p2p_get (void *cls, const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message, - const struct GNUNET_TRANSPORT_ATS_Information *atsi) + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) { struct GSF_PendingRequest *pr; pr = GSF_handle_p2p_query_ (other, message); if (NULL == pr) return GNUNET_SYSERR; + GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; GSF_local_lookup_ (pr, &consider_forwarding, NULL); + update_latencies (atsi, atsi_count); return GNUNET_OK; } @@ -313,7 +358,7 @@ handle_p2p_get (void *cls, /** * We're done with the local lookup, now consider * P2P processing (depending on request options and - * result status). Also signal that we can now + * result status). Also signal that we can now * receive more request information from the client. * * @param cls the client doing the request ('struct GNUNET_SERVER_Client') @@ -321,19 +366,16 @@ handle_p2p_get (void *cls, * @param result final datastore lookup result */ static void -start_p2p_processing (void *cls, - struct GSF_PendingRequest *pr, +start_p2p_processing (void *cls, struct GSF_PendingRequest *pr, enum GNUNET_BLOCK_EvaluationResult result) { struct GNUNET_SERVER_Client *client = cls; struct GSF_PendingRequestData *prd; prd = GSF_pending_request_get_data_ (pr); -#if DEBUG_FS_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Finished database lookup for local request `%s' with result %d\n", GNUNET_h2s (&prd->query), result); -#endif GNUNET_SERVER_receive_done (client, GNUNET_OK); if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) return; /* we're done, 'pr' was already destroyed... */ @@ -355,19 +397,29 @@ start_p2p_processing (void *cls, * @param message the actual message */ static void -handle_start_search (void *cls, - struct GNUNET_SERVER_Client *client, +handle_start_search (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct GSF_PendingRequest *pr; + int ret; - pr = GSF_local_client_start_search_handler_ (client, message); - if (NULL == pr) + pr = NULL; + ret = GSF_local_client_start_search_handler_ (client, message, &pr); + switch (ret) { - /* GNUNET_SERVER_receive_done was already called! */ - return; + case GNUNET_SYSERR: + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + break; + case GNUNET_NO: + GNUNET_SERVER_receive_done (client, GNUNET_OK); + break; + case GNUNET_YES: + GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; + GSF_local_lookup_ (pr, &start_p2p_processing, client); + break; + default: + GNUNET_assert (0); } - GSF_local_lookup_ (pr, &start_p2p_processing, client); } @@ -424,12 +476,20 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @return GNUNET_YES to continue to iterate */ static int -consider_peer_for_forwarding (void *cls, - const GNUNET_HashCode * key, +consider_peer_for_forwarding (void *cls, const GNUNET_HashCode * key, struct GSF_PendingRequest *pr) { struct GSF_ConnectedPeer *cp = cls; + struct GNUNET_PeerIdentity pid; + GSF_connected_peer_get_identity_ (cp, &pid); + if (GNUNET_YES != GSF_pending_request_test_target_ (pr, &pid)) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Loopback routes suppressed"), 1, + GNUNET_NO); + return GNUNET_YES; + } GSF_plan_add_ (cp, pr); return GNUNET_YES; } @@ -441,17 +501,18 @@ consider_peer_for_forwarding (void *cls, * @param cls closure, not used * @param peer peer identity this notification is about * @param atsi performance information + * @param atsi_count number of records in 'atsi' */ static void -peer_connect_handler (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_TRANSPORT_ATS_Information *atsi) +peer_connect_handler (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) { struct GSF_ConnectedPeer *cp; if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity))) return; - cp = GSF_peer_connect_handler_ (peer, atsi); + cp = GSF_peer_connect_handler_ (peer, atsi, atsi_count); if (NULL == cp) return; GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, cp); @@ -468,14 +529,10 @@ peer_connect_handler (void *cls, * @param cls closure * @param server handle to the server, NULL if we failed * @param my_identity ID of this peer, NULL if we failed - * @param publicKey public key of this peer, NULL if we failed */ static void -peer_init_handler (void *cls, - struct GNUNET_CORE_Handle *server, - const struct GNUNET_PeerIdentity *my_identity, - const struct - GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey) +peer_init_handler (void *cls, struct GNUNET_CORE_Handle *server, + const struct GNUNET_PeerIdentity *my_identity) { my_id = *my_identity; } @@ -514,28 +571,25 @@ main_init (struct GNUNET_SERVER_Handle *server, {NULL, NULL, 0, 0} }; - GSF_core = GNUNET_CORE_connect (GSF_cfg, 2, /* larger? */ - NULL, - &peer_init_handler, - &peer_connect_handler, - &GSF_peer_disconnect_handler_, - &GSF_peer_status_handler_, - NULL, GNUNET_NO, - NULL, GNUNET_NO, p2p_handlers); + GSF_core = + GNUNET_CORE_connect (GSF_cfg, 1, NULL, &peer_init_handler, + &peer_connect_handler, &GSF_peer_disconnect_handler_, + NULL, GNUNET_NO, NULL, GNUNET_NO, p2p_handlers); if (NULL == GSF_core) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to `%s' service.\n"), "core"); return GNUNET_SYSERR; } - GNUNET_SERVER_disconnect_notify (server, - &GSF_client_disconnect_handler_, NULL); + GNUNET_SERVER_disconnect_notify (server, &GSF_client_disconnect_handler_, + NULL); GNUNET_SERVER_add_handlers (server, handlers); - cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, - &age_cover_counters, NULL); + cover_age_task = + GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, + NULL); datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, - &shutdown_task, NULL); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, + NULL); return GNUNET_OK; } @@ -548,8 +602,7 @@ main_init (struct GNUNET_SERVER_Handle *server, * @param cfg configuration to use */ static void -run (void *cls, - struct GNUNET_SERVER_Handle *server, +run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) { GSF_cfg = cfg; @@ -594,10 +647,8 @@ int main (int argc, char *const *argv) { return (GNUNET_OK == - GNUNET_SERVICE_run (argc, - argv, - "fs", - GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1; + GNUNET_SERVICE_run (argc, argv, "fs", GNUNET_SERVICE_OPTION_NONE, + &run, NULL)) ? 0 : 1; } /* end of gnunet-service-fs.c */