* How often must a peer violate bandwidth quotas before we start
* to simply drop its messages?
*/
-#define QUOTA_VIOLATION_DROP_THRESHOLD 100
+#define QUOTA_VIOLATION_DROP_THRESHOLD 10
/**
* How long until a HELLO verification attempt should time out?
*/
#define TRANSPORT_DEFAULT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
+/**
+ * How often will we re-validate for latency information
+ */
+#define TRANSPORT_DEFAULT_REVALIDATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
+
/**
* Priority to use for PONG messages.
*/
*/
struct GNUNET_TIME_Absolute expires;
+ /**
+ * Task used to re-validate addresses, updates latencies and
+ * verifies liveness.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier revalidate_task;
+
/**
* Length of addr.
*/
struct GNUNET_TIME_Absolute peer_timeout;
/**
- * At what time did we reset last_received last?
+ * Tracker for inbound bandwidth.
*/
- struct GNUNET_TIME_Absolute last_quota_update;
+ struct GNUNET_BANDWIDTH_Tracker in_tracker;
/**
* The latency we have seen for this particular address for
*/
struct GNUNET_TIME_Relative latency;
- /**
- * DV distance to this peer (1 if no DV is used).
- */
- uint32_t distance;
-
- /**
- * How many bytes have we received since the "last_quota_update"
- * timestamp?
- */
- uint64_t last_received;
-
- /**
- * Global quota for inbound traffic for the neighbour in bytes/ms.
- */
- uint32_t quota_in;
-
/**
* How often has the other peer (recently) violated the
* inbound traffic limit? Incremented by 10 per violation,
*/
unsigned int quota_violation_count;
+ /**
+ * DV distance to this peer (1 if no DV is used).
+ */
+ uint32_t distance;
+
/**
* Have we seen an PONG from this neighbour in the past (and
* not had a disconnect since)?
/**
* Message used to ask a peer to validate receipt (to check an address
- * from a HELLO). Followed by the address used. Note that the
- * recipients response does not affirm that he has this address,
- * only that he got the challenge message.
+ * from a HELLO).
*/
struct TransportPingMessage
{
};
+/**
+ * Struct for keeping information about addresses to validate
+ * so that we can re-use for sending around ping's and receiving
+ * pongs periodically to keep connections alive and also better
+ * estimate latency of connections.
+ *
+ */
+struct PeriodicValidationContext
+{
+
+ /**
+ * The address we are keeping alive
+ */
+ struct ForeignAddressList *foreign_address;
+
+ /**
+ * The name of the transport
+ */
+ char *transport;
+
+ /**
+ * Public Key of the peer to re-validate
+ */
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded publicKey;
+
+};
/**
* Our HELLO message.
*/
static struct CheckHelloValidatedContext *chvc_tail;
-
/**
* Map of PeerIdentities to 'struct ValidationEntry*'s (addresses
* of the given peer that we are currently validating).
*/
static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
+/**
+ * Handle for reporting statistics.
+ */
+static struct GNUNET_STATISTICS_Handle *stats;
+
/**
* The peer specified by the given neighbour has timed-out or a plugin
* if sender_address is not specified (NULL) then return the
* first matching entry. If sender_address is specified, then
* make sure that the address and address_len also matches.
- *
+ *
+ * FIXME: This description does not fit the function.
+ *
* @return NULL if not found.
*/
static struct NeighbourList *
}
-/**
- * Update the quota values for the given neighbour now.
- */
-static void
-update_quota (struct NeighbourList *n)
-{
- struct GNUNET_TIME_Relative delta;
- uint64_t allowed;
- uint64_t remaining;
-
- delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- if (delta.value < MIN_QUOTA_REFRESH_TIME)
- return; /* not enough time passed for doing quota update */
- allowed = delta.value * n->quota_in;
-
- if (n->last_received < allowed)
- {
- remaining = allowed - n->last_received;
- if (n->quota_in > 0)
- remaining /= n->quota_in;
- else
- remaining = 0;
- if (remaining > MAX_BANDWIDTH_CARRY)
- remaining = MAX_BANDWIDTH_CARRY;
- n->last_received = 0;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->last_quota_update.value -= remaining;
- if (n->quota_violation_count > 0)
- n->quota_violation_count--;
- }
- else
- {
- n->last_received -= allowed;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- if (n->last_received > allowed)
- {
- /* more than twice the allowed rate! */
- n->quota_violation_count += 10;
- }
- }
-}
-
-
/**
* Function called to notify a client about the socket being ready to
* queue more data. "buf" will be NULL and "size" zero if the socket
/* fatal error with client, free message queue! */
while (NULL != (q = client->message_queue_head))
{
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded (could not transmit to client)"),
+ ntohs (((const struct GNUNET_MessageHeader*)&q[1])->size),
+ GNUNET_NO);
GNUNET_CONTAINER_DLL_remove (client->message_queue_head,
client->message_queue_tail,
q);
struct MessageQueue *mq = cls;
struct NeighbourList *n;
+ if (result == GNUNET_OK)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes successfully transmitted by plugins"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ }
+ else
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes with transmission failure by plugins"),
+ mq->message_buf_size,
+ GNUNET_NO);
+ }
n = find_neighbour(&mq->neighbour_id);
GNUNET_assert (n != NULL);
if (mq->specific_address != NULL)
transmit_send_ok (mq->client, n, result);
GNUNET_free (mq);
try_transmission_to_peer (n);
- if (result != GNUNET_OK)
- disconnect_neighbour (n, GNUNET_YES);
}
static void
try_transmission_to_peer (struct NeighbourList *neighbour)
{
- struct GNUNET_TIME_Relative min_latency;
struct ReadyList *rl;
struct MessageQueue *mq;
struct GNUNET_TIME_Relative timeout;
if (neighbour->messages_head == NULL)
return; /* nothing to do */
- min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
rl = NULL;
mq = neighbour->messages_head;
/* FIXME: support bi-directional use of TCP */
mq->message_buf_size,
GNUNET_i2s (&mq->neighbour_id));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded (no destination address available)"),
+ mq->message_buf_size,
+ GNUNET_NO);
if (mq->client != NULL)
transmit_send_ok (mq->client, neighbour, GNUNET_NO);
GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
GNUNET_i2s (&mq->neighbour_id),
timeout.value);
#endif
+ /* FIXME: might want to trigger peerinfo lookup here
+ (unless that's already pending...) */
return;
}
GNUNET_CONTAINER_DLL_remove (neighbour->messages_head,
mq->specific_address->addrlen),
rl->plugin->short_name);
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes transmitted to other peers"),
+ mq->message_buf_size,
+ GNUNET_NO);
rl->plugin->api->send (rl->plugin->api->cls,
&mq->neighbour_id,
mq->message_buf,
if (mq->client == client)
{
/* client transmitted to same peer twice
- before getting SendOk! */
+ before getting SEND_OK! */
GNUNET_break (0);
return;
}
}
}
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ message_buf_size,
+ GNUNET_NO);
mq = GNUNET_malloc (sizeof (struct MessageQueue) + message_buf_size);
mq->specific_address = peer_address;
mq->client = client;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
"Refreshed my `%s', new size is %d\n", "HELLO", GNUNET_HELLO_size(hello));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# refreshed my HELLO"),
+ 1,
+ GNUNET_NO);
cpos = clients;
while (cpos != NULL)
{
"Transmitting updated `%s' to neighbour `%4s'\n",
"HELLO", GNUNET_i2s (&npos->id));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# transmitted my HELLO to other peers"),
+ 1,
+ GNUNET_NO);
transmit_to_peer (NULL, NULL, 0,
HELLO_ADDRESS_EXPIRATION,
(const char *) our_hello,
"Notifying clients about connection from `%s'\n",
GNUNET_i2s (peer));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peers connected"),
+ 1,
+ GNUNET_NO);
cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
cim.distance = htonl (distance);
"Notifying clients about lost connection to `%s'\n",
GNUNET_i2s (peer));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peers connected"),
+ -1,
+ GNUNET_NO);
dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
dim.reserved = htonl (0);
}
+static void send_periodic_ping(void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/**
- * Iterator over hash map entries. Checks if the given
- * validation entry is for the same challenge as what
- * is given in the PONG.
+ * Iterator over hash map entries. Checks if the given validation
+ * entry is for the same challenge as what is given in the PONG.
*
* @param cls the 'struct TransportPongMessage*'
- * @param key peer identity
+ * @param key peer identity
* @param value value in the hash map ('struct ValidationEntry')
* @return GNUNET_YES if we should continue to
* iterate (mismatch), GNUNET_NO if not (entry matched)
struct GNUNET_PeerIdentity target;
struct NeighbourList *n;
struct ForeignAddressList *fal;
+ struct PeriodicValidationContext *periodic_validation_context;
if (ve->challenge != challenge)
return GNUNET_YES;
-
+
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Confirmed validity of address, peer `%4s' has address `%s' (%s).\n",
ve->addrlen),
ve->transport_name);
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# address validation successes"),
+ 1,
+ GNUNET_NO);
/* create the updated HELLO */
GNUNET_CRYPTO_hash (&ve->publicKey,
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
&add_validated_address,
&avac);
GNUNET_PEERINFO_add_peer (cfg, sched,
- &target,
+ &target,
hello);
GNUNET_free (hello);
n = find_neighbour (&target);
if (n != NULL)
{
- fal = add_peer_address (n, ve->transport_name,
+ fal = add_peer_address (n,
+ ve->transport_name,
ve->addr,
ve->addrlen);
+ GNUNET_assert (fal != NULL);
fal->expires = GNUNET_TIME_relative_to_absolute (HELLO_ADDRESS_EXPIRATION);
fal->validated = GNUNET_YES;
fal->latency = GNUNET_TIME_absolute_get_duration (ve->send_time);
+ periodic_validation_context = GNUNET_malloc(sizeof(struct PeriodicValidationContext));
+ periodic_validation_context->foreign_address = fal;
+ periodic_validation_context->transport = strdup(ve->transport_name);
+ memcpy(&periodic_validation_context->publicKey,
+ &ve->publicKey,
+ sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+ /* FIXME: this causes all of the revalidation PINGs for the same HELLO
+ to be transmitted in bulk, which is not nice; also,
+ triggering these HERE means that revalidations do NOT happen AT ALL
+ for HELLOs a previous instance of this process validated (since
+ there is no "initial" validation PING => no revalidation => BUG! */
+ fal->revalidate_task = GNUNET_SCHEDULER_add_delayed(sched,
+ TRANSPORT_DEFAULT_REVALIDATION,
+ &send_periodic_ping,
+ periodic_validation_context);
if (n->latency.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
n->latency = fal->latency;
else
{
GNUNET_SCHEDULER_cancel (sched,
n->retry_task);
- n->retry_task = GNUNET_SCHEDULER_NO_TASK;
+ n->retry_task = GNUNET_SCHEDULER_NO_TASK;
try_transmission_to_peer (n);
}
}
* (otherwise we may be seeing a MiM attack).
*
* @param cls closure
- * @param name name of the transport that generated the address
+ * @param message the pong message
* @param peer who responded to our challenge
- * @param challenge the challenge number we presumably used
- * @param sender_addr string describing our sender address (as observed
- * by the other peer in human-readable format)
+ * @param sender_address string describing our sender address (as observed
+ * by the other peer in binary format)
+ * @param sender_address_len number of bytes in 'sender_address'
*/
static void
handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
"Receiving `%s' message from `%4s'.\n", "PONG",
GNUNET_i2s (peer));
#endif
- if (GNUNET_SYSERR !=
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PONG messages received"),
+ 1,
+ GNUNET_NO);
+ if (GNUNET_SYSERR !=
GNUNET_CONTAINER_multihashmap_get_multiple (validation_map,
&peer->hashPubKey,
&check_pending_validation,
#endif
return;
}
-
+
#if 0
/* FIXME: add given address to potential pool of our addresses
(for voting) */
GNUNET_log (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
_("Another peer saw us using the address `%s' via `%s'.\n"),
GNUNET_a2s ((const struct sockaddr *) &pong[1],
- ntohs(pong->addrlen)),
- va->transport_name);
+ ntohs(pong->addrlen)),
+ va->transport_name);
#endif
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
"Neighbour `%4s' has timed out!\n", GNUNET_i2s (&n->id));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# disconnects due to timeout"),
+ 1,
+ GNUNET_NO);
n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
disconnect_neighbour (n, GNUNET_NO);
}
n->next = neighbours;
neighbours = n;
n->id = *peer;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
n->peer_timeout =
GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- n->quota_in = (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
+ GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
+ GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
+ MAX_BANDWIDTH_CARRY_S);
tp = plugins;
while (tp != NULL)
{
struct ValidationEntry *va = cls;
struct GNUNET_PeerIdentity pid;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# address validation timeouts"),
+ 1,
+ GNUNET_NO);
GNUNET_CRYPTO_hash (&va->publicKey,
sizeof (struct
GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
}
+/**
+ * Check if the given address is already being validated; if not,
+ * append the given address to the list of entries that are being be
+ * validated and initiate validation.
+ *
+ * @param cls closure ('struct PeriodicValidationContext *')
+ * @param tname name of the transport
+ * @param expiration expiration time
+ * @param addr the address
+ * @param addrlen length of the address
+ * @return GNUNET_OK (always)
+ */
+static int
+rerun_validation (void *cls,
+ const char *tname,
+ struct GNUNET_TIME_Absolute expiration,
+ const void *addr, size_t addrlen)
+{
+ struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey = cls;
+ struct GNUNET_PeerIdentity id;
+ struct TransportPlugin *tp;
+ struct ValidationEntry *va;
+ struct NeighbourList *neighbour;
+ struct ForeignAddressList *peer_address;
+ struct TransportPingMessage ping;
+ /*struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;*/
+ struct CheckAddressExistsClosure caec;
+ char * message_buf;
+ uint16_t hello_size;
+ size_t tsize;
+
+ tp = find_transport (tname);
+ if (tp == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
+ tname);
+ return GNUNET_OK;
+ }
+
+ GNUNET_CRYPTO_hash (publicKey,
+ sizeof (struct
+ GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
+ &id.hashPubKey);
+ caec.addr = addr;
+ caec.addrlen = addrlen;
+ caec.tname = tname;
+ caec.exists = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_iterate (validation_map,
+ &check_address_exists,
+ &caec);
+ if (caec.exists == GNUNET_YES)
+ {
+ /* During validation attempts we will likely trigger the other
+ peer trying to validate our address which in turn will cause
+ it to send us its HELLO, so we expect to hit this case rather
+ frequently. Only print something if we are very verbose. */
+#if DEBUG_TRANSPORT > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Some validation of address `%s' via `%s' for peer `%4s' already in progress.\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id));
+#endif
+ return GNUNET_OK;
+ }
+ va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen);
+ va->transport_name = GNUNET_strdup (tname);
+ va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (unsigned int) -1);
+ va->send_time = GNUNET_TIME_absolute_get();
+ va->addr = (const void*) &va[1];
+ memcpy (&va[1], addr, addrlen);
+ va->addrlen = addrlen;
+ memcpy(&va->publicKey, publicKey, sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded));
+ va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
+ HELLO_VERIFICATION_TIMEOUT,
+ &timeout_hello_validation,
+ va);
+ GNUNET_CONTAINER_multihashmap_put (validation_map,
+ &id.hashPubKey,
+ va,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ neighbour = find_neighbour(&id);
+ if (neighbour == NULL)
+ neighbour = setup_new_neighbour(&id);
+ peer_address = add_peer_address(neighbour, tname, addr, addrlen);
+ GNUNET_assert(peer_address != NULL);
+ hello_size = GNUNET_HELLO_size(our_hello);
+ tsize = sizeof(struct TransportPingMessage) + hello_size;
+ message_buf = GNUNET_malloc(tsize);
+ ping.challenge = htonl(va->challenge);
+ ping.header.size = htons(sizeof(struct TransportPingMessage));
+ ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
+ memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity));
+ memcpy(message_buf, our_hello, hello_size);
+ memcpy(&message_buf[hello_size],
+ &ping,
+ sizeof(struct TransportPingMessage));
+#if DEBUG_TRANSPORT_REVALIDATION
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Performing re-validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n",
+ GNUNET_a2s (addr, addrlen),
+ tname,
+ GNUNET_i2s (&id),
+ "HELLO", hello_size,
+ "PING", sizeof (struct TransportPingMessage));
+#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages sent for re-validation"),
+ 1,
+ GNUNET_NO);
+ transmit_to_peer (NULL, peer_address,
+ GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+ HELLO_VERIFICATION_TIMEOUT,
+ message_buf, tsize,
+ GNUNET_YES, neighbour);
+ GNUNET_free(message_buf);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Send periodic ping messages to a give foreign address.
+ *
+ * cls closure, can be safely cast to ForeignAddressList
+ * tc task context
+ *
+ * FIXME: Since a _billion_ pongs are sent for every ping,
+ * maybe this should be a special message type or something
+ * that gets discarded on the other side instead of initiating
+ * a flood.
+ */
+static void
+send_periodic_ping (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeriodicValidationContext *periodic_validation_context = cls;
+
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
+ {
+ GNUNET_free(periodic_validation_context->transport);
+ GNUNET_free(periodic_validation_context);
+ return; /* We have been shutdown, don't do anything! */
+ }
+ rerun_validation(&periodic_validation_context->publicKey,
+ periodic_validation_context->transport,
+ periodic_validation_context->foreign_address->expires,
+ periodic_validation_context->foreign_address->addr,
+ periodic_validation_context->foreign_address->addrlen);
+ GNUNET_free(periodic_validation_context->transport);
+ GNUNET_free(periodic_validation_context);
+}
+
+
/**
* Check if the given address is already being validated; if not,
* append the given address to the list of entries that are being be
uint16_t hello_size;
size_t tsize;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses scheduled for validation"),
+ 1,
+ GNUNET_NO);
tp = find_transport (tname);
if (tp == NULL)
{
_
("Transport `%s' not loaded, will not try to validate peer address using this transport.\n"),
tname);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses not validated (no applicable transport plugin available)"),
+ 1,
+ GNUNET_NO);
return GNUNET_OK;
}
GNUNET_HELLO_get_key (chvc->hello, &pk);
#if DEBUG_TRANSPORT > 1
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Validation of address `%s' via `%s' for peer `%4s' already in progress.\n",
- GNUNET_a2s (addr, addrlen),
- tname,
+ GNUNET_a2s (addr, addrlen),
+ tname,
GNUNET_i2s (&id));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# peer addresses not validated (in progress)"),
+ 1,
+ GNUNET_NO);
return GNUNET_OK;
- }
+ }
va = GNUNET_malloc (sizeof (struct ValidationEntry) + addrlen);
va->transport_name = GNUNET_strdup (tname);
va->challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
va->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
HELLO_VERIFICATION_TIMEOUT,
&timeout_hello_validation,
- va);
+ va);
GNUNET_CONTAINER_multihashmap_put (validation_map,
&id.hashPubKey,
va,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- neighbour = find_neighbour(&id);
+ neighbour = find_neighbour(&id);
if (neighbour == NULL)
neighbour = setup_new_neighbour(&id);
- peer_address = add_peer_address(neighbour, tname, addr, addrlen);
+ peer_address = add_peer_address(neighbour, tname, addr, addrlen);
GNUNET_assert(peer_address != NULL);
hello_size = GNUNET_HELLO_size(our_hello);
tsize = sizeof(struct TransportPingMessage) + hello_size;
ping.header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_PING);
memcpy(&ping.target, &id, sizeof(struct GNUNET_PeerIdentity));
memcpy(message_buf, our_hello, hello_size);
- memcpy(&message_buf[hello_size],
- &ping,
+ memcpy(&message_buf[hello_size],
+ &ping,
sizeof(struct TransportPingMessage));
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Performing validation of address `%s' via `%s' for peer `%4s' sending `%s' (%u bytes) and `%s' (%u bytes)\n",
- GNUNET_a2s (addr, addrlen),
- tname,
+ GNUNET_a2s (addr, addrlen),
+ tname,
GNUNET_i2s (&id),
"HELLO", hello_size,
"PING", sizeof (struct TransportPingMessage));
#endif
- transmit_to_peer (NULL, peer_address,
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages sent for initial validation"),
+ 1,
+ GNUNET_NO);
+ transmit_to_peer (NULL, peer_address,
GNUNET_SCHEDULER_PRIORITY_DEFAULT,
HELLO_VERIFICATION_TIMEOUT,
- message_buf, tsize,
+ message_buf, tsize,
GNUNET_YES, neighbour);
GNUNET_free(message_buf);
return GNUNET_OK;
{
struct NeighbourList *n = cls;
struct ForeignAddressList *fal;
+ int try;
+ try = GNUNET_NO;
fal = find_peer_address (n, tname, addr, addrlen);
if (fal == NULL)
{
expiration.value);
#endif
fal = add_peer_address (n, tname, addr, addrlen);
+ try = GNUNET_YES;
}
if (fal == NULL)
return GNUNET_OK;
fal->expires = GNUNET_TIME_absolute_max (expiration,
fal->expires);
- fal->validated = GNUNET_YES;
+ fal->validated = GNUNET_YES;
+ if (try == GNUNET_YES)
+ try_transmission_to_peer (n);
return GNUNET_OK;
}
*
* @param cls closure
* @param peer id of the peer, NULL for last call
- * @param hello hello message for the peer (can be NULL)
+ * @param h hello message for the peer (can be NULL)
* @param trust amount of trust we have in the peer (not used)
*/
static void
"HELLO",
GNUNET_i2s (&target));
#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# new HELLOs requiring full validation"),
+ 1,
+ GNUNET_NO);
GNUNET_HELLO_iterate_addresses (chvc->hello,
GNUNET_NO,
&run_validation,
chvc);
}
+ else
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# duplicate HELLO (peer known)"),
+ 1,
+ GNUNET_NO);
+ }
GNUNET_free (chvc);
return;
- }
+ }
if (h == NULL)
return;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peerinfo had `%s' message for peer `%4s', validating only new addresses.\n",
+ "HELLO",
+ GNUNET_i2s (peer));
+#endif
chvc->hello_known = GNUNET_YES;
n = find_neighbour (peer);
if (n != NULL)
if (GNUNET_SCHEDULER_get_load (sched,
GNUNET_SCHEDULER_PRIORITY_BACKGROUND) > MAX_HELLO_LOAD)
{
- /* TODO: call to stats? */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs ignored due to high load"),
+ 1,
+ GNUNET_NO);
return GNUNET_OK;
}
hello = (const struct GNUNET_HELLO_Message *) message;
GNUNET_CRYPTO_hash (&publicKey,
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
&target.hashPubKey);
-#if DEBUG_TRANSPORT
+ if (0 == memcmp (&my_identity,
+ &target,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs ignored for validation (is my own HELLO)"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+#if DEBUG_TRANSPORT > 1
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Processing `%s' message for `%4s' of size %u\n",
"HELLO",
GNUNET_i2s (&target),
GNUNET_HELLO_size(hello));
#endif
-
chvc = GNUNET_malloc (sizeof (struct CheckHelloValidatedContext) + hsize);
chvc->hello = (const struct GNUNET_HELLO_Message *) &chvc[1];
memcpy (&chvc[1], hello, hsize);
/* free all messages on the queue */
while (NULL != (mq = n->messages_head))
{
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes in message queue for other peers"),
+ -mq->message_buf_size,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes discarded due to disconnect"),
+ mq->message_buf_size,
+ GNUNET_NO);
GNUNET_CONTAINER_DLL_remove (n->messages_head,
n->messages_tail,
mq);
struct TransportPlugin *plugin = cls;
struct TransportPingMessage *ping;
struct TransportPongMessage *pong;
- uint16_t msize;
struct NeighbourList *n;
struct ReadyList *rl;
struct ForeignAddressList *fal;
- msize = ntohs (message->size);
- if (msize < sizeof (struct TransportPingMessage))
+ if (ntohs (message->size) != sizeof (struct TransportPingMessage))
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
GNUNET_a2s ((const struct sockaddr *)sender_address,
sender_address_len));
#endif
- msize -= sizeof (struct TransportPingMessage);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# PING messages received"),
+ 1,
+ GNUNET_NO);
pong = GNUNET_malloc (sizeof (struct TransportPongMessage) + sender_address_len);
pong->header.size = htons (sizeof (struct TransportPongMessage) + sender_address_len);
pong->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_PONG);
* and generally forward to our receive callback.
*
* @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- * (opaque to transport service)
- * @param sender_address_len the length of the sender address
* @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- * for future receive calls for messages from this
- * particular peer
- *
- */
-static void
+ * @param message the message, NULL if we only care about
+ * learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ * (plugins that do not support this, can ignore the return value)
+ */
+static struct GNUNET_TIME_Relative
plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
const struct GNUNET_MessageHeader *message,
unsigned int distance, const char *sender_address,
struct ForeignAddressList *peer_address;
uint16_t msize;
struct NeighbourList *n;
+ struct GNUNET_TIME_Relative ret;
n = find_neighbour (peer);
if (n == NULL)
- {
- if (message == NULL)
- return; /* disconnect of peer already marked down */
- n = setup_new_neighbour (peer);
- }
+ n = setup_new_neighbour (peer);
service_context = n->plugins;
while ((service_context != NULL) && (plugin != service_context->plugin))
service_context = service_context->next;
GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
- if (message == NULL)
+ if (message != NULL)
{
+ peer_address = add_peer_address(n,
+ plugin->short_name,
+ sender_address,
+ sender_address_len);
+ if (peer_address != NULL)
+ {
+ peer_address->distance = distance;
+ if (peer_address->connected == GNUNET_NO)
+ {
+ peer_address->connected = GNUNET_YES;
+ peer_address->connect_attempts++;
+ }
+ peer_address->timeout
+ =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ }
+ /* update traffic received amount ... */
+ msize = ntohs (message->size);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bytes received from other peers"),
+ msize,
+ GNUNET_NO);
+ n->distance = distance;
+ n->peer_timeout =
+ GNUNET_TIME_relative_to_absolute
+ (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_SCHEDULER_cancel (sched,
+ n->timeout_task);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &neighbour_timeout_task, n);
+ if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+ {
+ /* dropping message due to frequent inbound volume violations! */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+ GNUNET_ERROR_TYPE_BULK,
+ _
+ ("Dropping incoming message due to repeated bandwidth quota (%u b/s) violations (total of %u).\n"),
+ n->in_tracker.available_bytes_per_s__,
+ n->quota_violation_count);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# bandwidth quota violations by other peers"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored (UDP...) */
+ }
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_HELLO:
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLO messages received from other peers"),
+ 1,
+ GNUNET_NO);
+ process_hello (plugin, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+ handle_ping(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+ handle_pong(plugin, message, peer, sender_address, sender_address_len);
+ break;
+ default:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Receive failed from `%4s', triggering disconnect\n",
- GNUNET_i2s (&n->id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', sending to all clients.\n",
+ ntohs (message->type), GNUNET_i2s (peer));
#endif
- /* TODO: call stats */
- disconnect_neighbour (n, GNUNET_YES);
- return;
- }
- peer_address = add_peer_address(n,
- plugin->short_name,
- sender_address,
- sender_address_len);
- if (peer_address != NULL)
+ if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
+ msize))
+ n->quota_violation_count++;
+ else
+ n->quota_violation_count = 0; /* back within limits */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# payload received from other peers"),
+ msize,
+ GNUNET_NO);
+ /* transmit message to all clients */
+ im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+ im->header.size = htons (sizeof (struct InboundMessage) + msize);
+ im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+ im->latency = GNUNET_TIME_relative_hton (n->latency);
+ im->peer = *peer;
+ memcpy (&im[1], message, msize);
+ cpos = clients;
+ while (cpos != NULL)
+ {
+ transmit_to_client (cpos, &im->header, GNUNET_YES);
+ cpos = cpos->next;
+ }
+ GNUNET_free (im);
+ }
+ }
+ ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
+ if (ret.value > 0)
{
- peer_address->distance = distance;
- if (peer_address->connected == GNUNET_NO)
- {
- peer_address->connected = GNUNET_YES;
- peer_address->connect_attempts++;
- }
- peer_address->timeout
- =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- }
- /* update traffic received amount ... */
- msize = ntohs (message->size);
- n->last_received += msize;
- n->distance = distance;
- n->peer_timeout =
- GNUNET_TIME_relative_to_absolute
- (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
- GNUNET_SCHEDULER_cancel (sched,
- n->timeout_task);
- n->timeout_task =
- GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
- &neighbour_timeout_task, n);
- update_quota (n);
- if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
- {
- /* dropping message due to frequent inbound volume violations! */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
- GNUNET_ERROR_TYPE_BULK,
- _
- ("Dropping incoming message due to repeated bandwidth quota violations (total of %u).\n"), n->quota_violation_count);
- /* TODO: call stats */
- return;
- }
- switch (ntohs (message->type))
- {
- case GNUNET_MESSAGE_TYPE_HELLO:
- process_hello (plugin, message);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
- handle_ping(plugin, message, peer, sender_address, sender_address_len);
- break;
- case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
- handle_pong(plugin, message, peer, sender_address, sender_address_len);
- break;
- default:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from `%4s', sending to all clients.\n",
- ntohs (message->type), GNUNET_i2s (peer));
-#endif
- /* transmit message to all clients */
- im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
- im->header.size = htons (sizeof (struct InboundMessage) + msize);
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (n->latency);
- im->peer = *peer;
- memcpy (&im[1], message, msize);
- cpos = clients;
- while (cpos != NULL)
- {
- transmit_to_client (cpos, &im->header, GNUNET_YES);
- cpos = cpos->next;
- }
- GNUNET_free (im);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Throttling read (%llu bytes excess at %u b/s), waiting %llums before reading more.\n",
+ (unsigned long long) n->in_tracker.consumption_since_last_update__,
+ (unsigned int) n->in_tracker.available_bytes_per_s__,
+ (unsigned long long) ret.value);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# ms throttling suggested"),
+ (int64_t) ret.value,
+ GNUNET_NO);
}
+ return ret;
}
{
int ret;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client\n", "HELLO");
-#endif
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# HELLOs received from clients"),
+ 1,
+ GNUNET_NO);
ret = process_hello (NULL, message);
GNUNET_SERVER_receive_done (client, ret);
}
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# payload received for other peers"),
+ size,
+ GNUNET_NO);
obm = (const struct OutboundMessage *) message;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
const struct QuotaSetMessage *qsm =
(const struct QuotaSetMessage *) message;
struct NeighbourList *n;
- struct TransportPlugin *p;
- struct ReadyList *rl;
-
+
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# SET QUOTA messages received"),
+ 1,
+ GNUNET_NO);
n = find_neighbour (&qsm->peer);
if (n == NULL)
{
GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
+ 1,
+ GNUNET_NO);
return;
}
-
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received `%s' request (new quota %u, old quota %u) from client for peer `%4s'\n",
- "SET_QUOTA", ntohl(qsm->quota_in), n->quota_in, GNUNET_i2s (&qsm->peer));
+ "SET_QUOTA",
+ (unsigned int) ntohl (qsm->quota.value__),
+ (unsigned int) n->in_tracker.available_bytes_per_s__,
+ GNUNET_i2s (&qsm->peer));
#endif
-
- update_quota (n);
- if (n->quota_in < ntohl (qsm->quota_in))
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->quota_in = ntohl (qsm->quota_in);
- rl = n->plugins;
- while (rl != NULL)
- {
- p = rl->plugin;
- p->api->set_receive_quota (p->api->cls,
- &qsm->peer, ntohl (qsm->quota_in));
- rl = rl->next;
- }
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker,
+ qsm->quota);
+ if (0 == ntohl (qsm->quota.value__))
+ disconnect_neighbour (n, GNUNET_NO);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
plug->env.cls = plug;
plug->env.receive = &plugin_env_receive;
plug->env.notify_address = &plugin_env_notify_address;
- plug->env.default_quota_in =
- (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
plug->env.max_connections = max_connect_per_transport;
+ plug->env.stats = stats;
}
&abort_validation,
NULL);
GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
}
sched = s;
cfg = c;
+ stats = GNUNET_STATISTICS_create (sched, "transport", cfg);
validation_map = GNUNET_CONTAINER_multihashmap_create (64);
/* parse configuration */
if ((GNUNET_OK !=
_
("Transport service is lacking key configuration settings. Exiting.\n"));
GNUNET_SCHEDULER_shutdown (s);
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
return;
}
max_connect_per_transport = (uint32_t) tneigh;
_
("Transport service could not access hostkey. Exiting.\n"));
GNUNET_SCHEDULER_shutdown (s);
+ if (stats != NULL)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ stats = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_destroy (validation_map);
+ validation_map = NULL;
return;
}
GNUNET_CRYPTO_rsa_key_get_public (my_private_key, &my_public_key);