* #3 only validated addresses are selected for scheduling; that
* also ensures we know the RTT
* #4 to ensure flow control and RTT are OK, we always do the
- * 'validation', even if address comes from PEERSTORE
+ * 'validation', even if address comes from PEERSTORE???
* - ACK handling / retransmission
* - track RTT, distance, loss, etc.
* - DV data structures:
*/
#define MAX_ADDRESS_VALID_UNTIL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
+/**
+ * What is the maximum frequency at which we do address validation?
+ * A random value between 0 and this value is added when scheduling
+ * the #validation_task (both to ensure we do not validate too often,
+ * and to randomize a bit).
+ */
+#define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
+
/**
* How many messages can we have pending for a given communicator
* process before we start to throttle that communicator?
*/
enum GNUNET_NetworkType nt;
+ /**
+ * We are technically ready to send the challenge, but we are waiting for
+ * the respective queue to become available for transmission.
+ */
+ int awaiting_queue;
+
};
*/
static struct GNUNET_SCHEDULER_Task *ephemeral_task;
+/**
+ * Task to run address validation.
+ */
+static struct GNUNET_SCHEDULER_Task *validation_task;
+
/**
* Free cached ephemeral key.
}
+/**
+ * Free validation state.
+ *
+ * @param vs validation state to free
+ */
+static void
+free_validation_state (struct ValidationState *vs)
+{
+ GNUNET_CONTAINER_multipeermap_remove (validation_map,
+ &vs->pid,
+ vs);
+ GNUNET_CONTAINER_heap_remove_node (vs->hn);
+ vs->hn = NULL;
+ if (NULL != vs->sc)
+ {
+ GNUNET_PEERSTORE_store_cancel (vs->sc);
+ vs->sc = NULL;
+ }
+ GNUNET_free (vs->address);
+ GNUNET_free (vs);
+}
+
+
/**
* Lookup neighbour record for peer @a pid.
*
*/
static int
free_reassembly_cb (void *cls,
- const struct GNUNET_ShortHashCode *key,
- void *value)
+ const struct GNUNET_ShortHashCode *key,
+ void *value)
{
struct ReassemblyContext *rc = value;
(void) cls;
*/
static void
route_message (const struct GNUNET_PeerIdentity *target,
- struct GNUNET_MessageHeader *hdr)
+ struct GNUNET_MessageHeader *hdr)
{
// FIXME: send hdr to target, free hdr (possibly using DV, possibly broadcasting)
GNUNET_free (hdr);
*/
static void
handle_fragment_box (void *cls,
- const struct TransportFragmentBox *fb)
+ const struct TransportFragmentBox *fb)
{
struct CommunicatorMessageContext *cmc = cls;
struct Neighbour *n;
*/
static void
handle_fragment_ack (void *cls,
- const struct TransportFragmentAckMessage *fa)
+ const struct TransportFragmentAckMessage *fa)
{
struct CommunicatorMessageContext *cmc = cls;
*/
static void
handle_dv_box (void *cls,
- const struct TransportDVBox *dvb)
+ const struct TransportDVBox *dvb)
{
struct CommunicatorMessageContext *cmc = cls;
uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
}
-/**
- * New queue became available. Process the request.
- *
- * @param cls the client
- * @param aqm the send message that was sent
- */
-static void
-handle_add_queue_message (void *cls,
- const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
-{
- struct TransportClient *tc = cls;
- struct Queue *queue;
- struct Neighbour *neighbour;
- const char *addr;
- uint16_t addr_len;
-
- if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
- {
- /* MTU so small as to be useless for transmissions,
- required for #fragment_message()! */
- GNUNET_break_op (0);
- GNUNET_SERVICE_client_drop (tc->client);
- return;
- }
- neighbour = lookup_neighbour (&aqm->receiver);
- if (NULL == neighbour)
- {
- neighbour = GNUNET_new (struct Neighbour);
- neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
- neighbour->pid = aqm->receiver;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (neighbours,
- &neighbour->pid,
- neighbour,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- cores_send_connect_info (&neighbour->pid,
- GNUNET_BANDWIDTH_ZERO);
- }
- addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
- addr = (const char *) &aqm[1];
-
- queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
- queue->tc = tc;
- queue->address = (const char *) &queue[1];
- queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
- queue->qid = aqm->qid;
- queue->mtu = ntohl (aqm->mtu);
- queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
- queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
- queue->neighbour = neighbour;
- GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
- &tracker_update_in_cb,
- queue,
- GNUNET_BANDWIDTH_ZERO,
- GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
- &tracker_excess_in_cb,
- queue);
- GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
- &tracker_update_out_cb,
- queue,
- GNUNET_BANDWIDTH_ZERO,
- GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
- &tracker_excess_out_cb,
- queue);
- memcpy (&queue[1],
- addr,
- addr_len);
- /* notify monitors about new queue */
- {
- struct MonitorEvent me = {
- .rtt = queue->rtt,
- .cs = queue->cs
- };
-
- notify_monitors (&neighbour->pid,
- queue->address,
- queue->nt,
- &me);
- }
- GNUNET_CONTAINER_MDLL_insert (neighbour,
- neighbour->queue_head,
- neighbour->queue_tail,
- queue);
- GNUNET_CONTAINER_MDLL_insert (client,
- tc->details.communicator.queue_head,
- tc->details.communicator.queue_tail,
- queue);
- GNUNET_SERVICE_client_continue (tc->client);
-}
-
-
/**
* Queue to a peer went down. Process the request.
*
*/
static void
handle_monitor_start (void *cls,
- const struct GNUNET_TRANSPORT_MonitorStart *start)
+ const struct GNUNET_TRANSPORT_MonitorStart *start)
{
struct TransportClient *tc = cls;
}
+/**
+ * Task run periodically to validate some address based on #validation_heap.
+ *
+ * @param cls NULL
+ */
+static void
+validation_start_cb (void *cls);
+
+
+/**
+ * Set the time for next_challenge of @a vs to @a new_time.
+ * Updates the heap and if necessary reschedules the job.
+ *
+ * @param vs validation state to update
+ * @param new_time new time for revalidation
+ */
+static void
+update_next_challenge_time (struct ValidationState *vs,
+ struct GNUNET_TIME_Absolute new_time)
+{
+ struct GNUNET_TIME_Relative delta;
+
+ if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
+ return; /* be lazy */
+ vs->next_challenge = new_time;
+ if (NULL == vs->hn)
+ vs->hn = GNUNET_CONTAINER_heap_insert (validation_heap,
+ vs,
+ new_time.abs_value_us);
+ else
+ GNUNET_CONTAINER_heap_update_cost (vs->hn,
+ new_time.abs_value_us);
+ if ( (vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
+ (NULL != validation_task) )
+ return;
+ if (NULL != validation_task)
+ GNUNET_SCHEDULER_cancel (validation_task);
+ /* randomize a bit */
+ delta.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
+ MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
+ new_time = GNUNET_TIME_absolute_add (new_time,
+ delta);
+ validation_task = GNUNET_SCHEDULER_add_at (new_time,
+ &validation_start_cb,
+ NULL);
+}
+
+
+/**
+ * The queue @a q (which matches the peer and address in @a vs) is
+ * ready for queueing. We should now queue the validation request.
+ *
+ * @param q queue to send on
+ * @param vs state to derive validation challenge from
+ */
+static void
+validation_transmit_on_queue (struct Queue *q,
+ struct ValidationState *vs)
+{
+ struct TransportValidationChallenge tvc;
+
+ vs->last_challenge_use = GNUNET_TIME_absolute_get ();
+ tvc.reserved = htonl (0);
+ tvc.challenge = vs->challenge;
+ tvc.sender_time = GNUNET_TIME_absolute_hton (vs->last_challenge_use);
+ // FIXME: actually send on queue!
+}
+
+
+/**
+ * Task run periodically to validate some address based on #validation_heap.
+ *
+ * @param cls NULL
+ */
+static void
+validation_start_cb (void *cls)
+{
+ struct ValidationState *vs;
+ struct Neighbour *n;
+ struct Queue *q;
+
+ (void) cls;
+ validation_task = NULL;
+ vs = GNUNET_CONTAINER_heap_peek (validation_heap);
+ /* drop validations past their expiration */
+ while ( (NULL != vs) &&
+ (0 == GNUNET_TIME_absolute_get_remaining (vs->valid_until).rel_value_us) )
+ {
+ free_validation_state (vs);
+ vs = GNUNET_CONTAINER_heap_peek (validation_heap);
+ }
+ if (NULL == vs)
+ return; /* woopsie, no more addresses known, should only
+ happen if we're really a lonely peer */
+ n = GNUNET_CONTAINER_multipeermap_get (neighbours,
+ &vs->pid);
+ q = NULL;
+ if (NULL != n)
+ {
+ for (struct Queue *pos = n->queue_head;
+ NULL != pos;
+ pos = pos->next_neighbour)
+ {
+ if (0 == strcmp (pos->address,
+ vs->address))
+ {
+ q = pos;
+ break;
+ }
+ }
+ }
+ if (NULL == q)
+ {
+ vs->awaiting_queue = GNUNET_YES;
+ suggest_to_connect (&vs->pid,
+ vs->address);
+ }
+ else
+ validation_transmit_on_queue (q,
+ vs);
+ /* Finally, reschedule next attempt */
+ vs->challenge_backoff = GNUNET_TIME_randomized_backoff (vs->challenge_backoff,
+ MAX_VALIDATION_CHALLENGE_FREQ);
+ update_next_challenge_time (vs,
+ GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
+}
+
+
+/**
+ * A new queue has been created, check if any address validation
+ * requests have been waiting for it.
+ *
+ * @param cls a `struct Queue`
+ * @param pid peer concerned (unused)
+ * @param value a `struct ValidationState`
+ * @return #GNUNET_NO if a match was found and we can stop looking
+ */
+static int
+check_validation_request_pending (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct Queue *q = cls;
+ struct ValidationState *vs = value;
+
+ (void) pid;
+ if ( (GNUNET_YES == vs->awaiting_queue) &&
+ (0 == strcmp (vs->address,
+ q->address)) )
+ {
+ vs->awaiting_queue = GNUNET_NO;
+ validation_transmit_on_queue (q,
+ vs);
+ return GNUNET_NO;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * New queue became available. Process the request.
+ *
+ * @param cls the client
+ * @param aqm the send message that was sent
+ */
+static void
+handle_add_queue_message (void *cls,
+ const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
+{
+ struct TransportClient *tc = cls;
+ struct Queue *queue;
+ struct Neighbour *neighbour;
+ const char *addr;
+ uint16_t addr_len;
+
+ if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
+ {
+ /* MTU so small as to be useless for transmissions,
+ required for #fragment_message()! */
+ GNUNET_break_op (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ neighbour = lookup_neighbour (&aqm->receiver);
+ if (NULL == neighbour)
+ {
+ neighbour = GNUNET_new (struct Neighbour);
+ neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
+ neighbour->pid = aqm->receiver;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_put (neighbours,
+ &neighbour->pid,
+ neighbour,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ cores_send_connect_info (&neighbour->pid,
+ GNUNET_BANDWIDTH_ZERO);
+ }
+ addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
+ addr = (const char *) &aqm[1];
+
+ queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
+ queue->tc = tc;
+ queue->address = (const char *) &queue[1];
+ queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+ queue->qid = aqm->qid;
+ queue->mtu = ntohl (aqm->mtu);
+ queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
+ queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
+ queue->neighbour = neighbour;
+ GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
+ &tracker_update_in_cb,
+ queue,
+ GNUNET_BANDWIDTH_ZERO,
+ GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
+ &tracker_excess_in_cb,
+ queue);
+ GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
+ &tracker_update_out_cb,
+ queue,
+ GNUNET_BANDWIDTH_ZERO,
+ GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S,
+ &tracker_excess_out_cb,
+ queue);
+ memcpy (&queue[1],
+ addr,
+ addr_len);
+ /* notify monitors about new queue */
+ {
+ struct MonitorEvent me = {
+ .rtt = queue->rtt,
+ .cs = queue->cs
+ };
+
+ notify_monitors (&neighbour->pid,
+ queue->address,
+ queue->nt,
+ &me);
+ }
+ GNUNET_CONTAINER_MDLL_insert (neighbour,
+ neighbour->queue_head,
+ neighbour->queue_tail,
+ queue);
+ GNUNET_CONTAINER_MDLL_insert (client,
+ tc->details.communicator.queue_head,
+ tc->details.communicator.queue_tail,
+ queue);
+ /* check if valdiations are waiting for the queue */
+ (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+ &aqm->receiver,
+ &check_validation_request_pending,
+ queue);
+ GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
/**
* Communicator tells us that our request to create a queue "worked", that
* is setting up the queue is now in process.
}
+/**
+ * Closure for #check_known_address.
+ */
+struct CheckKnownAddressContext
+{
+ /**
+ * Set to the address we are looking for.
+ */
+ const char *address;
+
+ /**
+ * Set to a matching validation state, if one was found.
+ */
+ struct ValidationState *vs;
+};
+
+
+/**
+ * Test if the validation state in @a value matches the
+ * address from @a cls.
+ *
+ * @param cls a `struct CheckKnownAddressContext`
+ * @param pid unused (must match though)
+ * @param value a `struct ValidationState`
+ * @return #GNUNET_OK if not matching, #GNUNET_NO if match found
+ */
+static int
+check_known_address (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct CheckKnownAddressContext *ckac = cls;
+ struct ValidationState *vs = value;
+
+ (void) pid;
+ if (0 != strcmp (vs->address,
+ ckac->address))
+ return GNUNET_OK;
+ ckac->vs = vs;
+ return GNUNET_NO;
+}
+
+
/**
* Given another peers address, consider checking it for validity
* and then adding it to the Peerstore.
char *address;
enum GNUNET_NetworkType nt;
struct GNUNET_TIME_Absolute expiration;
+ struct GNUNET_TIME_Absolute now;
+ struct ValidationState *vs;
(void) cls;
- // FIXME: pre-check: do we know this address already?
- // FIXME: pre-check: rate-limit signature verification / validation!
+ // FIXME: checking that we know this address already should
+ // be done BEFORE checking the signature => HELLO API change!
+ // FIXME: pre-check: rate-limit signature verification / validation?!
address = GNUNET_HELLO_extract_address (&hdr[1],
ntohs (hdr->header.size) - sizeof (*hdr),
&hdr->peer,
}
if (0 == GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us)
return; /* expired */
- // FIXME: do begin actual verification here!
- GNUNET_free (address);
+ {
+ struct CheckKnownAddressContext ckac = {
+ .address = address,
+ .vs = NULL
+ };
+
+ (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
+ &hdr->peer,
+ &check_known_address,
+ &ckac);
+ if (NULL != (vs = ckac.vs))
+ {
+ /* if 'vs' is not currently valid, we need to speed up retrying the validation */
+ if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
+ {
+ /* reduce backoff as we got a fresh advertisement */
+ vs->challenge_backoff = GNUNET_TIME_relative_min (FAST_VALIDATION_CHALLENGE_FREQ,
+ GNUNET_TIME_relative_divide (vs->challenge_backoff,
+ 2));
+ update_next_challenge_time (vs,
+ GNUNET_TIME_relative_to_absolute (vs->challenge_backoff));
+ }
+ GNUNET_free (address);
+ return;
+ }
+ }
+ now = GNUNET_TIME_absolute_get();
+ vs = GNUNET_new (struct ValidationState);
+ vs->pid = hdr->peer;
+ vs->valid_until = expiration;
+ vs->first_challenge_use = now;
+ vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &vs->challenge,
+ sizeof (vs->challenge));
+ vs->address = address;
+ vs->nt = nt;
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (validation_map,
+ &vs->pid,
+ vs,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ update_next_challenge_time (vs,
+ now);
}
*
* @param cls NULL
* @param pid unused
- * @param value a `struct Neighbour`
+ * @param value a `struct EphemeralCacheEntry`
* @return #GNUNET_OK (always)
*/
static int
}
+/**
+ * Free validation state.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct ValidationState`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_validation_state_cb (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct ValidationState *vs = value;
+
+ (void) cls;
+ (void) pid;
+ free_validation_state (vs);
+ return GNUNET_OK;
+}
+
+
/**
* Function called when the service shuts down. Unloads our plugins
* and cancels pending validations.
}
GNUNET_CONTAINER_multipeermap_destroy (neighbours);
neighbours = NULL;
+ GNUNET_CONTAINER_multipeermap_iterate (validation_map,
+ &free_validation_state_cb,
+ NULL);
+ GNUNET_CONTAINER_multipeermap_destroy (validation_map);
+ validation_map = NULL;
+ GNUNET_CONTAINER_heap_destroy (validation_heap);
+ validation_heap = NULL;
GNUNET_CONTAINER_multipeermap_iterate (dv_routes,
- &free_dv_routes_cb,
- NULL);
+ &free_dv_routes_cb,
+ NULL);
GNUNET_CONTAINER_multipeermap_destroy (dv_routes);
dv_routes = NULL;
GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
- &free_ephemeral_cb,
- NULL);
+ &free_ephemeral_cb,
+ NULL);
GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
ephemeral_map = NULL;
GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
GNUNET_YES);
ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ validation_map = GNUNET_CONTAINER_multipeermap_create (1024,
+ GNUNET_YES);
+ validation_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
if (NULL == GST_my_private_key)
{