0.9.0pre3:
-* FS [CG]
- - implement 'SUPPORT_DELAYS'
+* crashes in gnunet-service-transport [MW]
0.9.0:
* new webpage:
#OPTIONS = -L DEBUG
CONTENT_CACHING = NO
CONTENT_PUSHING = NO
+DELAY = NO
#DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/bin/gnunet-service-fs
*/
struct GNUNET_CORE_Handle *GSF_core;
+/**
+ * Are we introducing randomized delays for better anonymity?
+ */
+int GSF_enable_randomized_delays;
/* ***************************** locals ******************************* */
GNUNET_break (0);
return GNUNET_OK;
}
+ GSF_cover_content_count++;
return GSF_handle_p2p_content_ (cp, message);
}
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
GSF_cfg = cfg;
+ GSF_enable_randomized_delays = GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY");
GSF_dsh = GNUNET_DATASTORE_connect (cfg);
if (NULL == GSF_dsh)
{
*/
extern struct GNUNET_BLOCK_Context *GSF_block_ctx;
+/**
+ * Are we introducing randomized delays for better anonymity?
+ */
+extern int GSF_enable_randomized_delays;
/**
* Test if the DATABASE (GET) load on this peer is too high
};
+/**
+ * Handle for an entry in our delay list.
+ */
+struct GSF_DelayedHandle
+{
+
+ /**
+ * Kept in a doubly-linked list.
+ */
+ struct GSF_DelayedHandle *next;
+
+ /**
+ * Kept in a doubly-linked list.
+ */
+ struct GSF_DelayedHandle *prev;
+
+ /**
+ * Peer this transmission belongs to.
+ */
+ struct GSF_ConnectedPeer *cp;
+
+ /**
+ * The PUT that was delayed.
+ */
+ struct PutMessage *pm;
+
+ /**
+ * Task for the delay.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier delay_task;
+
+ /**
+ * Size of the message.
+ */
+ size_t msize;
+
+};
+
+
/**
* Information per peer and request.
*/
*/
struct GSF_PeerTransmitHandle *pth_tail;
+ /**
+ * Messages (replies, queries, content migration) we would like to
+ * send to this peer in the near future. Sorted by priority, head.
+ */
+ struct GSF_DelayedHandle *delayed_head;
+
+ /**
+ * Messages (replies, queries, content migration) we would like to
+ * send to this peer in the near future. Sorted by priority, tail.
+ */
+ struct GSF_DelayedHandle *delayed_tail;
+
/**
* Migration stop message in our queue, or NULL if we have none pending.
*/
}
+/**
+ * The artificial delay is over, transmit the message now.
+ *
+ * @param cls the 'struct GSF_DelayedHandle' with the message
+ * @param tc scheduler context
+ */
+static void
+transmit_delayed_now (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GSF_DelayedHandle *dh = cls;
+ struct GSF_ConnectedPeer *cp = dh->cp;
+
+ GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ {
+ GNUNET_free (dh->pm);
+ GNUNET_free (dh);
+ return;
+ }
+ (void) GSF_peer_transmit_ (cp, GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ dh->msize,
+ ©_reply,
+ dh->pm);
+ GNUNET_free (dh);
+}
+
+
+/**
+ * Get the randomized delay a response should be subjected to.
+ *
+ * @return desired delay
+ */
+static struct GNUNET_TIME_Relative
+get_randomized_delay ()
+{
+ struct GNUNET_TIME_Relative ret;
+
+ /* FIXME: replace 5000 with something relating to current observed P2P message latency */
+ ret = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ 5000));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# artificial delays introduced (ms)"),
+ ret.rel_value,
+ GNUNET_NO);
+
+ return ret;
+}
+
+
/**
* Handle a reply to a pending request. Also called if a request
* expires (then with data == NULL). The handler may be called
* @param cls 'struct PeerRequest' this is an answer for
* @param eval evaluation of the result
* @param pr handle to the original pending request
+ * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
* @param expiration when does 'data' expire?
* @param type type of the block
* @param data response data, NULL on request expiration
handle_p2p_reply (void *cls,
enum GNUNET_BLOCK_EvaluationResult eval,
struct GSF_PendingRequest *pr,
+ uint32_t reply_anonymity_level,
struct GNUNET_TIME_Absolute expiration,
enum GNUNET_BLOCK_Type type,
const void *data,
GNUNET_break (0);
return;
}
+ if ( (reply_anonymity_level != UINT32_MAX) &&
+ (reply_anonymity_level > 1) )
+ {
+ if (reply_anonymity_level - 1 > GSF_cover_content_count)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# replies dropped due to insufficient cover traffic"),
+ 1,
+ GNUNET_NO);
+ return;
+ }
+ GSF_cover_content_count -= (reply_anonymity_level - 1);
+ }
+
pm = GNUNET_malloc (msize);
pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
pm->header.size = htons (msize);
pm->type = htonl (type);
pm->expiration = GNUNET_TIME_absolute_hton (expiration);
memcpy (&pm[1], data, data_len);
- (void) GSF_peer_transmit_ (cp, GNUNET_NO,
- UINT32_MAX,
- REPLY_TIMEOUT,
- msize,
- ©_reply,
- pm);
+ if ( (reply_anonymity_level != UINT32_MAX) &&
+ (reply_anonymity_level != 0) &&
+ (GSF_enable_randomized_delays == GNUNET_YES) )
+ {
+ struct GSF_DelayedHandle *dh;
+
+ dh = GNUNET_malloc (sizeof (struct GSF_DelayedHandle));
+ dh->cp = cp;
+ dh->pm = pm;
+ dh->msize = msize;
+ GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ dh->delay_task = GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
+ &transmit_delayed_now,
+ dh);
+ }
+ else
+ {
+ (void) GSF_peer_transmit_ (cp, GNUNET_NO,
+ UINT32_MAX,
+ REPLY_TIMEOUT,
+ msize,
+ ©_reply,
+ pm);
+ }
if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
return;
if (GNUNET_SCHEDULER_NO_TASK == peerreq->kill_task)
{
struct GSF_ConnectedPeer *cp;
struct GSF_PeerTransmitHandle *pth;
+ struct GSF_DelayedHandle *dh;
cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
&peer->hashPubKey);
GNUNET_assert (0 == pth->cth_in_progress);
GNUNET_free (pth);
}
+ while (NULL != (dh = cp->delayed_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
+ cp->delayed_tail,
+ dh);
+ GNUNET_SCHEDULER_cancel (dh->delay_task);
+ GNUNET_free (dh->pm);
+ GNUNET_free (dh);
+ }
GNUNET_PEER_change_rc (cp->ppd.pid, -1);
if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task)
{
* @param cls user-specified closure
* @param eval evaluation of the result
* @param pr handle to the original pending request
+ * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
* @param expiration when does 'data' expire?
* @param type type of the block
* @param data response data, NULL on request expiration
client_response_handler (void *cls,
enum GNUNET_BLOCK_EvaluationResult eval,
struct GSF_PendingRequest *pr,
+ uint32_t reply_anonymity_level,
struct GNUNET_TIME_Absolute expiration,
enum GNUNET_BLOCK_Type type,
const void *data,
dpr->rh (dpr->rh_cls,
GNUNET_BLOCK_EVALUATION_REQUEST_VALID,
dpr,
+ UINT32_MAX,
GNUNET_TIME_UNIT_FOREVER_ABS,
GNUNET_BLOCK_TYPE_ANY,
NULL, 0);
*/
static int
process_reply (void *cls,
- const GNUNET_HashCode * key,
+ const GNUNET_HashCode *key,
void *value)
{
struct ProcessReplyClosure *prq = cls;
pr->rh (pr->rh_cls,
prq->eval,
pr,
+ prq->anonymity_level,
prq->expiration,
prq->type,
prq->data, prq->size);
pr->rh (pr->rh_cls,
prq->eval,
pr,
+ prq->anonymity_level,
prq->expiration,
prq->type,
prq->data, prq->size);
prq.type = type;
prq.expiration = expiration;
prq.priority = 0;
- prq.anonymity_level = 1;
+ prq.anonymity_level = UINT32_MAX;
prq.request_found = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
&query,
* @param cls user-specified closure
* @param eval evaluation of the result
* @param pr handle to the original pending request
+ * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
* @param expiration when does 'data' expire?
* @param type type of the block
* @param data response data, NULL on request expiration
typedef void (*GSF_PendingRequestReplyHandler)(void *cls,
enum GNUNET_BLOCK_EvaluationResult eval,
struct GSF_PendingRequest *pr,
+ uint32_t reply_anonymity_level,
struct GNUNET_TIME_Absolute expiration,
enum GNUNET_BLOCK_Type type,
const void *data,
*/
static struct StatValues stats[] =
{
+ { "fs", "# artificial delays introduced (ms)"},
{ "fs", "# queries forwarded"},
{ "fs", "# replies received and matched"},
{ "fs", "# results found locally"},
{ "fs", "# P2P searches discarded (queue length bound)"},
{ "fs", "# replies received for local clients"},
{ "fs", "# queries retransmitted to same target"},
- { "fs", "cummulative artificial delay introduced (ms)"},
{ "core", "# bytes decrypted"},
{ "core", "# bytes encrypted"},
{ "core", "# discarded CORE_SEND requests"},
/*
This file is part of GNUnet.
- (C) 2010,2011 Christian Grothoff (and other contributing authors)
+ (C) 2010, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* @author Christian Grothoff
*
* Sample output:
- * - 10 MB, 3 peers:
- * Download speed of type `seeder 1' was 6 MiB/s
- * Download speed of type `seeder 2' was 6 MiB/s
- * Download speed of type `leach` was 1051 KiB/s
+ * - 10 MB, 3 peers, with delays:
+ * Download speed of type `seeder 1' was 757 KiB/s
+ * Download speed of type `seeder 2' was 613 KiB/s
+ * Download speed of type `leach` was 539 KiB/s
*
- * Analysis: leach squeezed out entirely early, then gets to
- * take its turn once the main downloads complete
+ * - 10 MB, 3 peers, without delays:
+ * Download speed of type `seeder 1' was 1784 KiB/s
+ * Download speed of type `seeder 2' was 1604 KiB/s
+ * Download speed of type `leach` was 1384 KiB/s
*/
#include "platform.h"
#include "fs_test_lib.h"
*/
static struct StatValues stats[] =
{
+ { "fs", "# artificial delays introduced (ms)"},
{ "fs", "# queries forwarded"},
{ "fs", "# replies received and matched"},
{ "fs", "# results found locally"},
{ "fs", "# P2P searches discarded (queue length bound)"},
{ "fs", "# replies received for local clients"},
{ "fs", "# queries retransmitted to same target"},
- { "fs", "cummulative artificial delay introduced (ms)"},
{ "core", "# bytes decrypted"},
{ "core", "# bytes encrypted"},
{ "core", "# discarded CORE_SEND requests"},
- { "core", "# discarded CORE_SEND request bytes"},
{ "core", "# discarded lower priority CORE_SEND requests"},
- { "core", "# discarded lower priority CORE_SEND request bytes"},
{ "transport", "# bytes received via TCP"},
{ "transport", "# bytes transmitted via TCP"},
{ "datacache", "# bytes stored"},
VERBOSE,
&do_report, "leach");
/* mutual downloads of (primary) sharing peers */
- GNUNET_FS_TEST_download (daemons[NUM_DAEMONS-1],
+ GNUNET_FS_TEST_download (daemons[NUM_DAEMONS-2],
TIMEOUT,
anonymity, SEED1, uri1,
VERBOSE,
&do_report, "seeder 2");
- GNUNET_FS_TEST_download (daemons[NUM_DAEMONS-2],
+ GNUNET_FS_TEST_download (daemons[NUM_DAEMONS-1],
TIMEOUT,
anonymity, SEED2, uri2,
VERBOSE,