* Build address record by signing raw information with private key.
*
* @param address text address at @a communicator to sign
+ * @param nt network type of @a address
* @param expiration how long is @a address valid
* @param private_key signing key to use
* @param result[out] where to write address record (allocated)
*/
void
GNUNET_HELLO_sign_address (const char *address,
+ enum GNUNET_ATS_Network_Type nt,
struct GNUNET_TIME_Absolute expiration,
const struct GNUNET_CRYPTO_EddsaPrivateKey *private_key,
void **result,
sizeof (sig),
&sig_str);
*result_size = 1 + GNUNET_asprintf ((char **) result,
- "%s;%llu;%s",
+ "%s;%llu;%u;%s",
sig_str,
(unsigned long long) expiration.abs_value_us,
+ (unsigned int) nt,
address);
GNUNET_free (sig_str);
}
* @param raw raw signed address
* @param raw_size size of @a raw
* @param public_key public key to use for signature verification
+ * @param nt[out] set to network type
* @param expiration[out] how long is the address valid
* @return NULL on error, otherwise the address
*/
GNUNET_HELLO_extract_address (const void *raw,
size_t raw_size,
const struct GNUNET_CRYPTO_EddsaPublicKey *public_key,
+ enum GNUNET_ATS_Network_Type *nt,
struct GNUNET_TIME_Absolute *expiration)
{
const char *raws = raw;
unsigned long long raw_us;
+ unsigned int raw_nt;
const char *sc;
const char *sc2;
+ const char *sc3;
const char *raw_addr;
struct GNUNET_TIME_Absolute raw_expiration;
struct SignedAddress sa;
GNUNET_break_op (0);
return NULL;
}
+ if (NULL == (sc3 = strchr (sc2 + 1,
+ ';')))
+ {
+ GNUNET_break_op (0);
+ return NULL;
+ }
if (1 != sscanf (sc + 1,
- "%llu;",
- &raw_us))
+ "%llu;%u;",
+ &raw_us,
+ &raw_nt))
{
GNUNET_break_op (0);
return NULL;
GNUNET_free_non_null (sig);
return NULL;
}
- raw_addr = sc2 + 1;
+ raw_addr = sc3 + 1;
sa.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_ADDRESS);
sa.purpose.size = htonl (sizeof (sa));
return NULL;
}
GNUNET_free (sig);
+ *expiration = raw_expiration;
+ *nt = (enum GNUNET_ATS_Network_Type) raw_nt;
return GNUNET_strdup (raw_addr);
}
#include "gnunet_util_lib.h"
-
/**
* Prefix that every HELLO URI must start with.
*/
/* NG API */
+/**
+ * Defined in gnunet_ats_service.h, but here we do not care about
+ * the details so are just giving the declaration.
+ */
+enum GNUNET_ATS_Network_Type;
+
/**
* Build address record by signing raw information with private key.
*
* @param address text address to sign
+ * @param nt network type of @a address
* @param expiration how long is @a address valid
* @param private_key signing key to use
* @param result[out] where to write address record (allocated)
*/
void
GNUNET_HELLO_sign_address (const char *address,
+ enum GNUNET_ATS_Network_Type nt,
struct GNUNET_TIME_Absolute expiration,
const struct GNUNET_CRYPTO_EddsaPrivateKey *private_key,
void **result,
* @param raw raw signed address
* @param raw_size size of @a raw
* @param public_key public key to use for signature verification
+ * @param nt[out] set to network type
* @param expiration[out] how long is the address valid
* @return NULL on error, otherwise the address
*/
GNUNET_HELLO_extract_address (const void *raw,
size_t raw_size,
const struct GNUNET_CRYPTO_EddsaPublicKey *public_key,
+ enum GNUNET_ATS_Network_Type *nt,
struct GNUNET_TIME_Absolute *expiration);
* @param config_section section of the configuration to use for options
* @param addr_prefix address prefix for addresses supported by this
* communicator, could be NULL for incoming-only communicators
- * @param mtu maximum message size supported by communicator, 0 if
- * sending is not supported, SIZE_MAX for no MTU
* @param mq_init function to call to initialize a message queue given
* the address of another peer, can be NULL if the
* communicator only supports receiving messages
GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
const char *config_section_name,
const char *addr_prefix,
- size_t mtu,
GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
void *mq_init_cls);
struct GNUNET_TRANSPORT_QueueHandle;
+/**
+ * Possible states of a connection.
+ */
+enum GNUNET_TRANSPORT_ConnectionStatus {
+ /**
+ * Connection is down.
+ */
+ GNUNET_TRANSPORT_CS_DOWN = -1,
+ /**
+ * this is an outbound connection (transport initiated)
+ */
+ GNUNET_TRANSPORT_CS_OUTBOUND = 0,
+ /**
+ * this is an inbound connection (communicator initiated)
+ */
+ GNUNET_TRANSPORT_CS_INBOUND = 1
+};
+
+
/**
* Notify transport service that an MQ became available due to an
* "inbound" connection or because the communicator discovered the
* @param ch connection to transport service
* @param peer peer with which we can now communicate
* @param address address in human-readable format, 0-terminated, UTF-8
+ * @param mtu maximum message size supported by queue, 0 if
+ * sending is not supported, SIZE_MAX for no MTU
* @param nt which network type does the @a address belong to?
+ * @param cs what is the connection status of the queue?
* @param mq message queue of the @a peer
* @return API handle identifying the new MQ
*/
GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
const struct GNUNET_PeerIdentity *peer,
const char *address,
+ uint32_t mtu,
enum GNUNET_ATS_Network_Type nt,
+ enum GNUNET_TRANSPORT_ConnectionStatus cs,
struct GNUNET_MQ_Handle *mq);
#include "gnunet_util_lib.h"
#include "gnunet_ats_service.h"
+#include "gnunet_transport_communication_service.h"
+
/**
* Version number of the transport API.
enum GNUNET_ATS_Network_Type nt;
/**
- * #GNUNET_YES if this is an inbound connection (communicator initiated)
- * #GNUNET_NO if this is an outbound connection (transport initiated)
+ * Connection status.
*/
- int is_inbound;
+ enum GNUNET_TRANSPORT_ConnectionStatus cs;
/**
* Number of messages pending transmission for this @e address.
*/
#define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
+/**
+ * Our MTU.
+ */
+#define UNIX_MTU UINT16_MAX
GNUNET_NETWORK_STRUCT_BEGIN
* data to another peer.
*
* @param peer the target peer
+ * @param cs inbound or outbound queue
* @param un the address
* @param un_len number of bytes in @a un
* @return the queue or NULL of max connections exceeded
*/
static struct Queue *
setup_queue (const struct GNUNET_PeerIdentity *target,
+ enum GNUNET_TRANSPORT_ConnectionStatus cs,
const struct sockaddr_un *un,
socklen_t un_len)
{
= GNUNET_TRANSPORT_communicator_mq_add (ch,
&queue->target,
foreign_addr,
+ UNIX_MTU,
GNUNET_ATS_NET_LOOPBACK,
+ cs,
queue->mq);
GNUNET_free (foreign_addr);
}
addrlen);
if (NULL == queue)
queue = setup_queue (&msg->sender,
+ GNUNET_TRANSPORT_CS_INBOUND,
&un,
addrlen);
else
return GNUNET_OK;
}
queue = setup_queue (peer,
+ GNUNET_TRANSPORT_CS_OUTBOUND,
un,
un_len);
GNUNET_free (un);
ch = GNUNET_TRANSPORT_communicator_connect (cfg,
COMMUNICATOR_CONFIG_SECTION,
COMMUNICATOR_ADDRESS_PREFIX,
- 65535,
&mq_init,
NULL);
if (NULL == ch)
* @author Christian Grothoff
*
* TODO:
- * - MTU information is missing for queues!
- * - start supporting monitor logic (add functions to signal monitors!)
+ * - monitor start: iterate to inform monitor about all existing queues!
* - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
- * - ask ATS about bandwidth allocation
+ * - inform ATS about RTT, goodput/loss, overheads, etc.
+ * - ask ATS about bandwidth allocation!
* -
*/
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_statistics_service.h"
#include "gnunet_transport_service.h"
+#include "gnunet_transport_monitor_service.h"
#include "gnunet_peerstore_service.h"
#include "gnunet_ats_service.h"
#include "gnunet-service-transport.h"
*/
uint32_t qid;
+ /**
+ * Maximum transmission unit supported by this queue.
+ */
+ uint32_t mtu;
+
/**
* Network type offered by this queue.
*/
}
+/**
+ * Details about what to notify monitors about.
+ */
+struct MonitorEvent
+{
+ /**
+ * @deprecated To be discussed if we keep these...
+ */
+ struct GNUNET_TIME_Absolute last_validation;
+ struct GNUNET_TIME_Absolute valid_until;
+ struct GNUNET_TIME_Absolute next_validation;
+
+ /**
+ * Current round-trip time estimate.
+ */
+ struct GNUNET_TIME_Relative rtt;
+
+ /**
+ * Connection status.
+ */
+ enum GNUNET_TRANSPORT_ConnectionStatus cs;
+
+ /**
+ * Messages pending.
+ */
+ uint32_t num_msg_pending;
+
+ /**
+ * Bytes pending.
+ */
+ uint32_t num_bytes_pending;
+
+
+};
+
+
+/**
+ * Notify monitor @a tc about an event. That @a tc
+ * cares about the event has already been checked.
+ *
+ * Send @a tc information in @a me about a @a peer's status with
+ * respect to some @a address to all monitors that care.
+ *
+ * @param tc monitor to inform
+ * @param peer peer the information is about
+ * @param address address the information is about
+ * @param nt network type associated with @a address
+ * @param me detailed information to transmit
+ */
+static void
+notify_monitor (struct TransportClient *tc,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *address,
+ enum GNUNET_ATS_Network_Type nt,
+ const struct MonitorEvent *me)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_TRANSPORT_MonitorData *md;
+ size_t addr_len = strlen (address) + 1;
+
+ env = GNUNET_MQ_msg_extra (md,
+ addr_len,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
+ md->nt = htonl ((uint32_t) nt);
+ md->peer = *peer;
+ md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
+ md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
+ md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
+ md->rtt = GNUNET_TIME_relative_hton (me->rtt);
+ md->cs = htonl ((uint32_t) me->cs);
+ md->num_msg_pending = htonl (me->num_msg_pending);
+ md->num_bytes_pending = htonl (me->num_bytes_pending);
+ memcpy (&md[1],
+ address,
+ addr_len);
+ GNUNET_MQ_send (tc->mq,
+ env);
+}
+
+
+/**
+ * Send information in @a me about a @a peer's status with respect
+ * to some @a address to all monitors that care.
+ *
+ * @param peer peer the information is about
+ * @param address address the information is about
+ * @param nt network type associated with @a address
+ * @param me detailed information to transmit
+ */
+static void
+notify_monitors (const struct GNUNET_PeerIdentity *peer,
+ const char *address,
+ enum GNUNET_ATS_Network_Type nt,
+ const struct MonitorEvent *me)
+{
+ static struct GNUNET_PeerIdentity zero;
+
+ for (struct TransportClient *tc = clients_head;
+ NULL != tc;
+ tc = tc->next)
+ {
+ if (CT_MONITOR != tc->type)
+ continue;
+ if (tc->details.monitor.one_shot)
+ continue;
+ if ( (0 != memcmp (&tc->details.monitor.peer,
+ &zero,
+ sizeof (zero))) &&
+ (0 != memcmp (&tc->details.monitor.peer,
+ peer,
+ sizeof (*peer))) )
+ continue;
+ notify_monitor (tc,
+ peer,
+ address,
+ nt,
+ me);
+ }
+}
+
+
/**
* Called whenever a client connects. Allocates our
* data structures associated with that client.
}
+/**
+ * Send message to CORE clients that we lost a connection.
+ *
+ * @param tc client to inform (must be CORE client)
+ * @param pid peer the connection is for
+ * @param quota_out current quota for the peer
+ */
+static void
+core_send_connect_info (struct TransportClient *tc,
+ const struct GNUNET_PeerIdentity *pid,
+ struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct ConnectInfoMessage *cim;
+
+ GNUNET_assert (CT_CORE == tc->type);
+ env = GNUNET_MQ_msg (cim,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+ cim->quota_out = quota_out;
+ cim->id = *pid;
+ GNUNET_MQ_send (tc->mq,
+ env);
+}
+
+
+/**
+ * Send message to CORE clients that we gained a connection
+ *
+ * @param pid peer the queue was for
+ * @param quota_out current quota for the peer
+ */
+static void
+cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
+ struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+{
+ for (struct TransportClient *tc = clients_head;
+ NULL != tc;
+ tc = tc->next)
+ {
+ if (CT_CORE != tc->type)
+ continue;
+ core_send_connect_info (tc,
+ pid,
+ quota_out);
+ }
+}
+
+
+/**
+ * Send message to CORE clients that we lost a connection.
+ *
+ * @param pid peer the connection was for
+ */
+static void
+cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
+{
+ for (struct TransportClient *tc = clients_head;
+ NULL != tc;
+ tc = tc->next)
+ {
+ struct GNUNET_MQ_Envelope *env;
+ struct DisconnectInfoMessage *dim;
+
+ if (CT_CORE != tc->type)
+ continue;
+ env = GNUNET_MQ_msg (dim,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
+ dim->peer = *pid;
+ GNUNET_MQ_send (tc->mq,
+ env);
+ }
+}
+
+
/**
* Free @a queue.
*
{
struct Neighbour *neighbour = queue->neighbour;
struct TransportClient *tc = queue->tc;
+ struct MonitorEvent me = {
+ .cs = GNUNET_TRANSPORT_CS_DOWN,
+ .rtt = GNUNET_TIME_UNIT_FOREVER_REL
+ };
GNUNET_CONTAINER_MDLL_remove (neighbour,
neighbour->queue_head,
tc->details.communicator.queue_head,
tc->details.communicator.queue_tail,
queue);
+
+ notify_monitors (&neighbour->pid,
+ queue->address,
+ queue->nt,
+ &me);
GNUNET_free (queue);
if (NULL == neighbour->queue_head)
{
- // FIXME: notify cores/monitors!
+ cores_send_disconnect_info (&neighbour->pid);
free_neighbour (neighbour);
}
}
}
+/**
+ * Iterator telling new CORE client about all existing
+ * connections to peers.
+ *
+ * @param cls the new `struct TransportClient`
+ * @param pid a connected peer
+ * @param value the `struct Neighbour` with more information
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+notify_client_connect_info (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
+{
+ struct TransportClient *tc = cls;
+ struct Neighbour *neighbour = value;
+
+ core_send_connect_info (tc,
+ pid,
+ neighbour->quota_out);
+ return GNUNET_OK;
+}
+
+
/**
* Initialize a "CORE" client. We got a start message from this
* client, so add it to the list of clients for broadcasting of
return;
}
tc->type = CT_CORE;
+ GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+ ¬ify_client_connect_info,
+ tc);
GNUNET_SERVICE_client_continue (tc->client);
}
tc->details.core.pending_msg_head,
tc->details.core.pending_msg_tail,
pm);
- // FIXME: do the work, continuation with:
+ // FIXME: do the work, final continuation with call to:
client_send_response (pm,
GNUNET_NO,
0);
ale->st = NULL;
expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
GNUNET_HELLO_sign_address (ale->address,
+ ale->nt,
expiration,
GST_my_private_key,
&addr,
struct Queue *queue;
struct Neighbour *neighbour;
const char *addr;
- uint16_t addr_len;
+ uint16_t addr_len;
neighbour = lookup_neighbour (&aqm->receiver);
if (NULL == neighbour)
&neighbour->pid,
neighbour,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- // FIXME: notify ATS/COREs/monitors!
+ cores_send_connect_info (&neighbour->pid,
+ GNUNET_BANDWIDTH_ZERO);
+ // FIXME: notify ATS!
}
addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
addr = (const char *) &aqm[1];
queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
+ queue->mtu = ntohl (aqm->mtu);
queue->qid = aqm->qid;
queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt);
queue->tc = tc;
memcpy (&queue[1],
addr,
addr_len);
+ /* notify monitors about new queue */
+ {
+ struct MonitorEvent me = {
+ .cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs)
+ };
+
+ notify_monitors (&neighbour->pid,
+ queue->address,
+ queue->nt,
+ &me);
+ }
GNUNET_CONTAINER_MDLL_insert (neighbour,
neighbour->queue_head,
neighbour->queue_tail,
tc->details.monitor.peer = start->peer;
tc->details.monitor.one_shot = ntohl (start->one_shot);
// FIXME: do work!
+
+ GNUNET_SERVICE_client_mark_monitor (tc->client);
GNUNET_SERVICE_client_continue (tc->client);
}
GNUNET_CONTAINER_multipeermap_iterate (neighbours,
&free_neighbour_cb,
NULL);
- /* FIXME: if this assertion fails (likely!), make sure we
- clean up clients *before* doing the rest of the
- shutdown! (i.e. by scheduling rest asynchronously!) */
- GNUNET_assert (NULL == clients_head);
if (NULL != peerstore)
{
GNUNET_PEERSTORE_disconnect (peerstore,
*/
GNUNET_SERVICE_MAIN
("transport",
- GNUNET_SERVICE_OPTION_NONE,
+ GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN,
&run,
&client_connect_cb,
&client_disconnect_cb,
*/
uint32_t nt;
- // FIXME: add MTU?
+ /**
+ * Maximum transmission unit, in NBO. UINT32_MAX for unlimited.
+ */
+ uint32_t mtu;
+
+ /**
+ * An `enum GNUNET_TRANSPORT_ConnectionStatus` in NBO.
+ */
+ uint32_t cs;
/* followed by UTF-8 encoded, 0-terminated human-readable address */
};
struct GNUNET_TIME_RelativeNBO rtt;
/**
- * Is inbound (in NBO).
+ * Connection status (in NBO).
*/
- uint32_t is_inbound GNUNET_PACKED;
+ uint32_t cs GNUNET_PACKED;
/**
* Messages pending (in NBO).
*/
uint32_t num_bytes_pending GNUNET_PACKED;
- /* Followed by 0-terminated address of the peer
- (TODO: do we allow no address? If so,
- adjust transport_api2_monitor!) */
+ /* Followed by 0-terminated address of the peer */
};
*/
uint64_t fc_gen;
- /**
- * MTU of the communicator
- */
- size_t mtu;
-
/**
* Internal UUID for the address used in communication with the
* transport service.
*/
enum GNUNET_ATS_Network_Type nt;
+ /**
+ * Communication status of the queue.
+ */
+ enum GNUNET_TRANSPORT_ConnectionStatus cs;
+
/**
* The queue itself.
*/
* ID for this queue when talking to the transport service.
*/
uint32_t queue_id;
+
+ /**
+ * Maximum transmission unit for the queue.
+ */
+ uint32_t mtu;
};
env = GNUNET_MQ_msg_extra (aqm,
strlen (qh->address) + 1,
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP);
+ aqm->qid = htonl (qh->queue_id);
aqm->receiver = qh->peer;
aqm->nt = htonl ((uint32_t) qh->nt);
- aqm->qid = htonl (qh->queue_id);
+ aqm->mtu = htonl (qh->mtu);
+ aqm->cs = htonl ((uint32_t) qh->cs);
memcpy (&aqm[1],
qh->address,
strlen (qh->address) + 1);
GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
const char *config_section,
const char *addr_prefix,
- size_t mtu,
GNUNET_TRANSPORT_CommunicatorMqInit mq_init,
void *mq_init_cls)
{
ch->cfg = cfg;
ch->config_section = config_section;
ch->addr_prefix = addr_prefix;
- ch->mtu = mtu;
ch->mq_init = mq_init;
ch->mq_init_cls = mq_init_cls;
reconnect (ch);
* @param ch connection to transport service
* @param peer peer with which we can now communicate
* @param address address in human-readable format, 0-terminated, UTF-8
+ * @param mtu maximum message size supported by queue, 0 if
+ * sending is not supported, SIZE_MAX for no MTU
* @param nt which network type does the @a address belong to?
+ * @param cs what is the connection status of the queue?
* @param mq message queue of the @a peer
* @return API handle identifying the new MQ
*/
GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
const struct GNUNET_PeerIdentity *peer,
const char *address,
+ uint32_t mtu,
enum GNUNET_ATS_Network_Type nt,
+ enum GNUNET_TRANSPORT_ConnectionStatus cs,
struct GNUNET_MQ_Handle *mq)
{
struct GNUNET_TRANSPORT_QueueHandle *qh;
qh->peer = *peer;
qh->address = GNUNET_strdup (address);
qh->nt = nt;
+ qh->mtu = mtu;
+ qh->cs = cs;
qh->mq = mq;
qh->queue_id = ch->queue_gen++;
GNUNET_CONTAINER_DLL_insert (ch->queue_head,
mi.address = (const char *) &md[1];
mi.nt = (enum GNUNET_ATS_Network_Type) ntohl (md->nt);
- mi.is_inbound = (int) ntohl (md->is_inbound);
+ mi.cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (md->cs);
mi.num_msg_pending = ntohl (md->num_msg_pending);
mi.num_bytes_pending = ntohl (md->num_bytes_pending);
mi.last_validation = GNUNET_TIME_absolute_ntoh (md->last_validation);