From 5391d3d34f3bf7f40f37f9e6038466002f422bb3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 21 Jan 2019 15:09:16 +0100 Subject: [PATCH] more work on tng --- src/transport/gnunet-service-tng.c | 237 +++++++++++++++++++++++++++-- src/transport/transport.h | 2 +- 2 files changed, 224 insertions(+), 15 deletions(-) diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 0a129af80..3673958ec 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -85,6 +85,19 @@ #include "transport.h" +/** + * What is the size we assume for a read operation in the + * absence of an MTU for the purpose of flow control? + */ +#define IN_PACKET_SIZE_WITHOUT_MTU 128 + +/** + * If a queue delays the next message by more than this number + * of seconds we log a warning. Note: this is for testing, + * the value chosen here might be too aggressively low! + */ +#define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) + /** * How many messages can we have pending for a given client process * before we start to drop incoming messages? We typically should @@ -336,6 +349,12 @@ struct GNUNET_ATS_Session * Handle by which we inform ATS about this queue. */ struct GNUNET_ATS_SessionRecord *sr; + + /** + * Task scheduled for the time when this queue can (likely) transmit the + * next message. Still needs to check with the @e tracker_out to be sure. + */ + struct GNUNET_SCHEDULER_Task *transmit_task; /** * Our current RTT estimate for this ATS session. @@ -994,6 +1013,11 @@ free_queue (struct GNUNET_ATS_Session *queue) .rtt = GNUNET_TIME_UNIT_FOREVER_REL }; + if (NULL != queue->transmit_task) + { + GNUNET_SCHEDULER_cancel (queue->transmit_task); + queue->transmit_task = NULL; + } GNUNET_CONTAINER_MDLL_remove (neighbour, neighbour->session_head, neighbour->session_tail, @@ -1599,6 +1623,46 @@ check_add_queue_message (void *cls, } +/** + * Bandwidth tracker informs us that the delay until we should receive + * more has changed. + * + * @param cls a `struct GNUNET_ATS_Session` for which the delay changed + */ +static void +tracker_update_in_cb (void *cls) +{ + struct GNUNET_ATS_Session *queue = cls; + struct GNUNET_TIME_Relative in_delay; + unsigned int rsize; + + rsize = (0 == queue->mtu) ? IN_PACKET_SIZE_WITHOUT_MTU : queue->mtu; + in_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_in, + rsize); + // FIXME: how exactly do we do inbound flow control? +} + + +/** + * We believe we are ready to transmit a message on a queue. Double-checks + * with the queue's "tracker_out" and then gives the message to the + * communicator for transmission (updating the tracker, and re-scheduling + * itself if applicable). + * + * @param cls the `struct GNUNET_ATS_Session` to process transmissions for + */ +static void +transmit_on_queue (void *cls) +{ + struct GNUNET_ATS_Session *queue = cls; + + queue->transmit_task = NULL; + // FIXME: check if transmission is really ready + // FIXME: do transmission (fragmentation, adding signalling / RTT tracking logic, etc.) + // FIXME: re-schedule self +} + + /** * Bandwidth tracker informs us that the delay until we * can transmit again changed. @@ -1606,24 +1670,81 @@ check_add_queue_message (void *cls, * @param cls a `struct GNUNET_ATS_Session` for which the delay changed */ static void -tracker_update_cb (void *cls) +tracker_update_out_cb (void *cls) { struct GNUNET_ATS_Session *queue = cls; + struct Neighbour *n = queue->neighbour; + struct PendingMessage *pm = n->pending_msg_head; + struct GNUNET_TIME_Relative out_delay; + unsigned int wsize; - // FIXME: re-schedule transmission tasks if applicable! + if (NULL == pm) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bandwidth allocation updated for empty transmission queue `%s'\n", + queue->address); + return; /* no message pending, nothing to do here! */ + } + wsize = (0 == queue->mtu) + ? pm->bytes_msg /* FIXME: add overheads? */ + : queue->mtu; + out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, + wsize); + GNUNET_SCHEDULER_cancel (queue->transmit_task); + queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, + &transmit_on_queue, + queue); + if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Next transmission on queue `%s' in %s (high delay)\n", + queue->address, + GNUNET_STRINGS_relative_time_to_string (out_delay, + GNUNET_YES)); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Next transmission on queue `%s' in %s\n", + queue->address, + GNUNET_STRINGS_relative_time_to_string (out_delay, + GNUNET_YES)); +} + + +/** + * Bandwidth tracker informs us that excessive outbound bandwidth was + * allocated which is not being used. + * + * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted + */ +static void +tracker_excess_out_cb (void *cls) +{ + /* FIXME: trigger excess bandwidth report to core? Right now, + this is done internally within transport_api2_core already, + but we probably want to change the logic and trigger it + from here via a message instead! */ + /* TODO: maybe inform ATS at this point? */ + GNUNET_STATISTICS_update (GST_stats, + "# Excess outbound bandwidth reported", + 1, + GNUNET_NO); } + /** - * Bandwidth tracker informs us that excessive bandwidth was allocated + * Bandwidth tracker informs us that excessive inbound bandwidth was allocated * which is not being used. * * @param cls a `struct GNUNET_ATS_Session` for which the excess was noted */ static void -tracker_excess_cb (void *cls) +tracker_excess_in_cb (void *cls) { - /* FIXME: what do we do? */ + /* TODO: maybe inform ATS at this point? */ + GNUNET_STATISTICS_update (GST_stats, + "# Excess inbound bandwidth reported", + 1, + GNUNET_NO); } @@ -1669,18 +1790,18 @@ handle_add_queue_message (void *cls, queue->cs = (enum GNUNET_TRANSPORT_ConnectionStatus) ntohl (aqm->cs); queue->neighbour = neighbour; GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_in, - &tracker_update_cb, + &tracker_update_in_cb, queue, GNUNET_BANDWIDTH_ZERO, - 0 /* FIXME: max carry in seconds! */, - &tracker_excess_cb, + GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S, + &tracker_excess_in_cb, queue); GNUNET_BANDWIDTH_tracker_init2 (&queue->tracker_out, - &tracker_update_cb, + &tracker_update_out_cb, queue, GNUNET_BANDWIDTH_ZERO, - 0 /* FIXME: max carry in seconds! */, - &tracker_excess_cb, + GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S, + &tracker_excess_out_cb, queue); memcpy (&queue[1], addr, @@ -1940,8 +2061,12 @@ ats_suggestion_cb (void *cls, const struct GNUNET_PeerIdentity *pid, const char *address) { + static uint32_t idgen; struct TransportClient *tc; char *prefix; + struct GNUNET_TRANSPORT_CreateQueue *cqm; + struct GNUNET_MQ_Envelope *env; + size_t alen; (void) cls; prefix = GNUNET_HELLO_address_to_prefix (address); @@ -1956,11 +2081,87 @@ ats_suggestion_cb (void *cls, GNUNET_STATISTICS_update (GST_stats, "# ATS suggestions ignored due to missing communicator", 1, - GNUNET_NO); - + GNUNET_NO); + return; + } + /* forward suggestion for queue creation to communicator */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Request #%u for `%s' communicator to create queue to `%s'\n", + (unsigned int) idgen, + prefix, + address); + alen = strlen (address) + 1; + env = GNUNET_MQ_msg_extra (cqm, + alen, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE); + cqm->request_id = htonl (idgen++); + cqm->receiver = *pid; + memcpy (&cqm[1], + address, + alen); + GNUNET_MQ_send (tc->mq, + env); +} + + +/** + * Communicator tells us that our request to create a queue "worked", that + * is setting up the queue is now in process. + * + * @param cls the `struct TransportClient` + * @param cqr confirmation message + */ +static void +handle_queue_create_ok (void *cls, + const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) +{ + struct TransportClient *tc = cls; + + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); return; } - // FIXME: forward suggestion to tc + GNUNET_STATISTICS_update (GST_stats, + "# ATS suggestions succeeded at communicator", + 1, + GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Request #%u for communicator to create queue succeeded\n", + (unsigned int) ntohs (cqr->request_id)); + GNUNET_SERVICE_client_continue (tc->client); +} + + +/** + * Communicator tells us that our request to create a queue failed. This usually + * indicates that the provided address is simply invalid or that the communicator's + * resources are exhausted. + * + * @param cls the `struct TransportClient` + * @param cqr failure message + */ +static void +handle_queue_create_fail (void *cls, + const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) +{ + struct TransportClient *tc = cls; + + if (CT_COMMUNICATOR != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Request #%u for communicator to create queue failed\n", + (unsigned int) ntohs (cqr->request_id)); + GNUNET_STATISTICS_update (GST_stats, + "# ATS suggestions failed in queue creation at communicator", + 1, + GNUNET_NO); + GNUNET_SERVICE_client_continue (tc->client); } @@ -2152,6 +2353,14 @@ GNUNET_SERVICE_MAIN GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG, struct GNUNET_TRANSPORT_IncomingMessage, NULL), + GNUNET_MQ_hd_fixed_size (queue_create_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK, + struct GNUNET_TRANSPORT_CreateQueueResponse, + NULL), + GNUNET_MQ_hd_fixed_size (queue_create_fail, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL, + struct GNUNET_TRANSPORT_CreateQueueResponse, + NULL), GNUNET_MQ_hd_var_size (add_queue_message, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, struct GNUNET_TRANSPORT_AddQueueMessage, diff --git a/src/transport/transport.h b/src/transport/transport.h index 88656a012..00d475e2b 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -871,7 +871,7 @@ struct GNUNET_TRANSPORT_CreateQueue /** - * Transport tells communicator that it wants a new queue. + * Communicator tells transport how queue creation went down. */ struct GNUNET_TRANSPORT_CreateQueueResponse { -- 2.25.1