* @author Christian Grothoff
*
* POST-TESTING:
- * - revisit API (which arguments are used, needed)?
- * - add code to re-transmit key if first attempt failed
- * + timeout on connect / key exchange, etc.
- * + timeout for automatic re-try, etc.
- * - add code to give up re-transmission of key if many attempts fail
- * - add code to send PINGs if we are about to time-out otherwise
- * ? add heuristic to do another send_key in "handle_set_key"
- * in case previous attempt failed / didn't work / persist
- * (but don't do it always to avoid storm of SET_KEY's going
- * back and forth!) --- alternatively, add "status" field
- * of the other peer to the set key message, that way we'd
- * know for sure!
- * - check that hostkey used by transport (for HELLOs) is the
- * same as the hostkey that we are using!
* - topology management:
* + bootstrapping (transport offer hello, plugins)
* + internal neighbour selection
- * + update bandwidth usage statistics
- * + bandwidth allocation (transport set quota)
+ *
+ * Considerations for later:
+ * - check that hostkey used by transport (for HELLOs) is the
+ * same as the hostkey that we are using!
+ * - add code to send PINGs if we are about to time-out otherwise
* - optimize lookup (many O(n) list traversals
* could ideally be changed to O(1) hash map lookups)
*/
#include "platform.h"
+#include "gnunet_constants.h"
#include "gnunet_util_lib.h"
#include "gnunet_hello_lib.h"
#include "gnunet_peerinfo_service.h"
*/
#define MAX_WINDOW_TIME (5 * 60 * 1000)
-
/**
- * Amount of bytes per minute (in/out) to assume initially
- * (before either peer has communicated any particular
- * preference). Should be rather low.
+ * Minimum of bytes per minute (out) to assign to any connected peer.
+ * Should be rather low; values larger than DEFAULT_BPM_IN_OUT make no
+ * sense.
*/
-#define DEFAULT_BPM_IN_OUT 2048
+#define MIN_BPM_PER_PEER GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT
+/**
+ * What is the smallest change (in number of bytes per minute)
+ * that we consider significant enough to bother triggering?
+ */
+#define MIN_BPM_CHANGE 32
/**
* After how much time past the "official" expiration time do
*/
#define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS
-
/**
* What is the maximum delay for a SET_KEY message?
*/
#define MAX_SET_KEY_DELAY GNUNET_TIME_UNIT_SECONDS
-
/**
* What how long do we wait for SET_KEY confirmation initially?
*/
#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
-
/**
* What is the maximum delay for a PING message?
*/
#define MAX_PING_DELAY GNUNET_TIME_UNIT_SECONDS
-
/**
* What is the maximum delay for a PONG message?
*/
#define MAX_PONG_DELAY GNUNET_TIME_UNIT_SECONDS
+/**
+ * How often do we recalculate bandwidth quotas?
+ */
+#define QUOTA_UPDATE_FREQUENCY GNUNET_TIME_UNIT_SECONDS
/**
* What is the priority for a SET_KEY message?
*/
#define SET_KEY_PRIORITY 0xFFFFFF
-
/**
* What is the priority for a PING message?
*/
#define PING_PRIORITY 0xFFFFFF
-
/**
* What is the priority for a PONG message?
*/
#define PONG_PRIORITY 0xFFFFFF
-
/**
* How many messages do we queue per peer at most?
*/
#define MAX_PEER_QUEUE_SIZE 16
-
/**
* How many non-mandatory messages do we queue per client at most?
*/
#define MAX_CLIENT_QUEUE_SIZE 32
-
/**
* What is the maximum age of a message for us to consider
* processing it? Note that this looks at the timestamp used
*/
#define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS
-
/**
* What is the maximum size for encrypted messages? Note that this
* number imposes a clear limit on the maximum size of any message.
*/
#define ENCRYPTED_HEADER_SIZE (sizeof(struct GNUNET_MessageHeader) + sizeof(uint32_t) + sizeof(GNUNET_HashCode))
+
/**
* Encapsulation for encrypted messages exchanged between
* peers. Followed by the actual encrypted data.
*/
GNUNET_SCHEDULER_TaskIdentifier retry_set_key_task;
+ /**
+ * ID of task used for updating bandwidth quota for this neighbour.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier quota_update_task;
+
/**
* At what time did we generate our encryption key?
*/
struct GNUNET_TIME_Relative last_latency;
/**
- * At what frequency are we currently re-trying SET KEY messages?
+ * At what frequency are we currently re-trying SET_KEY messages?
*/
struct GNUNET_TIME_Relative set_key_retry_frequency;
* bandwidth-hogs are sampled at a frequency of about 78s!);
* may get negative if we have VERY high priority content.
*/
- long long available_send_window;
+ long long available_send_window;
/**
* How much downstream capacity of this peer has been reserved for
* make sure that this reserved amount of bandwidth is actually
* available).
*/
- long long available_recv_window;
+ long long available_recv_window;
/**
* How valueable were the messages of this peer recently?
*/
- double current_preference;
+ unsigned long long current_preference;
/**
* Bit map indicating which of the 32 sequence numbers before the last
/**
* Internal bandwidth limit set for this peer (initially
- * typcially set to "-1"). "bpm_out" is MAX of
+ * typically set to "-1"). "bpm_out" is MAX of
* "bpm_out_internal_limit" and "bpm_out_external_limit".
*/
uint32_t bpm_out_internal_limit;
uint32_t bpm_out_external_limit;
/**
- * What was our PING challenge number?
+ * What was our PING challenge number (for this peer)?
*/
uint32_t ping_challenge;
*/
static struct GNUNET_TRANSPORT_Handle *transport;
+/**
+ * Linked list of our clients.
+ */
+static struct Client *clients;
+
/**
* We keep neighbours in a linked list (for now).
*/
static struct Neighbour *neighbours;
/**
- * Linked list of our clients.
+ * Sum of all preferences among all neighbours.
*/
-static struct Client *clients;
+static unsigned long long preference_sum;
+
+/**
+ * Total number of neighbours we have.
+ */
+static unsigned int neighbour_count;
+
+/**
+ * How much inbound bandwidth are we supposed to be using?
+ */
+static unsigned long long bandwidth_target_in;
+
+/**
+ * How much outbound bandwidth are we supposed to be using?
+ */
+static unsigned long long bandwidth_target_out;
+
+
+
+/**
+ * A preference value for a neighbour was update. Update
+ * the preference sum accordingly.
+ *
+ * @param inc how much was a preference value increased?
+ */
+static void
+update_preference_sum (unsigned long long inc)
+{
+ struct Neighbour *n;
+ unsigned long long os;
+
+ os = preference_sum;
+ preference_sum += inc;
+ if (preference_sum >= os)
+ return; /* done! */
+ /* overflow! compensate by cutting all values in half! */
+ preference_sum = 0;
+ n = neighbours;
+ while (n != NULL)
+ {
+ n->current_preference /= 2;
+ preference_sum += n->current_preference;
+ n = n->next;
+ }
+}
/**
* Recalculate the number of bytes we expect to
* receive or transmit in a given window.
*
+ * @param force force an update now (even if not much time has passed)
* @param window pointer to the byte counter (updated)
* @param ts pointer to the timestamp (updated)
* @param bpm number of bytes per minute that should
* be added to the window.
*/
static void
-update_window (long long *window,
+update_window (int force,
+ long long *window,
struct GNUNET_TIME_Absolute *ts, unsigned int bpm)
{
struct GNUNET_TIME_Relative since;
since = GNUNET_TIME_absolute_get_duration (*ts);
- if (since.value < 60 * 1000)
+ if ( (force == GNUNET_NO) &&
+ (since.value < 60 * 1000) )
return; /* not even a minute has passed */
*ts = GNUNET_TIME_absolute_get ();
*window += (bpm * since.value) / 60 / 1000;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending `%s' message to client.\n", "NOTIFY_CONNECT");
#endif
- cnm.bpm_available = htonl (n->bpm_out);
- cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
+ cnm.reserved = htonl (0);
cnm.peer = n->peer;
send_to_client (c, &cnm.header, GNUNET_NO);
n = n->next;
struct ConfigurationInfoMessage cim;
struct Client *c;
int reserv;
+ unsigned long long old_preference;
#if DEBUG_CORE_CLIENT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
memset (&cim, 0, sizeof (cim));
if ((n != NULL) && (n->status == PEER_STATE_KEY_CONFIRMED))
{
+ update_window (GNUNET_YES,
+ &n->available_send_window,
+ &n->last_asw_update,
+ n->bpm_out);
n->bpm_out_internal_limit = ntohl (rcm->limit_outbound_bpm);
n->bpm_out = GNUNET_MAX (n->bpm_out_internal_limit,
n->bpm_out_external_limit);
}
else if (reserv > 0)
{
- update_window (&n->available_recv_window,
+ update_window (GNUNET_NO,
+ &n->available_recv_window,
&n->last_arw_update, n->bpm_in);
if (n->available_recv_window < reserv)
reserv = n->available_recv_window;
n->available_recv_window -= reserv;
}
- n->current_preference += rcm->preference_change;
- if (n->current_preference < 0)
- n->current_preference = 0;
+ old_preference = n->current_preference;
+ n->current_preference += GNUNET_ntohll(rcm->preference_change);
+ if (old_preference > n->current_preference)
+ {
+ /* overflow; cap at maximum value */
+ n->current_preference = (unsigned long long) -1;
+ }
+ update_preference_sum (n->current_preference - old_preference);
cim.reserved_amount = htonl (reserv);
cim.bpm_in = htonl (n->bpm_in);
cim.bpm_out = htonl (n->bpm_out);
GNUNET_assert (size >= m->size);
memcpy (cbuf, &m[1], m->size);
ret = m->size;
+ n->available_send_window -= m->size;
process_encrypted_neighbour_queue (n);
#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
n->th =
GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer,
n->encrypted_head->size,
+ n->encrypted_head->priority,
GNUNET_TIME_absolute_get_remaining
(n->encrypted_head->deadline),
¬ify_encrypted_transmit_ready,
min = NULL;
min_prio = -1;
discard_low_prio = GNUNET_NO;
- /* number of bytes available for transmission at time "t" */
+ /* calculate number of bytes available for transmission at time "t" */
+ update_window (GNUNET_NO,
+ &n->available_send_window,
+ &n->last_asw_update,
+ n->bpm_out);
avail = n->available_send_window;
t = n->last_asw_update;
- /* how many bytes have we (hyptothetically) scheduled so far */
+ /* how many bytes have we (hypothetically) scheduled so far */
off = 0;
/* maximum time we can wait before transmitting anything
and still make all of our deadlines */
}
/* guard against sending "tiny" messages with large headers without
urgent deadlines */
- if ((slack > 1000) && (size > 4 * off))
+ if ( (slack > 1000) && (size > 4 * off) )
{
/* less than 25% of message would be filled with
deadlines still being met if we delay by one
struct MessageEntry *prev;
struct MessageEntry *next;
struct MessageEntry *pos;
- struct GNUNET_TIME_Absolute cutoff;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_TIME_Relative delta;
- cutoff = GNUNET_TIME_relative_to_absolute(PAST_EXPIRATION_DISCARD_TIME);
+ now = GNUNET_TIME_absolute_get ();
prev = NULL;
pos = n->messages;
while (pos != NULL)
{
next = pos->next;
- if (pos->deadline.value < cutoff.value)
+ delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now);
+ if (delta.value > PAST_EXPIRATION_DISCARD_TIME.value)
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Message is %llu ms past due, discarding.\n",
- cutoff.value - pos->deadline.value);
+ delta.value);
+#endif
if (prev == NULL)
n->messages = next;
else
"Core received `%s' request for `%4s', will try to establish connection within %llu ms\n",
"SEND",
GNUNET_i2s (&sm->peer),
- sm->deadline.value);
+ GNUNET_TIME_absolute_get_remaining
+ (GNUNET_TIME_absolute_ntoh(sm->deadline)).value);
#endif
msize += sizeof (struct SendMessage);
/* ask transport to connect to the peer */
if (NULL ==
GNUNET_TRANSPORT_notify_transmit_ready (transport,
&sm->peer,
- 0,
+ 0, 0,
GNUNET_TIME_absolute_get_remaining
(GNUNET_TIME_absolute_ntoh
(sm->deadline)),
#endif
if (n->public_key == NULL)
{
+#if DEBUG_CORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Lacking public key for peer, trying to obtain one.\n");
+#endif
m_cpy = GNUNET_malloc (sizeof (struct SetKeyMessage));
memcpy (m_cpy, m, sizeof (struct SetKeyMessage));
/* lookup n's public key, then try again */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Core service receives `%s' request from `%4s'.\n",
"ENCRYPTED_MESSAGE", GNUNET_i2s (&n->peer));
-#endif
+#endif
/* decrypt */
if (GNUNET_OK !=
do_decrypt (n,
}
/* process decrypted message(s) */
+ update_window (GNUNET_YES,
+ &n->available_send_window,
+ &n->last_asw_update,
+ n->bpm_out);
n->bpm_out_external_limit = ntohl (pt->inbound_bpm_limit);
n->bpm_out = GNUNET_MAX (n->bpm_out_external_limit,
n->bpm_out_internal_limit);
}
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ *
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Schedule the task that will recalculate the bandwidth
+ * quota for this peer (and possibly force a disconnect of
+ * idle peers by calculating a bandwidth of zero).
+ */
+static void
+schedule_quota_update (struct Neighbour *n)
+{
+ GNUNET_assert (n->quota_update_task ==
+ GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
+ n->quota_update_task
+ = GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_NO,
+ GNUNET_SCHEDULER_PRIORITY_IDLE,
+ GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
+ QUOTA_UPDATE_FREQUENCY,
+ &neighbour_quota_update,
+ n);
+}
+
+
+/**
+ * Function that recalculates the bandwidth quota for the
+ * given neighbour and transmits it to the transport service.
+ *
+ * @param cls neighbour for the quota update
+ * @param tc context
+ */
+static void
+neighbour_quota_update (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct Neighbour *n = cls;
+ uint32_t q_in;
+ double pref_rel;
+ double share;
+ unsigned long long distributable;
+
+ n->quota_update_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
+ /* calculate relative preference among all neighbours;
+ divides by a bit more to avoid division by zero AND to
+ account for possibility of new neighbours joining any time
+ AND to convert to double... */
+ pref_rel = n->current_preference / (1.0 + preference_sum);
+ share = 0;
+ distributable = 0;
+ if (bandwidth_target_out > neighbour_count * MIN_BPM_PER_PEER)
+ distributable = bandwidth_target_out - neighbour_count * MIN_BPM_PER_PEER;
+ share = distributable * pref_rel;
+ q_in = MIN_BPM_PER_PEER + (unsigned long long) share;
+ /* check if we want to disconnect for good due to inactivity */
+ if ( (GNUNET_TIME_absolute_get_duration (n->last_activity).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) &&
+ (GNUNET_TIME_absolute_get_duration (n->time_established).value > GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.value) )
+ q_in = 0; /* force disconnect */
+ if ( (n->bpm_in + MIN_BPM_CHANGE < q_in) ||
+ (n->bpm_in - MIN_BPM_CHANGE > q_in) )
+ {
+ n->bpm_in = q_in;
+ GNUNET_TRANSPORT_set_quota (transport,
+ &n->peer,
+ n->bpm_in,
+ n->bpm_out,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ NULL, NULL);
+ }
+ schedule_quota_update (n);
+}
+
+
/**
* Function called by transport to notify us that
* a peer connected to us (on the network level).
n = GNUNET_malloc (sizeof (struct Neighbour));
n->next = neighbours;
neighbours = n;
+ neighbour_count++;
n->peer = *peer;
n->last_latency = latency;
GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key);
n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY;
n->last_asw_update = now;
n->last_arw_update = now;
- n->bpm_in = DEFAULT_BPM_IN_OUT;
- n->bpm_out = DEFAULT_BPM_IN_OUT;
+ n->bpm_in = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
+ n->bpm_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
n->bpm_out_internal_limit = (uint32_t) - 1;
- n->bpm_out_external_limit = DEFAULT_BPM_IN_OUT;
+ n->bpm_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
n->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
(uint32_t) - 1);
#if DEBUG_CORE
"Received connection from `%4s'.\n",
GNUNET_i2s (&n->peer));
#endif
+ schedule_quota_update (n);
cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT);
- cnm.bpm_available = htonl (DEFAULT_BPM_IN_OUT);
+ cnm.reserved = htonl (0);
cnm.peer = *peer;
- cnm.last_activity = GNUNET_TIME_absolute_hton (now);
send_to_all_clients (&cnm.header, GNUNET_YES);
}
/**
* Free the given entry for the neighbour (it has
* already been removed from the list at this point).
+ *
* @param n neighbour to free
*/
static void
GNUNET_SCHEDULER_cancel (sched, n->retry_plaintext_task);
if (n->retry_set_key_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
GNUNET_SCHEDULER_cancel (sched, n->retry_set_key_task);
+ if (n->quota_update_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
+ GNUNET_SCHEDULER_cancel (sched, n->quota_update_task);
GNUNET_free_non_null (n->public_key);
GNUNET_free_non_null (n->pending_ping);
GNUNET_free (n);
neighbours = n->next;
else
p->next = n->next;
+ GNUNET_assert (neighbour_count > 0);
+ neighbour_count--;
cnm.header.size = htons (sizeof (struct ConnectNotifyMessage));
cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT);
- cnm.bpm_available = htonl (0);
+ cnm.reserved = htonl (0);
cnm.peer = *peer;
- cnm.last_activity = GNUNET_TIME_absolute_hton (n->last_activity);
send_to_all_clients (&cnm.header, GNUNET_YES);
free_neighbour (n);
}
while (NULL != (n = neighbours))
{
neighbours = n->next;
+ GNUNET_assert (neighbour_count > 0);
+ neighbour_count--;
free_neighbour (n);
}
while (NULL != (c = clients))
cfg = c;
/* parse configuration */
if (
-#if 0
(GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"CORE",
- "XX",
- &qin)) ||
+ "TOTAL_QUOTA_IN",
+ &bandwidth_target_in)) ||
+ (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (c,
+ "CORE",
+ "TOTAL_QUOTA_OUT",
+ &bandwidth_target_out)) ||
+#if 0
(GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"CORE",
static void
cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg)
{
-
-
if (my_private_key != NULL)
GNUNET_CRYPTO_rsa_key_free (my_private_key);
}