From 14d30b9759fad1d797fe0650c11b38b869163e51 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 7 Oct 2010 13:52:14 +0000 Subject: [PATCH] adding support for artificial delays --- src/fs/fs.h | 6 -- src/fs/gnunet-service-fs.c | 151 ++++++++++++++++++++++++++++++------- 2 files changed, 123 insertions(+), 34 deletions(-) diff --git a/src/fs/fs.h b/src/fs/fs.h index 0eba5615c..d48af35b4 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -122,12 +122,6 @@ */ #define AVAILABILITY_TRIALS_MAX 8 -/** - * By how much (in ms) do we decrement the TTL - * at each hop? - */ -#define TTL_DECREMENT 5000 - /** * Length of the P2P success tracker. Note that * having a very long list can also hurt performance. diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index c2baf1ada..5118cb56c 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -44,6 +44,13 @@ #define DEBUG_FS GNUNET_NO +/** + * Should we introduce random latency in processing? Required for proper + * implementation of GAP, but can be disabled for performance evaluation of + * the basic routing algorithm. + */ +#define SUPPORT_DELAYS GNUNET_NO + /** * Maximum number of outgoing messages we queue per peer. */ @@ -139,6 +146,11 @@ struct PendingMessage */ void *cont_cls; + /** + * Do not transmit this pending message until this deadline. + */ + struct GNUNET_TIME_Absolute delay_until; + /** * Size of the reply; actual reply message follows * at the end of this struct. @@ -225,6 +237,11 @@ struct ConnectedPeer */ struct GNUNET_TIME_Absolute last_transmission_request_start; + /** + * ID of delay task for scheduling transmission. + */ + GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; + /** * Average priority of successful replies. Calculated * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n @@ -976,7 +993,7 @@ consider_migration (void *cls, } /* consider scheduling transmission to cp for content migration */ - if (cp->cth != NULL) + if (cp->cth != NULL) return GNUNET_YES; msize = 0; pos = mig_head; @@ -1004,6 +1021,11 @@ consider_migration (void *cls, msize, GNUNET_h2s (key)); #endif + if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task); + cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; + } cp->cth = GNUNET_CORE_notify_transmit_ready (core, 0, GNUNET_TIME_UNIT_FOREVER_REL, @@ -1336,12 +1358,12 @@ destroy_pending_message (struct PendingMessage *pm, TransmissionContinuation cont; void *cont_cls; + cont = pm->cont; + cont_cls = pm->cont_cls; if (pml != NULL) { GNUNET_assert (pml->pm == pm); GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); - cont = pm->cont; - cont_cls = pm->cont_cls; destroy_pending_message_list_entry (pml); } else @@ -1689,7 +1711,15 @@ peer_disconnect_handler (void *cls, GNUNET_PEER_change_rc (cp->pid, -1); GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); if (NULL != cp->cth) - GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); + { + GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); + cp->cth = NULL; + } + if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task); + cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; + } while (NULL != (pm = cp->pending_messages_head)) destroy_pending_message (pm, 0 /* delivery failed */); GNUNET_LOAD_value_free (cp->transmission_delay); @@ -1893,6 +1923,39 @@ shutdown_task (void *cls, /* ******************* Utility functions ******************** */ +/** + * We've had to delay a request for transmission to core, but now + * we should be ready. Run it. + * + * @param cls the 'struct ConnectedPeer' for which a request was delayed + * @param tc task context (unused) + */ +static void +delayed_transmission_request (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct ConnectedPeer *cp = cls; + struct GNUNET_PeerIdentity pid; + struct PendingMessage *pm; + + pm = cp->pending_messages_head; + cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (cp->cth == NULL); + if (pm == NULL) + return; + GNUNET_PEER_resolve (cp->pid, + &pid); + cp->last_transmission_request_start = GNUNET_TIME_absolute_get (); + cp->cth = GNUNET_CORE_notify_transmit_ready (core, + pm->priority, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + &pid, + pm->msize, + &transmit_to_peer, + cp); +} + + /** * Transmit messages by copying it to the target buffer * "buf". "buf" will be NULL and "size" zero if the socket was closed @@ -1912,13 +1975,16 @@ transmit_to_peer (void *cls, { struct ConnectedPeer *cp = cls; char *cbuf = buf; - struct GNUNET_PeerIdentity pid; struct PendingMessage *pm; + struct PendingMessage *next_pm; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Relative min_delay; struct MigrationReadyBlock *mb; struct MigrationReadyBlock *next; struct PutMessage migm; size_t msize; unsigned int i; + struct GNUNET_PeerIdentity pid; cp->cth = NULL; if (NULL == buf) @@ -1930,33 +1996,48 @@ transmit_to_peer (void *cls, GNUNET_LOAD_update (cp->transmission_delay, UINT64_MAX); return 0; - } + } GNUNET_LOAD_update (cp->transmission_delay, GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value); + now = GNUNET_TIME_absolute_get (); msize = 0; - while ( (NULL != (pm = cp->pending_messages_head) ) && + min_delay = GNUNET_TIME_UNIT_FOREVER_REL; + next_pm = cp->pending_messages_head; + while ( (NULL != (pm = next_pm) ) && (pm->msize <= size) ) { + next_pm = pm->next; + if (pm->delay_until.value > now.value) + { + min_delay = GNUNET_TIME_relative_min (min_delay, + GNUNET_TIME_absolute_get_remaining (pm->delay_until)); + continue; + } memcpy (&cbuf[msize], &pm[1], pm->msize); msize += pm->msize; size -= pm->msize; + GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head, + cp->pending_messages_tail, + pm); + if (NULL == pm->pml) + cp->pending_requests--; destroy_pending_message (pm, cp->pid); } - if (NULL != pm) - { - GNUNET_PEER_resolve (cp->pid, - &pid); - cp->last_transmission_request_start = GNUNET_TIME_absolute_get (); - cp->cth = GNUNET_CORE_notify_transmit_ready (core, - pm->priority, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - &pid, - pm->msize, - &transmit_to_peer, - cp); + if (pm != NULL) + min_delay = GNUNET_TIME_UNIT_ZERO; + if (NULL != cp->pending_messages_head) + { + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task); + cp->delayed_transmission_request_task + = GNUNET_SCHEDULER_add_delayed (sched, + min_delay, + &delayed_transmission_request, + cp); } - else + if (pm == NULL) { + GNUNET_PEER_resolve (cp->pid, + &pid); next = mig_head; while (NULL != (mb = next)) { @@ -1984,7 +2065,7 @@ transmit_to_peer (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing migration block `%s' (%u bytes) to `%s'\n", GNUNET_h2s (&mb->query), - mb->size, + (unsigned int) mb->size, GNUNET_i2s (&pid)); #endif break; @@ -1995,7 +2076,7 @@ transmit_to_peer (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n", GNUNET_h2s (&mb->query), - mb->size, + (unsigned int) mb->size, GNUNET_i2s (&pid)); #endif } @@ -2013,9 +2094,9 @@ transmit_to_peer (void *cls, } #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u bytes to peer %u\n", - msize, - cp->pid); + "Transmitting %u bytes to peer with PID %u\n", + (unsigned int) msize, + (unsigned int) cp->pid); #endif return msize; } @@ -2063,7 +2144,15 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp, destroy_pending_message (cp->pending_messages_tail, 0); GNUNET_PEER_resolve (cp->pid, &pid); if (NULL != cp->cth) - GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); + { + GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); + cp->cth = NULL; + } + if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task); + cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; + } /* need to schedule transmission */ cp->last_transmission_request_start = GNUNET_TIME_absolute_get (); cp->cth = GNUNET_CORE_notify_transmit_ready (core, @@ -3119,6 +3208,12 @@ process_reply (void *cls, reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); reply->cont = &transmit_reply_continuation; reply->cont_cls = pr; +#if SUPPORT_DELAYS + reply->delay_until + = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + TTL_DECREMENT))); +#endif reply->msize = msize; reply->priority = UINT32_MAX; /* send replies first! */ pm = (struct PutMessage*) &reply[1]; @@ -3557,10 +3652,10 @@ process_local_reply (void *cls, prq.priority = priority; prq.finished = GNUNET_NO; prq.request_found = GNUNET_NO; - process_reply (&prq, key, pr); if ( (old_rf == 0) && - (pr->results_found == 1) ) + (pr->results_found == 0) ) update_datastore_delays (pr->start_time); + process_reply (&prq, key, pr); if (prq.finished == GNUNET_YES) return; if (pr->qe == NULL) -- 2.25.1