along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
- * @file transport/gnunet-service-transport.c
- * @brief main for gnunet-service-transport
+ * @file transport/gnunet-service-tng.c
+ * @brief main for gnunet-service-tng
* @author Christian Grothoff
*
* TODO:
- * - design ATS-NG API
* - figure out how to transmit (selective) ACKs in case of uni-directional
* communicators (with/without core? DV-only?) When do we use ACKs?
- * How/where do we distinguish between TCP/HTTP and unreliable communicators?
- * => Should communicator provide reliable/unreliable ("flags") information?
+ * => communicators use selective ACKs for flow control
+ * => transport uses message-level ACKs for RTT, fragment confirmation
+ * => integrate DV into transport, use neither core nor communicators
+ * but rather give communicators transport-encapsulated messages
+ * (which could be core-data, background-channel traffic, or
+ * transport-to-transport traffic)
+ *
+ * Implement:
* - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
+ *
+ * Easy:
+ * - use ATS bandwidth allocation callback and schedule transmissions!
+ *
+ * Plan:
* - inform ATS about RTT, goodput/loss, overheads, etc.
- * - ask ATS about bandwidth allocation!
+ *
+ * Later:
* - change transport-core API to provide proper flow control in both
* directions, allow multiple messages per peer simultaneously (tag
* confirmations with unique message ID), and replace quota-out with
* proper flow control;
+ *
+ * Design realizations / discussion:
+ * - communicators do flow control by calling MQ "notify sent"
+ * when 'ready'. They determine flow implicitly (i.e. TCP blocking)
+ * or explicitly via background channel FC ACKs. As long as the
+ * channel is not full, they may 'notify sent' even if the other
+ * peer has not yet confirmed receipt. The other peer confirming
+ * is _only_ for FC, not for more reliable transmission; reliable
+ * transmission (i.e. of fragments) is left to _transport_.
+ * - ACKs sent back in uni-directional communicators are done via
+ * the background channel API; here transport _may_ initially
+ * broadcast (with bounded # hops) if no path is known;
+ * - transport should _integrate_ DV-routing and build a view of
+ * the network; then background channel traffic can be
+ * routed via DV as well as explicit "DV" traffic.
+ * - background channel is also used for ACKs and NAT traversal support
+ * - transport service is responsible for AEAD'ing the background
+ * channel, timestamps and monotonic time are used against replay
+ * of old messages -> peerstore needs to be supplied with
+ * "latest timestamps seen" data
+ * - if transport implements DV, we likely need a 3rd peermap
+ * in addition to ephemerals and (direct) neighbours
+ * => in this data structure, we should track ATS metrics (distance, RTT, etc.)
+ * as well as latest timestamps seen, goodput, fragments for transmission, etc.
+ * ==> check if stuff needs to be moved out of "Neighbour"
+ * - transport should encapsualte core-level messages and do its
+ * own ACKing for RTT/goodput/loss measurements _and_ fragment
+ * for retransmission
*/
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_statistics_service.h"
-#include "gnunet_transport_service.h"
#include "gnunet_transport_monitor_service.h"
#include "gnunet_peerstore_service.h"
-#include "gnunet_ats_service.h"
-#include "gnunet-service-transport.h"
+#include "gnunet_hello_lib.h"
+#include "gnunet_ats_transport_service.h"
#include "transport.h"
#define MAX_PENDING (128 * 1024)
+GNUNET_NETWORK_STRUCT_BEGIN
+
+/**
+ * Outer layer of an encapsulated backchannel message.
+ */
+struct TransportBackchannelEncapsulationMessage
+{
+ /**
+ * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Distance the backchannel message has traveled, to be updated at
+ * each hop. Used to bound the number of hops in case a backchannel
+ * message is broadcast and thus travels without routing
+ * information (during initial backchannel discovery).
+ */
+ uint32_t distance;
+
+ /**
+ * Target's peer identity (as backchannels may be transmitted
+ * indirectly, or even be broadcast).
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * Ephemeral key setup by the sender for @e target, used
+ * to encrypt the payload.
+ */
+ struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+
+ /**
+ * HMAC over the ciphertext of the encrypted, variable-size
+ * body that follows. Verified via DH of @e target and
+ * @e ephemeral_key
+ */
+ struct GNUNET_HashCode hmac;
+
+ /* Followed by encrypted, variable-size payload */
+};
+
+
+/**
+ * Message by which a peer confirms that it is using an
+ * ephemeral key.
+ */
+struct EphemeralConfirmation
+{
+
+ /**
+ * Purpose is #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
+ */
+ struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
+
+ /**
+ * How long is this signature over the ephemeral key
+ * valid?
+ */
+ struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+
+ /**
+ * Ephemeral key setup by the sender for @e target, used
+ * to encrypt the payload.
+ */
+ struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+
+};
+
+/**
+ * Plaintext of the variable-size payload that is encrypted
+ * within a `struct TransportBackchannelEncapsulationMessage`
+ */
+struct TransportBackchannelRequestPayload
+{
+
+ /**
+ * Sender's peer identity.
+ */
+ struct GNUNET_PeerIdentity sender;
+
+ /**
+ * Signature of the sender over an
+ * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL.
+ */
+ struct GNUNET_CRYPTO_EddsaSignature sender_sig;
+
+ /**
+ * How long is this signature over the ephemeral key
+ * valid?
+ */
+ struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+
+ /**
+ * Current monotonic time of the sending transport service. Used to
+ * detect replayed messages. Note that the receiver should remember
+ * a list of the recently seen timestamps and only reject messages
+ * if the timestamp is in the list, or the list is "full" and the
+ * timestamp is smaller than the lowest in the list. This list of
+ * timestamps per peer should be persisted to guard against replays
+ * after restarts.
+ */
+ struct GNUNET_TIME_AbsoluteNBO monotonic_time;
+
+ /* Followed by a `struct GNUNET_MessageHeader` with a message
+ for a communicator */
+
+ /* Followed by a 0-termianted string specifying the name of
+ the communicator which is to receive the message */
+
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+
+
/**
* What type of client is the `struct TransportClient` about?
*/
};
+/**
+ * Entry in our cache of ephemeral keys we currently use.
+ */
+struct EphemeralCacheEntry
+{
+
+ /**
+ * Target's peer identity (we don't re-use ephemerals
+ * to limit linkability of messages).
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * Signature affirming @e ephemeral_key of type
+ * #GNUNET_SIGNATURE_PURPOSE_TRANSPORT_EPHEMERAL
+ */
+ struct GNUNET_CRYPTO_EddsaSignature sender_sig;
+
+ /**
+ * How long is @e sender_sig valid
+ */
+ struct GNUNET_TIME_Absolute ephemeral_validity;
+
+ /**
+ * Our ephemeral key.
+ */
+ struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+
+ /**
+ * Node in the ephemeral cache for this entry.
+ * Used for expiration.
+ */
+ struct GNUNET_CONTAINER_HeapNode *hn;
+};
+
+
/**
* Client connected to the transport service.
*/
/**
- * List of available queues for a particular neighbour.
+ * An ATS session is a message queue provided by a communicator
+ * via which we can reach a particular neighbour.
*/
-struct Queue
+struct GNUNET_ATS_Session
{
/**
* Kept in a MDLL.
*/
- struct Queue *next_neighbour;
+ struct GNUNET_ATS_Session *next_neighbour;
/**
* Kept in a MDLL.
*/
- struct Queue *prev_neighbour;
+ struct GNUNET_ATS_Session *prev_neighbour;
/**
* Kept in a MDLL.
*/
- struct Queue *prev_client;
+ struct GNUNET_ATS_Session *prev_client;
/**
* Kept in a MDLL.
*/
- struct Queue *next_client;
+ struct GNUNET_ATS_Session *next_client;
/**
- * Which neighbour is this queue for?
+ * Which neighbour is this ATS session for?
*/
struct Neighbour *neighbour;
/**
- * Which communicator offers this queue?
+ * Which communicator offers this ATS session?
*/
struct TransportClient *tc;
/**
- * Address served by the queue.
+ * Address served by the ATS session.
*/
const char *address;
/**
- * Our current RTT estimate for this queue.
+ * Our current RTT estimate for this ATS session.
*/
struct GNUNET_TIME_Relative rtt;
/**
- * Unique identifier of this queue with the communicator.
+ * Unique identifier of this ATS session with the communicator.
*/
uint32_t qid;
/**
- * Maximum transmission unit supported by this queue.
+ * Maximum transmission unit supported by this ATS session.
*/
uint32_t mtu;
/**
- * Distance to the target of this queue.
+ * Distance to the target of this ATS session.
*/
uint32_t distance;
/**
- * Network type offered by this queue.
+ * Network type offered by this ATS session.
*/
enum GNUNET_NetworkType nt;
/**
- * Connection status for this queue.
+ * Connection status for this ATS session.
*/
enum GNUNET_TRANSPORT_ConnectionStatus cs;
*/
uint32_t num_bytes_pending;
- // FIXME: add ATS-specific fields here!
+ /**
+ * How much outbound bandwidth do we have available for this session?
+ */
+ struct GNUNET_BANDWIDTH_Tracker tracker_out;
+
+ /**
+ * How much inbound bandwidth do we have available for this session?
+ */
+ struct GNUNET_BANDWIDTH_Tracker tracker_in;
};
struct PendingMessage *pending_msg_tail;
/**
- * Head of DLL of queues to this peer.
+ * Head of DLL of ATS sessions to this peer.
*/
- struct Queue *queue_head;
+ struct GNUNET_ATS_Session *session_head;
/**
- * Tail of DLL of queues to this peer.
+ * Tail of DLL of ATS sessions to this peer.
*/
- struct Queue *queue_tail;
+ struct GNUNET_ATS_Session *session_tail;
/**
* Quota at which CORE is allowed to transmit to this peer
/**
* Head of DLL of queues offered by this communicator.
*/
- struct Queue *queue_head;
+ struct GNUNET_ATS_Session *session_head;
/**
* Tail of DLL of queues offered by this communicator.
*/
- struct Queue *queue_tail;
+ struct GNUNET_ATS_Session *session_tail;
/**
* Head of list of the addresses of this peer offered by this communicator.
/**
* Statistics handle.
*/
-struct GNUNET_STATISTICS_Handle *GST_stats;
+static struct GNUNET_STATISTICS_Handle *GST_stats;
/**
* Configuration handle.
*/
-const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
+static const struct GNUNET_CONFIGURATION_Handle *GST_cfg;
/**
* Our public key.
*/
-struct GNUNET_PeerIdentity GST_my_identity;
+static struct GNUNET_PeerIdentity GST_my_identity;
/**
* Our private key.
*/
-struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
+static struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
/**
* Map from PIDs to `struct Neighbour` entries. A peer is
*/
static struct GNUNET_PEERSTORE_Handle *peerstore;
+/**
+ * Heap sorting `struct EphemeralCacheEntry` by their
+ * key/signature validity.
+ */
+static struct GNUNET_CONTAINER_Heap *ephemeral_heap;
+
+/**
+ * Hash map for looking up `struct EphemeralCacheEntry`s
+ * by peer identity. (We may have ephemerals in our
+ * cache for which we do not have a neighbour entry,
+ * and similar many neighbours may not need ephemerals,
+ * so we use a second map.)
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *ephemeral_map;
+
+/**
+ * Our connection to ATS for allocation and bootstrapping.
+ */
+static struct GNUNET_ATS_TransportHandle *ats;
+
+
+/**
+ * Free cached ephemeral key.
+ *
+ * @param ece cached signature to free
+ */
+static void
+free_ephemeral (struct EphemeralCacheEntry *ece)
+{
+ GNUNET_CONTAINER_multipeermap_remove (ephemeral_map,
+ &ece->target,
+ ece);
+ GNUNET_CONTAINER_heap_remove_node (ece->hn);
+ GNUNET_free (ece);
+}
+
/**
* Lookup neighbour record for peer @a pid.
static void
free_neighbour (struct Neighbour *neighbour)
{
- GNUNET_assert (NULL == neighbour->queue_head);
+ GNUNET_assert (NULL == neighbour->session_head);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (neighbours,
&neighbour->pid,
* @param queue the queue to free
*/
static void
-free_queue (struct Queue *queue)
+free_queue (struct GNUNET_ATS_Session *queue)
{
struct Neighbour *neighbour = queue->neighbour;
struct TransportClient *tc = queue->tc;
};
GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->queue_head,
- neighbour->queue_tail,
+ neighbour->session_head,
+ neighbour->session_tail,
queue);
GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.communicator.queue_head,
- tc->details.communicator.queue_tail,
+ tc->details.communicator.session_head,
+ tc->details.communicator.session_tail,
queue);
notify_monitors (&neighbour->pid,
queue->address,
queue->nt,
&me);
+ GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_in);
+ GNUNET_BANDWIDTH_tracker_notification_stop (&queue->tracker_out);
GNUNET_free (queue);
- if (NULL == neighbour->queue_head)
+ if (NULL == neighbour->session_head)
{
cores_send_disconnect_info (&neighbour->pid);
free_neighbour (neighbour);
break;
case CT_COMMUNICATOR:
{
- struct Queue *q;
+ struct GNUNET_ATS_Session *q;
struct AddressListEntry *ale;
- while (NULL != (q = tc->details.communicator.queue_head))
+ while (NULL != (q = tc->details.communicator.session_head))
free_queue (q);
while (NULL != (ale = tc->details.communicator.addr_head))
free_address_list_entry (ale);
}
+/**
+ * Bandwidth tracker informs us that the delay until we
+ * can transmit again changed.
+ *
+ * @param cls a `struct GNUNET_ATS_Session` for which the delay changed
+ */
+static void
+tracker_update_cb (void *cls)
+{
+ struct GNUNET_ATS_Session *queue = cls;
+
+ // FIXME: re-schedule transmission tasks if applicable!
+}
+
+
+/**
+ * Bandwidth tracker informs us that excessive bandwidth was allocated
+ * which is not being used.
+ *
+ * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted
+ */
+static void
+tracker_excess_cb (void *cls)
+{
+ /* FIXME: what do we do? */
+}
+
+
/**
* New queue became available. Process the request.
*
const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
{
struct TransportClient *tc = cls;
- struct Queue *queue;
+ struct GNUNET_ATS_Session *queue;
struct Neighbour *neighbour;
const char *addr;
uint16_t addr_len;
addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
addr = (const char *) &aqm[1];
- queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
+ queue = GNUNET_malloc (sizeof (struct GNUNET_ATS_Session) + addr_len);
queue->tc = tc;
queue->address = (const char *) &queue[1];
queue->rtt = GNUNET_TIME_UNIT_FOREVER_REL;
queue->nt = (enum GNUNET_NetworkType) ntohl (aqm->nt);
queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs);
queue->neighbour = neighbour;
+ GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in,
+ &tracker_update_cb,
+ queue,
+ GNUNET_BANDWIDTH_ZERO,
+ 0 /* FIXME: max carry in seconds! */,
+ &tracker_excess_cb,
+ queue);
+ GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out,
+ &tracker_update_cb,
+ queue,
+ GNUNET_BANDWIDTH_ZERO,
+ 0 /* FIXME: max carry in seconds! */,
+ &tracker_excess_cb,
+ queue);
memcpy (&queue[1],
addr,
addr_len);
&me);
}
GNUNET_CONTAINER_MDLL_insert (neighbour,
- neighbour->queue_head,
- neighbour->queue_tail,
+ neighbour->session_head,
+ neighbour->session_tail,
queue);
GNUNET_CONTAINER_MDLL_insert (client,
- tc->details.communicator.queue_head,
- tc->details.communicator.queue_tail,
+ tc->details.communicator.session_head,
+ tc->details.communicator.session_tail,
queue);
// FIXME: possibly transmit queued messages?
GNUNET_SERVICE_client_continue (tc->client);
GNUNET_SERVICE_client_drop (tc->client);
return;
}
- for (struct Queue *queue = tc->details.communicator.queue_head;
+ for (struct GNUNET_ATS_Session *queue = tc->details.communicator.session_head;
NULL != queue;
queue = queue->next_client)
{
struct Neighbour *neighbour = value;
GNUNET_assert (CT_MONITOR == tc->type);
- for (struct Queue *q = neighbour->queue_head;
+ for (struct GNUNET_ATS_Session *q = neighbour->session_head;
NULL != q;
q = q->next_neighbour)
{
}
+/**
+ * Signature of a function called by ATS with the current bandwidth
+ * allocation to be used as determined by ATS.
+ *
+ * @param cls closure, NULL
+ * @param session session this is about
+ * @param bandwidth_out assigned outbound bandwidth for the connection,
+ * 0 to signal disconnect
+ * @param bandwidth_in assigned inbound bandwidth for the connection,
+ * 0 to signal disconnect
+ */
+static void
+ats_allocation_cb (void *cls,
+ struct GNUNET_ATS_Session *session,
+ struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
+ struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in)
+{
+ (void) cls;
+ GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_out,
+ bandwidth_out);
+ GNUNET_BANDWIDTH_tracker_update_quota (&session->tracker_in,
+ bandwidth_in);
+}
+
+
+/**
+ * Find transport client providing communication service
+ * for the protocol @a prefix.
+ *
+ * @param prefix communicator name
+ * @return NULL if no such transport client is available
+ */
+static struct TransportClient *
+lookup_communicator (const char *prefix)
+{
+ GNUNET_break (0); // FIXME: implement
+ return NULL;
+}
+
+
+/**
+ * Signature of a function called by ATS suggesting transport to
+ * try connecting with a particular address.
+ *
+ * @param cls closure, NULL
+ * @param pid target peer
+ * @param address the address to try
+ */
+static void
+ats_suggestion_cb (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ const char *address)
+{
+ struct TransportClient *tc;
+ char *prefix;
+
+ (void) cls;
+ prefix = NULL; // FIXME
+ tc = lookup_communicator (prefix);
+ if (NULL == tc)
+ {
+ // STATS...
+ return;
+ }
+ // FIXME: forward suggestion to tc
+}
+
+
/**
* Free neighbour entry.
*
}
+/**
+ * Free ephemeral entry.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_ephemeral_cb (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct EphemeralCacheEntry *ece = value;
+
+ (void) cls;
+ (void) pid;
+ free_ephemeral (ece);
+ return GNUNET_OK;
+}
+
+
/**
* Function called when the service shuts down. Unloads our plugins
* and cancels pending validations.
GNUNET_CONTAINER_multipeermap_iterate (neighbours,
&free_neighbour_cb,
NULL);
+ if (NULL != ats)
+ {
+ GNUNET_ATS_transport_done (ats);
+ ats = NULL;
+ }
if (NULL != peerstore)
{
GNUNET_PEERSTORE_disconnect (peerstore,
GST_my_private_key = NULL;
}
GNUNET_CONTAINER_multipeermap_destroy (neighbours);
+ neighbours = NULL;
+ GNUNET_CONTAINER_multipeermap_iterate (ephemeral_map,
+ &free_ephemeral_cb,
+ NULL);
+ GNUNET_CONTAINER_multipeermap_destroy (ephemeral_map);
+ ephemeral_map = NULL;
+ GNUNET_CONTAINER_heap_destroy (ephemeral_heap);
+ ephemeral_heap = NULL;
}
GST_cfg = c;
neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
GNUNET_YES);
+ ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
+ GNUNET_YES);
+ ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (GST_cfg);
if (NULL == GST_my_private_key)
{
GNUNET_log(GNUNET_ERROR_TYPE_INFO,
"My identity is `%s'\n",
GNUNET_i2s_full (&GST_my_identity));
-
GST_stats = GNUNET_STATISTICS_create ("transport",
GST_cfg);
GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
NULL);
peerstore = GNUNET_PEERSTORE_connect (GST_cfg);
if (NULL == peerstore)
- {
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- /* start subsystems */
+ {
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ ats = GNUNET_ATS_transport_init (GST_cfg,
+ &ats_allocation_cb,
+ NULL,
+ &ats_suggestion_cb,
+ NULL);
+ if (NULL == ats)
+ {
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
}