From 3f945e6798d8d736ceb104b59ea1269a7abdfe8a Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 28 Apr 2019 19:32:10 +0200 Subject: [PATCH] towards flow control in TNG --- src/ats-tests/ats-testing.h | 100 +-- src/core/gnunet-service-core_kx.c | 681 ++++++++---------- src/hostlist/test_gnunet_daemon_hostlist.c | 124 ++-- .../test_gnunet_daemon_hostlist_reconnect.c | 142 ++-- src/include/gnunet_protocols.h | 8 + .../gnunet_transport_communication_service.h | 113 +-- src/include/gnunet_transport_core_service.h | 85 +-- src/include/gnunet_transport_service.h | 232 ++++-- src/namestore/namestore_api_monitor.c | 95 +-- .../gnunet-service-testbed_connectionpool.c | 230 +++--- .../gnunet-service-testbed_connectionpool.h | 56 +- src/transport/gnunet-service-tng.c | 575 +++++++-------- src/transport/gnunet-transport-profiler.c | 232 +++--- src/transport/gnunet-transport.c | 571 +++++++-------- src/transport/transport-testing.h | 201 +++--- src/transport/transport.h | 56 +- src/transport/transport_api2_core.c | 506 +++++-------- src/transport/transport_api_core.c | 260 +++---- 18 files changed, 1983 insertions(+), 2284 deletions(-) diff --git a/src/ats-tests/ats-testing.h b/src/ats-tests/ats-testing.h index 9c4353b52..f6df5a9bd 100644 --- a/src/ats-tests/ats-testing.h +++ b/src/ats-tests/ats-testing.h @@ -28,7 +28,7 @@ #include "gnunet_testbed_service.h" #include "gnunet_ats_service.h" #include "gnunet_core_service.h" -#include "gnunet_transport_core_service.h" +#include "gnunet_transport_service.h" #define TEST_ATS_PREFERENCE_DEFAULT 1.0 @@ -82,10 +82,10 @@ enum GeneratorType * @param masters array of master peers * @param slaves array of master peers */ -typedef void -(*GNUNET_ATS_TEST_TopologySetupDoneCallback) (void *cls, - struct BenchmarkPeer *masters, - struct BenchmarkPeer *slaves); +typedef void (*GNUNET_ATS_TEST_TopologySetupDoneCallback) ( + void *cls, + struct BenchmarkPeer *masters, + struct BenchmarkPeer *slaves); /** * Callback called when logging is required for the data contained @@ -97,13 +97,13 @@ typedef void * @param bandwidth_in bandwidth inbound * @param prop performance information */ -typedef void -(*GNUNET_ATS_TEST_LogRequest) (void *cls, - const struct GNUNET_HELLO_Address *address, - int address_active, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, - const struct GNUNET_ATS_Properties *prop); +typedef void (*GNUNET_ATS_TEST_LogRequest) ( + void *cls, + const struct GNUNET_HELLO_Address *address, + int address_active, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, + const struct GNUNET_ATS_Properties *prop); /** * Information we track for a peer in the testbed. @@ -176,7 +176,7 @@ struct BenchmarkPeer * Masters only * Progress task */ - struct GNUNET_SCHEDULER_Task * ats_task; + struct GNUNET_SCHEDULER_Task *ats_task; /** * Masters only @@ -241,7 +241,7 @@ struct TrafficGenerator long int max_rate; struct GNUNET_TIME_Relative duration_period; - struct GNUNET_SCHEDULER_Task * send_task; + struct GNUNET_SCHEDULER_Task *send_task; struct GNUNET_TIME_Absolute next_ping_transmission; struct GNUNET_TIME_Absolute time_start; }; @@ -264,7 +264,7 @@ struct PreferenceGenerator struct GNUNET_TIME_Relative duration_period; struct GNUNET_TIME_Relative frequency; - struct GNUNET_SCHEDULER_Task * set_task; + struct GNUNET_SCHEDULER_Task *set_task; struct GNUNET_TIME_Absolute next_ping_transmission; struct GNUNET_TIME_Absolute time_start; }; @@ -353,7 +353,6 @@ struct BenchmarkPartner * Current preference values for delay */ double pref_delay; - }; @@ -480,13 +479,12 @@ struct Episode; struct Experiment; -typedef void -(*GNUNET_ATS_TESTING_EpisodeDoneCallback) (struct Episode *e); +typedef void (*GNUNET_ATS_TESTING_EpisodeDoneCallback) (struct Episode *e); -typedef void -(*GNUNET_ATS_TESTING_ExperimentDoneCallback) (struct Experiment *e, - struct GNUNET_TIME_Relative duration, - int success); +typedef void (*GNUNET_ATS_TESTING_ExperimentDoneCallback) ( + struct Experiment *e, + struct GNUNET_TIME_Relative duration, + int success); /** * An operation in an experiment @@ -533,8 +531,8 @@ struct Experiment unsigned int num_episodes; struct Episode *start; - struct GNUNET_SCHEDULER_Task * experiment_timeout_task; - struct GNUNET_SCHEDULER_Task * episode_timeout_task; + struct GNUNET_SCHEDULER_Task *experiment_timeout_task; + struct GNUNET_SCHEDULER_Task *episode_timeout_task; struct Episode *cur; GNUNET_ATS_TESTING_EpisodeDoneCallback ep_done_cb; @@ -552,9 +550,10 @@ extern struct GNUNET_CONFIGURATION_Handle *cfg; * @param e_done_cb the experiment is completed */ void -GNUNET_ATS_TEST_experimentation_run (struct Experiment *e, - GNUNET_ATS_TESTING_EpisodeDoneCallback ep_done_cb, - GNUNET_ATS_TESTING_ExperimentDoneCallback e_done_cb); +GNUNET_ATS_TEST_experimentation_run ( + struct Experiment *e, + GNUNET_ATS_TESTING_EpisodeDoneCallback ep_done_cb, + GNUNET_ATS_TESTING_ExperimentDoneCallback e_done_cb); /** @@ -633,14 +632,15 @@ GNUNET_ATS_TEST_generate_traffic_stop_all (void); * @return the traffic generator */ struct PreferenceGenerator * -GNUNET_ATS_TEST_generate_preferences_start (struct BenchmarkPeer *src, - struct BenchmarkPartner *dest, - enum GeneratorType type, - unsigned int base_value, - unsigned int value_rate, - struct GNUNET_TIME_Relative period, - struct GNUNET_TIME_Relative frequency, - enum GNUNET_ATS_PreferenceKind kind); +GNUNET_ATS_TEST_generate_preferences_start ( + struct BenchmarkPeer *src, + struct BenchmarkPartner *dest, + enum GeneratorType type, + unsigned int base_value, + unsigned int value_rate, + struct GNUNET_TIME_Relative period, + struct GNUNET_TIME_Relative frequency, + enum GNUNET_ATS_PreferenceKind kind); void @@ -664,11 +664,11 @@ GNUNET_ATS_TEST_generate_preferences_stop_all (void); */ struct LoggingHandle * GNUNET_ATS_TEST_logging_start (struct GNUNET_TIME_Relative log_frequency, - const char *testname, - struct BenchmarkPeer *masters, - int num_masters, - int num_slaves, - int verbose); + const char *testname, + struct BenchmarkPeer *masters, + int num_masters, + int num_slaves, + int verbose); /** @@ -729,20 +729,22 @@ GNUNET_ATS_TEST_get_partner (int src, int dest); * @param cfg_file configuration file to use for the peers * @param num_slaves number of slaves * @param num_masters number of masters - * @param test_core connect to CORE service (#GNUNET_YES) or transport (#GNUNET_NO) + * @param test_core connect to CORE service (#GNUNET_YES) or transport + * (#GNUNET_NO) * @param done_cb function to call when topology is setup * @param done_cb_cls cls for callback * @param log_request_cb callback to call when logging is required */ void -GNUNET_ATS_TEST_create_topology (char *name, - char *cfg_file, - unsigned int num_slaves, - unsigned int num_masters, - int test_core, - GNUNET_ATS_TEST_TopologySetupDoneCallback done_cb, - void *done_cb_cls, - GNUNET_ATS_TEST_LogRequest ats_perf_cb); +GNUNET_ATS_TEST_create_topology ( + char *name, + char *cfg_file, + unsigned int num_slaves, + unsigned int num_masters, + int test_core, + GNUNET_ATS_TEST_TopologySetupDoneCallback done_cb, + void *done_cb_cls, + GNUNET_ATS_TEST_LogRequest ats_perf_cb); /** diff --git a/src/core/gnunet-service-core_kx.c b/src/core/gnunet-service-core_kx.c index bfd855285..d226b65e2 100644 --- a/src/core/gnunet-service-core_kx.c +++ b/src/core/gnunet-service-core_kx.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -20,7 +20,8 @@ /** * @file core/gnunet-service-core_kx.c - * @brief code for managing the key exchange (SET_KEY, PING, PONG) with other peers + * @brief code for managing the key exchange (SET_KEY, PING, PONG) with other + * peers * @author Christian Grothoff */ #include "platform.h" @@ -28,7 +29,7 @@ #include "gnunet-service-core.h" #include "gnunet-service-core_sessions.h" #include "gnunet_statistics_service.h" -#include "gnunet_transport_core_service.h" +#include "gnunet_transport_service.h" #include "gnunet_constants.h" #include "gnunet_signatures.h" #include "gnunet_protocols.h" @@ -42,22 +43,26 @@ /** * How long do we wait for SET_KEY confirmation initially? */ -#define INITIAL_SET_KEY_RETRY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) +#define INITIAL_SET_KEY_RETRY_FREQUENCY \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) /** * What is the minimum frequency for a PING message? */ -#define MIN_PING_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +#define MIN_PING_FREQUENCY \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) /** * How often do we rekey? */ -#define REKEY_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12) +#define REKEY_FREQUENCY \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 12) /** * What time difference do we tolerate? */ -#define REKEY_TOLERANCE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) +#define REKEY_TOLERANCE \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) /** * What is the maximum age of a message for us to consider processing @@ -69,7 +74,6 @@ #define MAX_MESSAGE_AGE GNUNET_TIME_UNIT_DAYS - GNUNET_NETWORK_STRUCT_BEGIN /** @@ -120,7 +124,6 @@ struct EphemeralKeyMessage * ephemeral public key). */ struct GNUNET_PeerIdentity origin_identity; - }; @@ -228,7 +231,6 @@ struct EncryptedMessage * (recent messages are caught with the sequence number). */ struct GNUNET_TIME_AbsoluteNBO timestamp; - }; GNUNET_NETWORK_STRUCT_END @@ -237,7 +239,8 @@ GNUNET_NETWORK_STRUCT_END * Number of bytes (at the beginning) of `struct EncryptedMessage` * that are NOT encrypted. */ -#define ENCRYPTED_HEADER_SIZE (offsetof(struct EncryptedMessage, sequence_number)) +#define ENCRYPTED_HEADER_SIZE \ + (offsetof (struct EncryptedMessage, sequence_number)) /** @@ -354,7 +357,6 @@ struct GSC_KeyExchangeInfo * What is our connection status? */ enum GNUNET_CORE_KxState status; - }; @@ -411,8 +413,8 @@ calculate_seed (struct GSC_KeyExchangeInfo *kx) /* Note: may want to make this non-random and instead derive from key material to avoid having an undetectable side-channel */ - return htonl (GNUNET_CRYPTO_random_u32 - (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX)); + return htonl ( + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX)); } @@ -431,9 +433,7 @@ monitor_notify_all (struct GSC_KeyExchangeInfo *kx) msg.state = htonl ((uint32_t) kx->status); msg.peer = *kx->peer; msg.timeout = GNUNET_TIME_absolute_hton (kx->timeout); - GNUNET_notification_context_broadcast (nc, - &msg.header, - GNUNET_NO); + GNUNET_notification_context_broadcast (nc, &msg.header, GNUNET_NO); kx->last_notify_timeout = kx->timeout; } @@ -453,10 +453,8 @@ derive_auth_key (struct GNUNET_CRYPTO_AuthKey *akey, static const char ctx[] = "authentication key"; #if DEBUG_KX struct GNUNET_HashCode sh; - - GNUNET_CRYPTO_hash (skey, - sizeof (*skey), - &sh); + + GNUNET_CRYPTO_hash (skey, sizeof (*skey), &sh); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deriving Auth key from SKEY %s and seed %u\n", GNUNET_h2s (&sh), @@ -464,9 +462,13 @@ derive_auth_key (struct GNUNET_CRYPTO_AuthKey *akey, #endif GNUNET_CRYPTO_hmac_derive_key (akey, skey, - &seed, sizeof (seed), - skey, sizeof (struct GNUNET_CRYPTO_SymmetricSessionKey), - ctx, sizeof (ctx), + &seed, + sizeof (seed), + skey, + sizeof ( + struct GNUNET_CRYPTO_SymmetricSessionKey), + ctx, + sizeof (ctx), NULL); } @@ -488,10 +490,8 @@ derive_iv (struct GNUNET_CRYPTO_SymmetricInitializationVector *iv, static const char ctx[] = "initialization vector"; #if DEBUG_KX struct GNUNET_HashCode sh; - - GNUNET_CRYPTO_hash (skey, - sizeof (*skey), - &sh); + + GNUNET_CRYPTO_hash (skey, sizeof (*skey), &sh); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deriving IV from SKEY %s and seed %u for peer %s\n", GNUNET_h2s (&sh), @@ -500,10 +500,13 @@ derive_iv (struct GNUNET_CRYPTO_SymmetricInitializationVector *iv, #endif GNUNET_CRYPTO_symmetric_derive_iv (iv, skey, - &seed, sizeof (seed), - identity, - sizeof (struct GNUNET_PeerIdentity), ctx, - sizeof (ctx), NULL); + &seed, + sizeof (seed), + identity, + sizeof (struct GNUNET_PeerIdentity), + ctx, + sizeof (ctx), + NULL); } @@ -526,10 +529,8 @@ derive_pong_iv (struct GNUNET_CRYPTO_SymmetricInitializationVector *iv, static const char ctx[] = "pong initialization vector"; #if DEBUG_KX struct GNUNET_HashCode sh; - - GNUNET_CRYPTO_hash (skey, - sizeof (*skey), - &sh); + + GNUNET_CRYPTO_hash (skey, sizeof (*skey), &sh); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deriving PONG IV from SKEY %s and seed %u/%u for %s\n", GNUNET_h2s (&sh), @@ -539,12 +540,15 @@ derive_pong_iv (struct GNUNET_CRYPTO_SymmetricInitializationVector *iv, #endif GNUNET_CRYPTO_symmetric_derive_iv (iv, skey, - &seed, sizeof (seed), - identity, - sizeof (struct GNUNET_PeerIdentity), - &challenge, sizeof (challenge), - ctx, sizeof (ctx), - NULL); + &seed, + sizeof (seed), + identity, + sizeof (struct GNUNET_PeerIdentity), + &challenge, + sizeof (challenge), + ctx, + sizeof (ctx), + NULL); } @@ -558,29 +562,32 @@ derive_pong_iv (struct GNUNET_CRYPTO_SymmetricInitializationVector *iv, */ static void derive_aes_key (const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_PeerIdentity *receiver, - const struct GNUNET_HashCode *key_material, - struct GNUNET_CRYPTO_SymmetricSessionKey *skey) + const struct GNUNET_PeerIdentity *receiver, + const struct GNUNET_HashCode *key_material, + struct GNUNET_CRYPTO_SymmetricSessionKey *skey) { static const char ctx[] = "aes key generation vector"; #if DEBUG_KX struct GNUNET_HashCode sh; - - GNUNET_CRYPTO_hash (skey, - sizeof (*skey), - &sh); + + GNUNET_CRYPTO_hash (skey, sizeof (*skey), &sh); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deriving AES Keys for %s to %s from %s\n", GNUNET_i2s (sender), GNUNET_i2s2 (receiver), GNUNET_h2s (key_material)); #endif - GNUNET_CRYPTO_kdf (skey, sizeof (struct GNUNET_CRYPTO_SymmetricSessionKey), - ctx, sizeof (ctx), - key_material, sizeof (struct GNUNET_HashCode), - sender, sizeof (struct GNUNET_PeerIdentity), - receiver, sizeof (struct GNUNET_PeerIdentity), - NULL); + GNUNET_CRYPTO_kdf (skey, + sizeof (struct GNUNET_CRYPTO_SymmetricSessionKey), + ctx, + sizeof (ctx), + key_material, + sizeof (struct GNUNET_HashCode), + sender, + sizeof (struct GNUNET_PeerIdentity), + receiver, + sizeof (struct GNUNET_PeerIdentity), + NULL); } @@ -607,15 +614,14 @@ do_encrypt (struct GSC_KeyExchangeInfo *kx, GNUNET_break (0); return GNUNET_NO; } - GNUNET_assert (size == - GNUNET_CRYPTO_symmetric_encrypt (in, - (uint16_t) size, - &kx->encrypt_key, - iv, - out)); + GNUNET_assert (size == GNUNET_CRYPTO_symmetric_encrypt (in, + (uint16_t) size, + &kx->encrypt_key, + iv, + out)); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# bytes encrypted"), - size, + gettext_noop ("# bytes encrypted"), + size, GNUNET_NO); /* the following is too sensitive to write to log files by accident, so we require manual intervention to get this one... */ @@ -625,8 +631,7 @@ do_encrypt (struct GSC_KeyExchangeInfo *kx, (unsigned int) size, GNUNET_i2s (kx->peer), (unsigned int) kx->encrypt_key.crc32, - GNUNET_CRYPTO_crc32_n (iv, - sizeof (iv))); + GNUNET_CRYPTO_crc32_n (iv, sizeof (iv))); #endif return GNUNET_OK; } @@ -656,19 +661,18 @@ do_decrypt (struct GSC_KeyExchangeInfo *kx, GNUNET_break (0); return GNUNET_NO; } - if ( (kx->status != GNUNET_CORE_KX_STATE_KEY_RECEIVED) && - (kx->status != GNUNET_CORE_KX_STATE_UP) && - (kx->status != GNUNET_CORE_KX_STATE_REKEY_SENT) ) + if ((kx->status != GNUNET_CORE_KX_STATE_KEY_RECEIVED) && + (kx->status != GNUNET_CORE_KX_STATE_UP) && + (kx->status != GNUNET_CORE_KX_STATE_REKEY_SENT)) { GNUNET_break_op (0); return GNUNET_SYSERR; } - if (size != - GNUNET_CRYPTO_symmetric_decrypt (in, - (uint16_t) size, - &kx->decrypt_key, - iv, - out)) + if (size != GNUNET_CRYPTO_symmetric_decrypt (in, + (uint16_t) size, + &kx->decrypt_key, + iv, + out)) { GNUNET_break (0); return GNUNET_SYSERR; @@ -685,9 +689,7 @@ do_decrypt (struct GSC_KeyExchangeInfo *kx, (unsigned int) size, GNUNET_i2s (kx->peer), (unsigned int) kx->decrypt_key.crc32, - GNUNET_CRYPTO_crc32_n (iv, - sizeof - (*iv))); + GNUNET_CRYPTO_crc32_n (iv, sizeof (*iv))); #endif return GNUNET_OK; } @@ -713,7 +715,8 @@ set_key_retry_task (void *cls) struct GSC_KeyExchangeInfo *kx = cls; kx->retry_set_key_task = NULL; - kx->set_key_retry_frequency = GNUNET_TIME_STD_BACKOFF (kx->set_key_retry_frequency); + kx->set_key_retry_frequency = + GNUNET_TIME_STD_BACKOFF (kx->set_key_retry_frequency); GNUNET_assert (GNUNET_CORE_KX_STATE_DOWN != kx->status); send_key (kx); } @@ -732,23 +735,20 @@ setup_fresh_ping (struct GSC_KeyExchangeInfo *kx) struct GNUNET_CRYPTO_SymmetricInitializationVector iv; pm = &kx->ping; - kx->ping_challenge = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); + kx->ping_challenge = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); pm->header.size = htons (sizeof (struct PingMessage)); pm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_PING); pm->iv_seed = calculate_seed (kx); - derive_iv (&iv, - &kx->encrypt_key, - pm->iv_seed, - kx->peer); + derive_iv (&iv, &kx->encrypt_key, pm->iv_seed, kx->peer); pp.challenge = kx->ping_challenge; pp.target = *kx->peer; do_encrypt (kx, - &iv, - &pp.target, - &pm->target, - sizeof (struct PingMessage) - ((void *) &pm->target - - (void *) pm)); + &iv, + &pp.target, + &pm->target, + sizeof (struct PingMessage) - + ((void *) &pm->target - (void *) pm)); } @@ -764,8 +764,7 @@ setup_fresh_ping (struct GSC_KeyExchangeInfo *kx) * #GNUNET_SYSERR to stop further processing with error */ static int -deliver_message (void *cls, - const struct GNUNET_MessageHeader *m) +deliver_message (void *cls, const struct GNUNET_MessageHeader *m) { struct GSC_KeyExchangeInfo *kx = cls; @@ -816,38 +815,32 @@ deliver_message (void *cls, static void * handle_transport_notify_connect (void *cls, const struct GNUNET_PeerIdentity *pid, - struct GNUNET_MQ_Handle *mq) + struct GNUNET_MQ_Handle *mq) { struct GSC_KeyExchangeInfo *kx; struct GNUNET_HashCode h1; struct GNUNET_HashCode h2; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Initiating key exchange with `%s'\n", + "Initiating key exchange with `%s'\n", GNUNET_i2s (pid)); GNUNET_STATISTICS_update (GSC_stats, gettext_noop ("# key exchanges initiated"), 1, GNUNET_NO); kx = GNUNET_new (struct GSC_KeyExchangeInfo); - kx->mst = GNUNET_MST_create (&deliver_message, - kx); + kx->mst = GNUNET_MST_create (&deliver_message, kx); kx->mq = mq; kx->peer = pid; kx->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY; - GNUNET_CONTAINER_DLL_insert (kx_head, - kx_tail, - kx); + GNUNET_CONTAINER_DLL_insert (kx_head, kx_tail, kx); kx->status = GNUNET_CORE_KX_STATE_KEY_SENT; monitor_notify_all (kx); - GNUNET_CRYPTO_hash (pid, - sizeof (struct GNUNET_PeerIdentity), - &h1); + GNUNET_CRYPTO_hash (pid, sizeof (struct GNUNET_PeerIdentity), &h1); GNUNET_CRYPTO_hash (&GSC_my_identity, sizeof (struct GNUNET_PeerIdentity), &h2); - if (0 < GNUNET_CRYPTO_hash_cmp (&h1, - &h2)) + if (0 < GNUNET_CRYPTO_hash_cmp (&h1, &h2)) { /* peer with "lower" identity starts KX, otherwise we typically end up with both peers starting the exchange and transmit the 'set key' @@ -858,10 +851,10 @@ handle_transport_notify_connect (void *cls, { /* peer with "higher" identity starts a delayed KX, if the "lower" peer * does not start a KX since it sees no reasons to do so */ - kx->retry_set_key_task - = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &set_key_retry_task, - kx); + kx->retry_set_key_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &set_key_retry_task, + kx); } return kx; } @@ -879,7 +872,7 @@ handle_transport_notify_connect (void *cls, static void handle_transport_notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer, - void *handler_cls) + void *handler_cls) { struct GSC_KeyExchangeInfo *kx = handler_cls; @@ -888,9 +881,9 @@ handle_transport_notify_disconnect (void *cls, GNUNET_i2s (peer)); GSC_SESSIONS_end (kx->peer); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# key exchanges stopped"), + gettext_noop ("# key exchanges stopped"), 1, - GNUNET_NO); + GNUNET_NO); if (NULL != kx->retry_set_key_task) { GNUNET_SCHEDULER_cancel (kx->retry_set_key_task); @@ -903,9 +896,7 @@ handle_transport_notify_disconnect (void *cls, } kx->status = GNUNET_CORE_KX_PEER_DISCONNECT; monitor_notify_all (kx); - GNUNET_CONTAINER_DLL_remove (kx_head, - kx_tail, - kx); + GNUNET_CONTAINER_DLL_remove (kx_head, kx_tail, kx); GNUNET_MST_destroy (kx->mst); GNUNET_free (kx); } @@ -926,8 +917,7 @@ send_ping (struct GSC_KeyExchangeInfo *kx) 1, GNUNET_NO); env = GNUNET_MQ_msg_copy (&kx->ping.header); - GNUNET_MQ_send (kx->mq, - env); + GNUNET_MQ_send (kx->mq, env); } @@ -941,22 +931,15 @@ derive_session_keys (struct GSC_KeyExchangeInfo *kx) { struct GNUNET_HashCode key_material; - if (GNUNET_OK != - GNUNET_CRYPTO_ecc_ecdh (my_ephemeral_key, - &kx->other_ephemeral_key, - &key_material)) + if (GNUNET_OK != GNUNET_CRYPTO_ecc_ecdh (my_ephemeral_key, + &kx->other_ephemeral_key, + &key_material)) { GNUNET_break (0); return; } - derive_aes_key (&GSC_my_identity, - kx->peer, - &key_material, - &kx->encrypt_key); - derive_aes_key (kx->peer, - &GSC_my_identity, - &key_material, - &kx->decrypt_key); + derive_aes_key (&GSC_my_identity, kx->peer, &key_material, &kx->encrypt_key); + derive_aes_key (kx->peer, &GSC_my_identity, &key_material, &kx->decrypt_key); memset (&key_material, 0, sizeof (key_material)); /* fresh key, reset sequence numbers */ kx->last_sequence_number_received = 0; @@ -973,8 +956,7 @@ derive_session_keys (struct GSC_KeyExchangeInfo *kx) * @param m the set key message we received */ static void -handle_ephemeral_key (void *cls, - const struct EphemeralKeyMessage *m) +handle_ephemeral_key (void *cls, const struct EphemeralKeyMessage *m) { struct GSC_KeyExchangeInfo *kx = cls; struct GNUNET_TIME_Absolute start_t; @@ -983,14 +965,14 @@ handle_ephemeral_key (void *cls, enum GNUNET_CORE_KxState sender_status; end_t = GNUNET_TIME_absolute_ntoh (m->expiration_time); - if ( ( (GNUNET_CORE_KX_STATE_KEY_RECEIVED == kx->status) || - (GNUNET_CORE_KX_STATE_UP == kx->status) || - (GNUNET_CORE_KX_STATE_REKEY_SENT == kx->status) ) && - (end_t.abs_value_us < kx->foreign_key_expires.abs_value_us) ) + if (((GNUNET_CORE_KX_STATE_KEY_RECEIVED == kx->status) || + (GNUNET_CORE_KX_STATE_UP == kx->status) || + (GNUNET_CORE_KX_STATE_REKEY_SENT == kx->status)) && + (end_t.abs_value_us < kx->foreign_key_expires.abs_value_us)) { GNUNET_STATISTICS_update (GSC_stats, gettext_noop ("# old ephemeral keys ignored"), - 1, + 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received expired EPHEMERAL_KEY from %s\n", @@ -1002,18 +984,18 @@ handle_ephemeral_key (void *cls, sizeof (m->ephemeral_key))) { GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# duplicate ephemeral keys ignored"), - 1, + gettext_noop ( + "# duplicate ephemeral keys ignored"), + 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Ignoring duplicate EPHEMERAL_KEY from %s\n", GNUNET_i2s (&m->origin_identity)); return; } - if (0 != - memcmp (&m->origin_identity, - kx->peer, - sizeof (struct GNUNET_PeerIdentity))) + if (0 != memcmp (&m->origin_identity, + kx->peer, + sizeof (struct GNUNET_PeerIdentity))) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received EPHEMERAL_KEY from %s, but expected %s\n", @@ -1024,10 +1006,10 @@ handle_ephemeral_key (void *cls, } if ((ntohl (m->purpose.size) != sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose) + - sizeof (struct GNUNET_TIME_AbsoluteNBO) + - sizeof (struct GNUNET_TIME_AbsoluteNBO) + - sizeof (struct GNUNET_CRYPTO_EddsaPublicKey) + - sizeof (struct GNUNET_CRYPTO_EddsaPublicKey)) || + sizeof (struct GNUNET_TIME_AbsoluteNBO) + + sizeof (struct GNUNET_TIME_AbsoluteNBO) + + sizeof (struct GNUNET_CRYPTO_EddsaPublicKey) + + sizeof (struct GNUNET_CRYPTO_EddsaPublicKey)) || (GNUNET_OK != GNUNET_CRYPTO_eddsa_verify (GNUNET_SIGNATURE_PURPOSE_SET_ECC_KEY, &m->purpose, @@ -1037,7 +1019,8 @@ handle_ephemeral_key (void *cls, /* invalid signature */ GNUNET_break_op (0); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# EPHEMERAL_KEYs rejected (bad signature)"), + gettext_noop ( + "# EPHEMERAL_KEYs rejected (bad signature)"), 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -1047,17 +1030,22 @@ handle_ephemeral_key (void *cls, } now = GNUNET_TIME_absolute_get (); start_t = GNUNET_TIME_absolute_ntoh (m->creation_time); - if ( (end_t.abs_value_us < GNUNET_TIME_absolute_subtract (now, REKEY_TOLERANCE).abs_value_us) || - (start_t.abs_value_us > GNUNET_TIME_absolute_add (now, REKEY_TOLERANCE).abs_value_us) ) + if ((end_t.abs_value_us < + GNUNET_TIME_absolute_subtract (now, REKEY_TOLERANCE).abs_value_us) || + (start_t.abs_value_us > + GNUNET_TIME_absolute_add (now, REKEY_TOLERANCE).abs_value_us)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("EPHEMERAL_KEY from peer `%s' rejected as its validity range does not match our system time (%llu not in [%llu,%llu]).\n"), - GNUNET_i2s (kx->peer), - (unsigned long long) now.abs_value_us, - (unsigned long long) start_t.abs_value_us, - (unsigned long long) end_t.abs_value_us); + GNUNET_log ( + GNUNET_ERROR_TYPE_WARNING, + _ ( + "EPHEMERAL_KEY from peer `%s' rejected as its validity range does not match our system time (%llu not in [%llu,%llu]).\n"), + GNUNET_i2s (kx->peer), + (unsigned long long) now.abs_value_us, + (unsigned long long) start_t.abs_value_us, + (unsigned long long) end_t.abs_value_us); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# EPHEMERAL_KEY messages rejected due to time"), + gettext_noop ( + "# EPHEMERAL_KEY messages rejected due to time"), 1, GNUNET_NO); return; @@ -1066,9 +1054,7 @@ handle_ephemeral_key (void *cls, { struct GNUNET_HashCode eh; - GNUNET_CRYPTO_hash (&m->ephemeral_key, - sizeof (m->ephemeral_key), - &eh); + GNUNET_CRYPTO_hash (&m->ephemeral_key, sizeof (m->ephemeral_key), &eh); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received valid EPHEMERAL_KEY `%s' from `%s' in state %d.\n", GNUNET_h2s (&eh), @@ -1166,8 +1152,7 @@ handle_ephemeral_key (void *cls, * @param m the encrypted PING message itself */ static void -handle_ping (void *cls, - const struct PingMessage *m) +handle_ping (void *cls, const struct PingMessage *m) { struct GSC_KeyExchangeInfo *kx = cls; struct PingMessage t; @@ -1180,39 +1165,34 @@ handle_ping (void *cls, gettext_noop ("# PING messages received"), 1, GNUNET_NO); - if ( (kx->status != GNUNET_CORE_KX_STATE_KEY_RECEIVED) && - (kx->status != GNUNET_CORE_KX_STATE_UP) && - (kx->status != GNUNET_CORE_KX_STATE_REKEY_SENT)) + if ((kx->status != GNUNET_CORE_KX_STATE_KEY_RECEIVED) && + (kx->status != GNUNET_CORE_KX_STATE_UP) && + (kx->status != GNUNET_CORE_KX_STATE_REKEY_SENT)) { /* ignore */ GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# PING messages dropped (out of order)"), + gettext_noop ( + "# PING messages dropped (out of order)"), 1, - GNUNET_NO); + GNUNET_NO); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives PING request from `%s'.\n", GNUNET_i2s (kx->peer)); - derive_iv (&iv, - &kx->decrypt_key, - m->iv_seed, - &GSC_my_identity); - if (GNUNET_OK != - do_decrypt (kx, - &iv, - &m->target, - &t.target, - sizeof (struct PingMessage) - ((void *) &m->target - - (void *) m))) + derive_iv (&iv, &kx->decrypt_key, m->iv_seed, &GSC_my_identity); + if (GNUNET_OK != do_decrypt (kx, + &iv, + &m->target, + &t.target, + sizeof (struct PingMessage) - + ((void *) &m->target - (void *) m))) { GNUNET_break_op (0); return; } if (0 != - memcmp (&t.target, - &GSC_my_identity, - sizeof (struct GNUNET_PeerIdentity))) + memcmp (&t.target, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) { if (GNUNET_CORE_KX_STATE_REKEY_SENT != kx->status) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -1220,9 +1200,10 @@ handle_ping (void *cls, GNUNET_i2s (kx->peer), GNUNET_i2s2 (&t.target)); else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Decryption of PING from peer `%s' failed after rekey (harmless)\n", - GNUNET_i2s (kx->peer)); + GNUNET_log ( + GNUNET_ERROR_TYPE_DEBUG, + "Decryption of PING from peer `%s' failed after rekey (harmless)\n", + GNUNET_i2s (kx->peer)); GNUNET_break_op (0); return; } @@ -1230,26 +1211,20 @@ handle_ping (void *cls, tx.reserved = 0; tx.challenge = t.challenge; tx.target = t.target; - env = GNUNET_MQ_msg (tp, - GNUNET_MESSAGE_TYPE_CORE_PONG); + env = GNUNET_MQ_msg (tp, GNUNET_MESSAGE_TYPE_CORE_PONG); tp->iv_seed = calculate_seed (kx); - derive_pong_iv (&iv, - &kx->encrypt_key, - tp->iv_seed, - t.challenge, - kx->peer); + derive_pong_iv (&iv, &kx->encrypt_key, tp->iv_seed, t.challenge, kx->peer); do_encrypt (kx, &iv, &tx.challenge, &tp->challenge, - sizeof (struct PongMessage) - ((void *) &tp->challenge - - (void *) tp)); + sizeof (struct PongMessage) - + ((void *) &tp->challenge - (void *) tp)); GNUNET_STATISTICS_update (GSC_stats, gettext_noop ("# PONG messages created"), 1, GNUNET_NO); - GNUNET_MQ_send (kx->mq, - env); + GNUNET_MQ_send (kx->mq, env); } @@ -1289,13 +1264,10 @@ send_keep_alive (void *cls) GNUNET_NO); setup_fresh_ping (kx); send_ping (kx); - retry = - GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2), - MIN_PING_FREQUENCY); + retry = GNUNET_TIME_relative_max (GNUNET_TIME_relative_divide (left, 2), + MIN_PING_FREQUENCY); kx->keep_alive_task = - GNUNET_SCHEDULER_add_delayed (retry, - &send_keep_alive, - kx); + GNUNET_SCHEDULER_add_delayed (retry, &send_keep_alive, kx); } @@ -1312,10 +1284,9 @@ update_timeout (struct GSC_KeyExchangeInfo *kx) struct GNUNET_TIME_Relative delta; kx->timeout = - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - delta = GNUNET_TIME_absolute_get_difference (kx->last_notify_timeout, - kx->timeout); + GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + delta = + GNUNET_TIME_absolute_get_difference (kx->last_notify_timeout, kx->timeout); if (delta.rel_value_us > 5LL * 1000LL * 1000LL) { /* we only notify monitors about timeout changes if those @@ -1324,12 +1295,10 @@ update_timeout (struct GSC_KeyExchangeInfo *kx) } if (NULL != kx->keep_alive_task) GNUNET_SCHEDULER_cancel (kx->keep_alive_task); - kx->keep_alive_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide - (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - 2), - &send_keep_alive, - kx); + kx->keep_alive_task = GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_divide (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, 2), + &send_keep_alive, + kx); } @@ -1340,8 +1309,7 @@ update_timeout (struct GSC_KeyExchangeInfo *kx) * @param m the encrypted PONG message itself */ static void -handle_pong (void *cls, - const struct PongMessage *m) +handle_pong (void *cls, const struct PongMessage *m) { struct GSC_KeyExchangeInfo *kx = cls; struct PongMessage t; @@ -1355,13 +1323,17 @@ handle_pong (void *cls, { case GNUNET_CORE_KX_STATE_DOWN: GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# PONG messages dropped (connection down)"), 1, - GNUNET_NO); + gettext_noop ( + "# PONG messages dropped (connection down)"), + 1, + GNUNET_NO); return; case GNUNET_CORE_KX_STATE_KEY_SENT: GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# PONG messages dropped (out of order)"), 1, - GNUNET_NO); + gettext_noop ( + "# PONG messages dropped (out of order)"), + 1, + GNUNET_NO); return; case GNUNET_CORE_KX_STATE_KEY_RECEIVED: break; @@ -1383,13 +1355,12 @@ handle_pong (void *cls, m->iv_seed, kx->ping_challenge, &GSC_my_identity); - if (GNUNET_OK != - do_decrypt (kx, - &iv, - &m->challenge, - &t.challenge, - sizeof (struct PongMessage) - ((void *) &m->challenge - - (void *) m))) + if (GNUNET_OK != do_decrypt (kx, + &iv, + &m->challenge, + &t.challenge, + sizeof (struct PongMessage) - + ((void *) &m->challenge - (void *) m))) { GNUNET_break_op (0); return; @@ -1398,9 +1369,8 @@ handle_pong (void *cls, gettext_noop ("# PONG messages decrypted"), 1, GNUNET_NO); - if ((0 != memcmp (&t.target, - kx->peer, - sizeof (struct GNUNET_PeerIdentity))) || + if ((0 != + memcmp (&t.target, kx->peer, sizeof (struct GNUNET_PeerIdentity))) || (kx->ping_challenge != t.challenge)) { /* PONG malformed */ @@ -1426,14 +1396,15 @@ handle_pong (void *cls, switch (kx->status) { case GNUNET_CORE_KX_STATE_DOWN: - GNUNET_assert (0); /* should be impossible */ + GNUNET_assert (0); /* should be impossible */ return; case GNUNET_CORE_KX_STATE_KEY_SENT: - GNUNET_assert (0); /* should be impossible */ + GNUNET_assert (0); /* should be impossible */ return; case GNUNET_CORE_KX_STATE_KEY_RECEIVED: GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# session keys confirmed via PONG"), + gettext_noop ( + "# session keys confirmed via PONG"), 1, GNUNET_NO); kx->status = GNUNET_CORE_KX_STATE_UP; @@ -1451,7 +1422,8 @@ handle_pong (void *cls, break; case GNUNET_CORE_KX_STATE_REKEY_SENT: GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# rekey operations confirmed via PONG"), + gettext_noop ( + "# rekey operations confirmed via PONG"), 1, GNUNET_NO); kx->status = GNUNET_CORE_KX_STATE_UP; @@ -1478,8 +1450,8 @@ send_key (struct GSC_KeyExchangeInfo *kx) GNUNET_assert (GNUNET_CORE_KX_STATE_DOWN != kx->status); if (NULL != kx->retry_set_key_task) { - GNUNET_SCHEDULER_cancel (kx->retry_set_key_task); - kx->retry_set_key_task = NULL; + GNUNET_SCHEDULER_cancel (kx->retry_set_key_task); + kx->retry_set_key_task = NULL; } /* always update sender status in SET KEY message */ #if DEBUG_KX @@ -1498,14 +1470,13 @@ send_key (struct GSC_KeyExchangeInfo *kx) #endif current_ekm.sender_status = htonl ((int32_t) (kx->status)); env = GNUNET_MQ_msg_copy (¤t_ekm.header); - GNUNET_MQ_send (kx->mq, - env); + GNUNET_MQ_send (kx->mq, env); if (GNUNET_CORE_KX_STATE_KEY_SENT != kx->status) send_ping (kx); kx->retry_set_key_task = - GNUNET_SCHEDULER_add_delayed (kx->set_key_retry_frequency, - &set_key_retry_task, - kx); + GNUNET_SCHEDULER_add_delayed (kx->set_key_retry_frequency, + &set_key_retry_task, + kx); } @@ -1522,9 +1493,9 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx, size_t payload_size) { size_t used = payload_size + sizeof (struct EncryptedMessage); - char pbuf[used]; /* plaintext */ - struct EncryptedMessage *em; /* encrypted message */ - struct EncryptedMessage *ph; /* plaintext header */ + char pbuf[used]; /* plaintext */ + struct EncryptedMessage *em; /* encrypted message */ + struct EncryptedMessage *ph; /* plaintext header */ struct GNUNET_MQ_Envelope *env; struct GNUNET_CRYPTO_SymmetricInitializationVector iv; struct GNUNET_CRYPTO_AuthKey auth_key; @@ -1534,23 +1505,17 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx, ph->iv_seed = calculate_seed (kx); ph->reserved = 0; ph->timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - GNUNET_memcpy (&ph[1], - payload, - payload_size); + GNUNET_memcpy (&ph[1], payload, payload_size); env = GNUNET_MQ_msg_extra (em, - payload_size, - GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE); + payload_size, + GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE); em->iv_seed = ph->iv_seed; - derive_iv (&iv, - &kx->encrypt_key, - ph->iv_seed, - kx->peer); - GNUNET_assert (GNUNET_OK == - do_encrypt (kx, - &iv, - &ph->sequence_number, - &em->sequence_number, - used - ENCRYPTED_HEADER_SIZE)); + derive_iv (&iv, &kx->encrypt_key, ph->iv_seed, kx->peer); + GNUNET_assert (GNUNET_OK == do_encrypt (kx, + &iv, + &ph->sequence_number, + &em->sequence_number, + used - ENCRYPTED_HEADER_SIZE)); #if DEBUG_KX { struct GNUNET_HashCode hc; @@ -1565,9 +1530,7 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx, GNUNET_i2s (kx->peer)); } #endif - derive_auth_key (&auth_key, - &kx->encrypt_key, - ph->iv_seed); + derive_auth_key (&auth_key, &kx->encrypt_key, ph->iv_seed); GNUNET_CRYPTO_hmac (&auth_key, &em->sequence_number, used - ENCRYPTED_HEADER_SIZE, @@ -1576,9 +1539,7 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx, { struct GNUNET_HashCode hc; - GNUNET_CRYPTO_hash (&auth_key, - sizeof (auth_key), - &hc); + GNUNET_CRYPTO_hash (&auth_key, sizeof (auth_key), &hc); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "For peer %s, used AC %s to create hmac %s\n", GNUNET_i2s (kx->peer), @@ -1587,8 +1548,7 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx, } #endif kx->has_excess_bandwidth = GNUNET_NO; - GNUNET_MQ_send (kx->mq, - env); + GNUNET_MQ_send (kx->mq, env); } @@ -1601,8 +1561,7 @@ GSC_KX_encrypt_and_transmit (struct GSC_KeyExchangeInfo *kx, * @return #GNUNET_OK if @a msg is well-formed (size-wise) */ static int -check_encrypted (void *cls, - const struct EncryptedMessage *m) +check_encrypted (void *cls, const struct EncryptedMessage *m) { uint16_t size = ntohs (m->header.size) - sizeof (*m); @@ -1623,11 +1582,10 @@ check_encrypted (void *cls, * @param m encrypted message */ static void -handle_encrypted (void *cls, - const struct EncryptedMessage *m) +handle_encrypted (void *cls, const struct EncryptedMessage *m) { struct GSC_KeyExchangeInfo *kx = cls; - struct EncryptedMessage *pt; /* plaintext */ + struct EncryptedMessage *pt; /* plaintext */ struct GNUNET_HashCode ph; uint32_t snum; struct GNUNET_TIME_Absolute t; @@ -1639,19 +1597,25 @@ handle_encrypted (void *cls, if (GNUNET_CORE_KX_STATE_UP != kx->status) { GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# DATA message dropped (out of order)"), + gettext_noop ( + "# DATA message dropped (out of order)"), 1, GNUNET_NO); return; } - if (0 == GNUNET_TIME_absolute_get_remaining (kx->foreign_key_expires).rel_value_us) + if (0 == + GNUNET_TIME_absolute_get_remaining (kx->foreign_key_expires).rel_value_us) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Session to peer `%s' went down due to key expiration (should not happen)\n"), - GNUNET_i2s (kx->peer)); + GNUNET_log ( + GNUNET_ERROR_TYPE_WARNING, + _ ( + "Session to peer `%s' went down due to key expiration (should not happen)\n"), + GNUNET_i2s (kx->peer)); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# sessions terminated by key expiration"), - 1, GNUNET_NO); + gettext_noop ( + "# sessions terminated by key expiration"), + 1, + GNUNET_NO); GSC_SESSIONS_end (kx->peer); if (NULL != kx->keep_alive_task) { @@ -1669,9 +1633,7 @@ handle_encrypted (void *cls, { struct GNUNET_HashCode hc; - GNUNET_CRYPTO_hash (&m->sequence_number, - size - ENCRYPTED_HEADER_SIZE, - &hc); + GNUNET_CRYPTO_hash (&m->sequence_number, size - ENCRYPTED_HEADER_SIZE, &hc); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received encrypted payload `%s' of %u bytes from %s\n", GNUNET_h2s (&hc), @@ -1679,9 +1641,7 @@ handle_encrypted (void *cls, GNUNET_i2s (kx->peer)); } #endif - derive_auth_key (&auth_key, - &kx->decrypt_key, - m->iv_seed); + derive_auth_key (&auth_key, &kx->decrypt_key, m->iv_seed); GNUNET_CRYPTO_hmac (&auth_key, &m->sequence_number, size - ENCRYPTED_HEADER_SIZE, @@ -1690,9 +1650,7 @@ handle_encrypted (void *cls, { struct GNUNET_HashCode hc; - GNUNET_CRYPTO_hash (&auth_key, - sizeof (auth_key), - &hc); + GNUNET_CRYPTO_hash (&auth_key, sizeof (auth_key), &hc); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "For peer %s, used AC %s to verify hmac %s\n", GNUNET_i2s (kx->peer), @@ -1700,27 +1658,21 @@ handle_encrypted (void *cls, GNUNET_h2s2 (&m->hmac)); } #endif - if (0 != memcmp (&ph, - &m->hmac, - sizeof (struct GNUNET_HashCode))) + if (0 != memcmp (&ph, &m->hmac, sizeof (struct GNUNET_HashCode))) { /* checksum failed */ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed checksum validation for a message from `%s'\n", - GNUNET_i2s (kx->peer)); + "Failed checksum validation for a message from `%s'\n", + GNUNET_i2s (kx->peer)); return; } - derive_iv (&iv, - &kx->decrypt_key, - m->iv_seed, - &GSC_my_identity); + derive_iv (&iv, &kx->decrypt_key, m->iv_seed, &GSC_my_identity); /* decrypt */ - if (GNUNET_OK != - do_decrypt (kx, - &iv, - &m->sequence_number, - &buf[ENCRYPTED_HEADER_SIZE], - size - ENCRYPTED_HEADER_SIZE)) + if (GNUNET_OK != do_decrypt (kx, + &iv, + &m->sequence_number, + &buf[ENCRYPTED_HEADER_SIZE], + size - ENCRYPTED_HEADER_SIZE)) { GNUNET_break_op (0); return; @@ -1751,8 +1703,9 @@ handle_encrypted (void *cls, "Received ancient out of sequence message, ignoring.\n"); /* ancient out of sequence, ignore */ GNUNET_STATISTICS_update (GSC_stats, - gettext_noop - ("# bytes dropped (out of sequence)"), size, + gettext_noop ( + "# bytes dropped (out of sequence)"), + size, GNUNET_NO); return; } @@ -1766,7 +1719,8 @@ handle_encrypted (void *cls, "Received duplicate message, ignoring.\n"); GNUNET_STATISTICS_update (GSC_stats, gettext_noop ("# bytes dropped (duplicates)"), - size, GNUNET_NO); + size, + GNUNET_NO); /* duplicate, ignore */ return; } @@ -1790,11 +1744,13 @@ handle_encrypted (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Message received far too old (%s). Content ignored.\n", - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (t), - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_duration (t), + GNUNET_YES)); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop - ("# bytes dropped (ancient message)"), size, + gettext_noop ( + "# bytes dropped (ancient message)"), + size, GNUNET_NO); return; } @@ -1825,7 +1781,7 @@ handle_encrypted (void *cls, static void handle_transport_notify_excess_bw (void *cls, const struct GNUNET_PeerIdentity *pid, - void *connect_cls) + void *connect_cls) { struct GSC_KeyExchangeInfo *kx = connect_cls; @@ -1848,31 +1804,34 @@ sign_ephemeral_key () current_ekm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY); current_ekm.sender_status = 0; /* to be set later */ current_ekm.purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_SET_ECC_KEY); - current_ekm.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose) + - sizeof (struct GNUNET_TIME_AbsoluteNBO) + - sizeof (struct GNUNET_TIME_AbsoluteNBO) + - sizeof (struct GNUNET_CRYPTO_EcdhePublicKey) + - sizeof (struct GNUNET_PeerIdentity)); - current_ekm.creation_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - if (GNUNET_YES == - GNUNET_CONFIGURATION_get_value_yesno (GSC_cfg, - "core", - "USE_EPHEMERAL_KEYS")) + current_ekm.purpose.size = + htonl (sizeof (struct GNUNET_CRYPTO_EccSignaturePurpose) + + sizeof (struct GNUNET_TIME_AbsoluteNBO) + + sizeof (struct GNUNET_TIME_AbsoluteNBO) + + sizeof (struct GNUNET_CRYPTO_EcdhePublicKey) + + sizeof (struct GNUNET_PeerIdentity)); + current_ekm.creation_time = + GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); + if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno (GSC_cfg, + "core", + "USE_EPHEMERAL_KEYS")) { - current_ekm.expiration_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_add (REKEY_FREQUENCY, - REKEY_TOLERANCE))); + current_ekm.expiration_time = + GNUNET_TIME_absolute_hton (GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_relative_add (REKEY_FREQUENCY, REKEY_TOLERANCE))); } else { - current_ekm.expiration_time = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_FOREVER_ABS); + current_ekm.expiration_time = + GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_FOREVER_ABS); } GNUNET_CRYPTO_ecdhe_key_get_public (my_ephemeral_key, ¤t_ekm.ephemeral_key); current_ekm.origin_identity = GSC_my_identity; GNUNET_assert (GNUNET_OK == - GNUNET_CRYPTO_eddsa_sign (my_private_key, + GNUNET_CRYPTO_eddsa_sign (my_private_key, ¤t_ekm.purpose, - ¤t_ekm.signature)); + ¤t_ekm.signature)); } @@ -1886,9 +1845,7 @@ do_rekey (void *cls) { struct GSC_KeyExchangeInfo *pos; - rekey_task = GNUNET_SCHEDULER_add_delayed (REKEY_FREQUENCY, - &do_rekey, - NULL); + rekey_task = GNUNET_SCHEDULER_add_delayed (REKEY_FREQUENCY, &do_rekey, NULL); if (NULL != my_ephemeral_key) GNUNET_free (my_ephemeral_key); my_ephemeral_key = GNUNET_CRYPTO_ecdhe_key_create (); @@ -1900,9 +1857,7 @@ do_rekey (void *cls) GNUNET_CRYPTO_hash (¤t_ekm.ephemeral_key, sizeof (current_ekm.ephemeral_key), &eh); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Rekeying to %s\n", - GNUNET_h2s (&eh)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Rekeying to %s\n", GNUNET_h2s (&eh)); } for (pos = kx_head; NULL != pos; pos = pos->next) { @@ -1932,25 +1887,24 @@ do_rekey (void *cls) int GSC_KX_init (struct GNUNET_CRYPTO_EddsaPrivateKey *pk) { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_fixed_size (ephemeral_key, - GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY, - struct EphemeralKeyMessage, - NULL), - GNUNET_MQ_hd_fixed_size (ping, - GNUNET_MESSAGE_TYPE_CORE_PING, - struct PingMessage, - NULL), - GNUNET_MQ_hd_fixed_size (pong, - GNUNET_MESSAGE_TYPE_CORE_PONG, - struct PongMessage, - NULL), - GNUNET_MQ_hd_var_size (encrypted, - GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE, - struct EncryptedMessage, - NULL), - GNUNET_MQ_handler_end() - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_fixed_size (ephemeral_key, + GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY, + struct EphemeralKeyMessage, + NULL), + GNUNET_MQ_hd_fixed_size (ping, + GNUNET_MESSAGE_TYPE_CORE_PING, + struct PingMessage, + NULL), + GNUNET_MQ_hd_fixed_size (pong, + GNUNET_MESSAGE_TYPE_CORE_PONG, + struct PongMessage, + NULL), + GNUNET_MQ_hd_var_size (encrypted, + GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE, + struct EncryptedMessage, + NULL), + GNUNET_MQ_handler_end ()}; my_private_key = pk; GNUNET_CRYPTO_eddsa_key_get_public (my_private_key, @@ -1976,17 +1930,15 @@ GSC_KX_init (struct GNUNET_CRYPTO_EddsaPrivateKey *pk) } nc = GNUNET_notification_context_create (1); - rekey_task = GNUNET_SCHEDULER_add_delayed (REKEY_FREQUENCY, - &do_rekey, - NULL); - transport - = GNUNET_TRANSPORT_core_connect (GSC_cfg, - &GSC_my_identity, - handlers, - NULL, - &handle_transport_notify_connect, - &handle_transport_notify_disconnect, - &handle_transport_notify_excess_bw); + rekey_task = GNUNET_SCHEDULER_add_delayed (REKEY_FREQUENCY, &do_rekey, NULL); + transport = + GNUNET_TRANSPORT_core_connect (GSC_cfg, + &GSC_my_identity, + handlers, + NULL, + &handle_transport_notify_connect, + &handle_transport_notify_disconnect, + &handle_transport_notify_excess_bw); if (NULL == transport) { GSC_KX_done (); @@ -2030,7 +1982,7 @@ GSC_KX_done () } - /** +/** * Check how many messages are queued for the given neighbour. * * @param kxinfo data about neighbour to check @@ -2071,27 +2023,22 @@ GSC_KX_handle_client_monitor_peers (struct GNUNET_MQ_Handle *mq) struct MonitorNotifyMessage *done_msg; struct GSC_KeyExchangeInfo *kx; - GNUNET_notification_context_add (nc, - mq); + GNUNET_notification_context_add (nc, mq); for (kx = kx_head; NULL != kx; kx = kx->next) { struct GNUNET_MQ_Envelope *env; struct MonitorNotifyMessage *msg; - env = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY); + env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY); msg->state = htonl ((uint32_t) kx->status); msg->peer = *kx->peer; msg->timeout = GNUNET_TIME_absolute_hton (kx->timeout); - GNUNET_MQ_send (mq, - env); + GNUNET_MQ_send (mq, env); } - env = GNUNET_MQ_msg (done_msg, - GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY); + env = GNUNET_MQ_msg (done_msg, GNUNET_MESSAGE_TYPE_CORE_MONITOR_NOTIFY); done_msg->state = htonl ((uint32_t) GNUNET_CORE_KX_ITERATION_FINISHED); done_msg->timeout = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_FOREVER_ABS); - GNUNET_MQ_send (mq, - env); + GNUNET_MQ_send (mq, env); } diff --git a/src/hostlist/test_gnunet_daemon_hostlist.c b/src/hostlist/test_gnunet_daemon_hostlist.c index 1bcf4e86a..557b91d1c 100644 --- a/src/hostlist/test_gnunet_daemon_hostlist.c +++ b/src/hostlist/test_gnunet_daemon_hostlist.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -26,7 +26,6 @@ #include "gnunet_util_lib.h" #include "gnunet_arm_service.h" #include "gnunet_transport_service.h" -#include "gnunet_transport_core_service.h" #include "gnunet_transport_hello_service.h" @@ -103,26 +102,23 @@ timeout_error (void *cls) */ static void * notify_connect (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *mq) + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peers connected, shutting down.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peers connected, shutting down.\n"); ok = 0; if (NULL != timeout_task) { GNUNET_SCHEDULER_cancel (timeout_task); timeout_task = NULL; } - GNUNET_SCHEDULER_add_now (&clean_up, - NULL); + GNUNET_SCHEDULER_add_now (&clean_up, NULL); return NULL; } static void -process_hello (void *cls, - const struct GNUNET_MessageHeader *message) +process_hello (void *cls, const struct GNUNET_MessageHeader *message) { struct PeerContext *p = cls; @@ -134,39 +130,35 @@ process_hello (void *cls, static void -setup_peer (struct PeerContext *p, - const char *cfgname) +setup_peer (struct PeerContext *p, const char *cfgname) { char *binary; binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-arm"); p->cfg = GNUNET_CONFIGURATION_create (); - p->arm_proc = - GNUNET_OS_start_process (GNUNET_YES, - GNUNET_OS_INHERIT_STD_OUT_AND_ERR, - NULL, - NULL, - NULL, - binary, - "gnunet-service-arm", - "-c", - cfgname, - NULL); - GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_load (p->cfg, - cfgname)); + p->arm_proc = GNUNET_OS_start_process (GNUNET_YES, + GNUNET_OS_INHERIT_STD_OUT_AND_ERR, + NULL, + NULL, + NULL, + binary, + "gnunet-service-arm", + "-c", + cfgname, + NULL); + GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_load (p->cfg, cfgname)); p->th = GNUNET_TRANSPORT_core_connect (p->cfg, - NULL, - NULL, - p, - ¬ify_connect, - NULL, - NULL); + NULL, + NULL, + p, + ¬ify_connect, + NULL, + NULL); GNUNET_assert (NULL != p->th); p->ghh = GNUNET_TRANSPORT_hello_get (p->cfg, - GNUNET_TRANSPORT_AC_ANY, - &process_hello, - p); + GNUNET_TRANSPORT_AC_ANY, + &process_hello, + p); GNUNET_free (binary); } @@ -176,18 +168,13 @@ waitpid_task (void *cls) { struct PeerContext *p = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Killing ARM process.\n"); + if (0 != GNUNET_OS_process_kill (p->arm_proc, GNUNET_TERM_SIG)) + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); + if (GNUNET_OK != GNUNET_OS_process_wait (p->arm_proc)) + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "waitpid"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Killing ARM process.\n"); - if (0 != GNUNET_OS_process_kill (p->arm_proc, - GNUNET_TERM_SIG)) - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, - "kill"); - if (GNUNET_OK != - GNUNET_OS_process_wait (p->arm_proc)) - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, - "waitpid"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "ARM process %u stopped\n", + "ARM process %u stopped\n", GNUNET_OS_process_get_pid (p->arm_proc)); GNUNET_OS_process_destroy (p->arm_proc); p->arm_proc = NULL; @@ -198,11 +185,8 @@ waitpid_task (void *cls) static void stop_arm (struct PeerContext *p) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking ARM to stop core service\n"); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &waitpid_task, - p); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking ARM to stop core service\n"); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &waitpid_task, p); } @@ -225,36 +209,28 @@ run (void *cls, { GNUNET_assert (ok == 1); ok++; - timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, - &timeout_error, - NULL); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - NULL); - setup_peer (&p1, - "test_gnunet_daemon_hostlist_peer1.conf"); - setup_peer (&p2, - "test_gnunet_daemon_hostlist_peer2.conf"); + timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &timeout_error, NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + setup_peer (&p1, "test_gnunet_daemon_hostlist_peer1.conf"); + setup_peer (&p2, "test_gnunet_daemon_hostlist_peer2.conf"); } static int check () { - char *const argv[] = { - "test-gnunet-daemon-hostlist", - "-c", "test_gnunet_daemon_hostlist_data.conf", - NULL - }; - struct GNUNET_GETOPT_CommandLineOption options[] = { - GNUNET_GETOPT_OPTION_END - }; + char *const argv[] = {"test-gnunet-daemon-hostlist", + "-c", + "test_gnunet_daemon_hostlist_data.conf", + NULL}; + struct GNUNET_GETOPT_CommandLineOption options[] = {GNUNET_GETOPT_OPTION_END}; ok = 1; GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, - argv, + argv, "test-gnunet-daemon-hostlist", - "nohelp", - options, - &run, + "nohelp", + options, + &run, &ok); return ok; } @@ -271,9 +247,7 @@ main (int argc, char *argv[]) "GNUNET_TEST_HOME"); GNUNET_DISK_purge_cfg_dir ("test_gnunet_daemon_hostlist_data.conf", "GNUNET_TEST_HOME"); - GNUNET_log_setup ("test-gnunet-daemon-hostlist", - "WARNING", - NULL); + GNUNET_log_setup ("test-gnunet-daemon-hostlist", "WARNING", NULL); ret = check (); GNUNET_DISK_purge_cfg_dir ("test_gnunet_daemon_hostlist_peer1.conf", "GNUNET_TEST_HOME"); diff --git a/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c b/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c index 5dc116ccd..28b2db0de 100644 --- a/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c +++ b/src/hostlist/test_gnunet_daemon_hostlist_reconnect.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -26,7 +26,7 @@ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_arm_service.h" -#include "gnunet_transport_core_service.h" +#include "gnunet_transport_service.h" #include "gnunet_transport_hello_service.h" /** @@ -76,11 +76,10 @@ timeout_error (void *cls) */ static void * notify_connect (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *mq) + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peers connected, shutting down.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peers connected, shutting down.\n"); ok = 0; GNUNET_SCHEDULER_shutdown (); return NULL; @@ -88,8 +87,7 @@ notify_connect (void *cls, static void -process_hello (void *cls, - const struct GNUNET_MessageHeader *message) +process_hello (void *cls, const struct GNUNET_MessageHeader *message) { struct PeerContext *p = cls; @@ -101,39 +99,35 @@ process_hello (void *cls, static void -setup_peer (struct PeerContext *p, - const char *cfgname) +setup_peer (struct PeerContext *p, const char *cfgname) { char *binary; binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-arm"); p->cfg = GNUNET_CONFIGURATION_create (); - p->arm_proc = - GNUNET_OS_start_process (GNUNET_YES, - GNUNET_OS_INHERIT_STD_OUT_AND_ERR, - NULL, - NULL, - NULL, - binary, - "gnunet-service-arm", - "-c", - cfgname, - NULL); - GNUNET_assert (GNUNET_OK == - GNUNET_CONFIGURATION_load (p->cfg, - cfgname)); + p->arm_proc = GNUNET_OS_start_process (GNUNET_YES, + GNUNET_OS_INHERIT_STD_OUT_AND_ERR, + NULL, + NULL, + NULL, + binary, + "gnunet-service-arm", + "-c", + cfgname, + NULL); + GNUNET_assert (GNUNET_OK == GNUNET_CONFIGURATION_load (p->cfg, cfgname)); p->th = GNUNET_TRANSPORT_core_connect (p->cfg, - NULL, - NULL, - p, - ¬ify_connect, - NULL, - NULL); + NULL, + NULL, + p, + ¬ify_connect, + NULL, + NULL); GNUNET_assert (NULL != p->th); p->ghh = GNUNET_TRANSPORT_hello_get (p->cfg, - GNUNET_TRANSPORT_AC_ANY, - &process_hello, - p); + GNUNET_TRANSPORT_AC_ANY, + &process_hello, + p); GNUNET_free (binary); } @@ -143,18 +137,13 @@ waitpid_task (void *cls) { struct PeerContext *p = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Killing ARM process.\n"); + if (0 != GNUNET_OS_process_kill (p->arm_proc, GNUNET_TERM_SIG)) + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); + if (GNUNET_OK != GNUNET_OS_process_wait (p->arm_proc)) + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "waitpid"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Killing ARM process.\n"); - if (0 != GNUNET_OS_process_kill (p->arm_proc, - GNUNET_TERM_SIG)) - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, - "kill"); - if (GNUNET_OK != - GNUNET_OS_process_wait (p->arm_proc)) - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, - "waitpid"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "ARM process %u stopped\n", + "ARM process %u stopped\n", GNUNET_OS_process_get_pid (p->arm_proc)); GNUNET_OS_process_destroy (p->arm_proc); p->arm_proc = NULL; @@ -165,11 +154,8 @@ waitpid_task (void *cls) static void stop_arm (struct PeerContext *p) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking ARM to stop core service\n"); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &waitpid_task, - p); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking ARM to stop core service\n"); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &waitpid_task, p); } @@ -217,30 +203,22 @@ run (void *cls, { GNUNET_assert (ok == 1); ok++; - timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, - &timeout_error, - NULL); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - NULL); - setup_peer (&p1, - "test_gnunet_daemon_hostlist_peer1.conf"); - setup_peer (&p2, - "test_gnunet_daemon_hostlist_peer2.conf"); + timeout_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &timeout_error, NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); + setup_peer (&p1, "test_gnunet_daemon_hostlist_peer1.conf"); + setup_peer (&p2, "test_gnunet_daemon_hostlist_peer2.conf"); } int -main (int argcx, - char *argvx[]) +main (int argcx, char *argvx[]) { - static char *const argv[] = { - "test-gnunet-daemon-hostlist", - "-c", "test_gnunet_daemon_hostlist_data.conf", - NULL - }; + static char *const argv[] = {"test-gnunet-daemon-hostlist", + "-c", + "test_gnunet_daemon_hostlist_data.conf", + NULL}; static struct GNUNET_GETOPT_CommandLineOption options[] = { - GNUNET_GETOPT_OPTION_END - }; + GNUNET_GETOPT_OPTION_END}; GNUNET_DISK_purge_cfg_dir ("test_gnunet_daemon_hostlist_peer1.conf", "GNUNET_TEST_HOME"); @@ -248,32 +226,28 @@ main (int argcx, "GNUNET_TEST_HOME"); GNUNET_DISK_purge_cfg_dir ("test_gnunet_daemon_hostlist_data.conf", "GNUNET_TEST_HOME"); - GNUNET_log_setup ("test-gnunet-daemon-hostlist", - "WARNING", - NULL); + GNUNET_log_setup ("test-gnunet-daemon-hostlist", "WARNING", NULL); ok = 1; GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, - argv, + argv, "test-gnunet-daemon-hostlist", - "nohelp", - options, - &run, + "nohelp", + options, + &run, &ok); if (0 == ok) { - FPRINTF (stderr, "%s", "."); + FPRINTF (stderr, "%s", "."); /* now do it again */ ok = 1; GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1, - argv, - "test-gnunet-daemon-hostlist", - "nohelp", - options, - &run, - &ok); - FPRINTF (stderr, - "%s", - ".\n"); + argv, + "test-gnunet-daemon-hostlist", + "nohelp", + options, + &run, + &ok); + FPRINTF (stderr, "%s", ".\n"); } GNUNET_DISK_purge_cfg_dir ("test_gnunet_daemon_hostlist_peer1.conf", "GNUNET_TEST_HOME"); diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 27a7034b0..7a089ad65 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1089,9 +1089,17 @@ extern "C" { /** * Message telling transport to limit its receive rate. + * (FIXME: was the above comment ever accurate?) + * + * Note: dead in TNG, replaced by RECV_OK! */ #define GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA 366 +/** + * Message telling transport to limit its receive rate. + */ +#define GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK 366 + /** * Request to look addresses of peers in server. */ diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h index ca5a86074..ea1ff732e 100644 --- a/src/include/gnunet_transport_communication_service.h +++ b/src/include/gnunet_transport_communication_service.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2018 GNUnet e.V. + Copyright (C) 2009-2019 GNUnet e.V. GNUnet is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published @@ -36,9 +36,8 @@ #define GNUNET_TRANSPORT_COMMUNICATION_SERVICE_H #ifdef __cplusplus -extern "C" -{ -#if 0 /* keep Emacsens' auto-indent happy */ +extern "C" { +#if 0 /* keep Emacsens' auto-indent happy */ } #endif #endif @@ -67,12 +66,13 @@ extern "C" * @param peer identity of the other peer * @param address where to send the message, human-readable * communicator-specific format, 0-terminated, UTF-8 - * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is invalid + * @return #GNUNET_OK on success, #GNUNET_SYSERR if the provided address is + * invalid */ -typedef int -(*GNUNET_TRANSPORT_CommunicatorMqInit) (void *cls, - const struct GNUNET_PeerIdentity *peer, - const char *address); +typedef int (*GNUNET_TRANSPORT_CommunicatorMqInit) ( + void *cls, + const struct GNUNET_PeerIdentity *peer, + const char *address); /** @@ -87,7 +87,8 @@ struct GNUNET_TRANSPORT_CommunicatorHandle; * FIXME: may want to distinguish bi-directional as well, * should we define a bit for that? Needed in DV logic (handle_dv_learn)! */ -enum GNUNET_TRANSPORT_CommunicatorCharacteristics { +enum GNUNET_TRANSPORT_CommunicatorCharacteristics +{ /** * Characteristics are unknown (i.e. DV). @@ -122,10 +123,10 @@ enum GNUNET_TRANSPORT_CommunicatorCharacteristics { * @param sender which peer sent the notification * @param msg payload */ -typedef void -(*GNUNET_TRANSPORT_CommunicatorNotify) (void *cls, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *msg); +typedef void (*GNUNET_TRANSPORT_CommunicatorNotify) ( + void *cls, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *msg); /** @@ -145,14 +146,15 @@ typedef void * @return NULL on error */ struct GNUNET_TRANSPORT_CommunicatorHandle * -GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, - const char *config_section_name, - const char *addr_prefix, - enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc, - GNUNET_TRANSPORT_CommunicatorMqInit mq_init, - void *mq_init_cls, - GNUNET_TRANSPORT_CommunicatorNotify notify_cb, - void *notify_cb_cls); +GNUNET_TRANSPORT_communicator_connect ( + const struct GNUNET_CONFIGURATION_Handle *cfg, + const char *config_section_name, + const char *addr_prefix, + enum GNUNET_TRANSPORT_CommunicatorCharacteristics cc, + GNUNET_TRANSPORT_CommunicatorMqInit mq_init, + void *mq_init_cls, + GNUNET_TRANSPORT_CommunicatorNotify notify_cb, + void *notify_cb_cls); /** @@ -161,22 +163,23 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle * @param ch handle returned from connect */ void -GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch); +GNUNET_TRANSPORT_communicator_disconnect ( + struct GNUNET_TRANSPORT_CommunicatorHandle *ch); /* ************************* Receiving *************************** */ /** * Function called to notify communicator that we have received - * and processed the message. + * and processed the message. Used for flow control (if supported + * by the communicator). * * @param cls closure * @param success #GNUNET_SYSERR on failure (try to disconnect/reset connection) * #GNUNET_OK on success */ -typedef void -(*GNUNET_TRANSPORT_MessageCompletedCallback) (void *cls, - int success); +typedef void (*GNUNET_TRANSPORT_MessageCompletedCallback) (void *cls, + int success); /** @@ -200,12 +203,13 @@ typedef void * the tranport service is not yet up */ int -GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *msg, - struct GNUNET_TIME_Relative expected_addr_validity, - GNUNET_TRANSPORT_MessageCompletedCallback cb, - void *cb_cls); +GNUNET_TRANSPORT_communicator_receive ( + struct GNUNET_TRANSPORT_CommunicatorHandle *handle, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *msg, + struct GNUNET_TIME_Relative expected_addr_validity, + GNUNET_TRANSPORT_MessageCompletedCallback cb, + void *cb_cls); /* ************************* Discovery *************************** */ @@ -220,7 +224,8 @@ struct GNUNET_TRANSPORT_QueueHandle; /** * Possible states of a connection. */ -enum GNUNET_TRANSPORT_ConnectionStatus { +enum GNUNET_TRANSPORT_ConnectionStatus +{ /** * Connection is down. @@ -255,13 +260,14 @@ enum GNUNET_TRANSPORT_ConnectionStatus { * @return API handle identifying the new MQ */ struct GNUNET_TRANSPORT_QueueHandle * -GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, - const struct GNUNET_PeerIdentity *peer, - const char *address, - uint32_t mtu, - enum GNUNET_NetworkType nt, - enum GNUNET_TRANSPORT_ConnectionStatus cs, - struct GNUNET_MQ_Handle *mq); +GNUNET_TRANSPORT_communicator_mq_add ( + struct GNUNET_TRANSPORT_CommunicatorHandle *ch, + const struct GNUNET_PeerIdentity *peer, + const char *address, + uint32_t mtu, + enum GNUNET_NetworkType nt, + enum GNUNET_TRANSPORT_ConnectionStatus cs, + struct GNUNET_MQ_Handle *mq); /** @@ -291,10 +297,11 @@ struct GNUNET_TRANSPORT_AddressIdentifier; * @param expiration when does the communicator forsee this address expiring? */ struct GNUNET_TRANSPORT_AddressIdentifier * -GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, - const char *address, - enum GNUNET_NetworkType nt, - struct GNUNET_TIME_Relative expiration); +GNUNET_TRANSPORT_communicator_address_add ( + struct GNUNET_TRANSPORT_CommunicatorHandle *ch, + const char *address, + enum GNUNET_NetworkType nt, + struct GNUNET_TIME_Relative expiration); /** @@ -304,7 +311,8 @@ GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorH * @param ai address that is no longer provided */ void -GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIdentifier *ai); +GNUNET_TRANSPORT_communicator_address_remove ( + struct GNUNET_TRANSPORT_AddressIdentifier *ai); /** @@ -326,13 +334,14 @@ GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIde * notify-API to @a pid's communicator @a comm */ void -GNUNET_TRANSPORT_communicator_notify (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, - const struct GNUNET_PeerIdentity *pid, - const char *comm, - const struct GNUNET_MessageHeader *header); +GNUNET_TRANSPORT_communicator_notify ( + struct GNUNET_TRANSPORT_CommunicatorHandle *ch, + const struct GNUNET_PeerIdentity *pid, + const char *comm, + const struct GNUNET_MessageHeader *header); -#if 0 /* keep Emacsens' auto-indent happy */ +#if 0 /* keep Emacsens' auto-indent happy */ { #endif #ifdef __cplusplus @@ -342,6 +351,6 @@ GNUNET_TRANSPORT_communicator_notify (struct GNUNET_TRANSPORT_CommunicatorHandle /* ifndef GNUNET_TRANSPORT_COMMUNICATOR_SERVICE_H */ #endif -/** @} */ /* end of group */ +/** @} */ /* end of group */ /* end of gnunet_transport_communicator_service.h */ diff --git a/src/include/gnunet_transport_core_service.h b/src/include/gnunet_transport_core_service.h index f442b53c0..076514779 100644 --- a/src/include/gnunet_transport_core_service.h +++ b/src/include/gnunet_transport_core_service.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2016 GNUnet e.V. + Copyright (C) 2009-2019 GNUnet e.V. GNUnet is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -21,7 +21,7 @@ * @author Christian Grothoff * * @file - * API of the transport service towards the CORE service. + * API of the transport service towards the CORE service (TNG version) * * @defgroup transport TRANSPORT service * Communication with other peers @@ -34,9 +34,8 @@ #define GNUNET_TRANSPORT_CORE_SERVICE_H #ifdef __cplusplus -extern "C" -{ -#if 0 /* keep Emacsens' auto-indent happy */ +extern "C" { +#if 0 /* keep Emacsens' auto-indent happy */ } #endif #endif @@ -62,15 +61,15 @@ struct GNUNET_TRANSPORT_CoreHandle; * @param cls closure * @param peer the identity of the peer that connected; this * pointer will remain valid until the disconnect, hence - * applications do not necessarily have to make a copy + * applications do not necessarily have to make a copy * of the value if they only need it until disconnect * @param mq message queue to use to transmit to @a peer * @return closure to use in MQ handlers */ -typedef void * -(*GNUNET_TRANSPORT_NotifyConnect) (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *mq); +typedef void *(*GNUNET_TRANSPORT_NotifyConnect) ( + void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq); /** @@ -84,33 +83,10 @@ typedef void * * @param handlers_cls closure of the handlers, was returned from the * connect notification callback */ -typedef void -(*GNUNET_TRANSPORT_NotifyDisconnect) (void *cls, - const struct GNUNET_PeerIdentity *peer, - void *handler_cls); - - -/** - * Function called if we have "excess" bandwidth to a peer. - * The notification will happen the first time we have excess - * bandwidth, and then only again after the client has performed - * some transmission to the peer. - * - * Excess bandwidth is defined as being allowed (by ATS) to send - * more data, and us reaching the limit of the capacity build-up - * (which, if we go past it, means we don't use available bandwidth). - * See also the "max carry" in `struct GNUNET_BANDWIDTH_Tracker`. - * - * @param cls the closure - * @param neighbour peer that we have excess bandwidth to - * @param handlers_cls closure of the handlers, was returned from the - * connect notification callback - */ -typedef void -(*GNUNET_TRANSPORT_NotifyExcessBandwidth)(void *cls, - const struct GNUNET_PeerIdentity *neighbour, - void *handlers_cls); - +typedef void (*GNUNET_TRANSPORT_NotifyDisconnect) ( + void *cls, + const struct GNUNET_PeerIdentity *peer, + void *handler_cls); /** @@ -136,8 +112,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_MQ_MessageHandler *handlers, void *cls, GNUNET_TRANSPORT_NotifyConnect nc, - GNUNET_TRANSPORT_NotifyDisconnect nd, - GNUNET_TRANSPORT_NotifyExcessBandwidth neb); + GNUNET_TRANSPORT_NotifyDisconnect nd); /** @@ -149,8 +124,34 @@ void GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle); +/** + * Notification from the CORE service to the TRANSPORT service + * that the CORE service has finished processing a message from + * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect()) + * and that it is thus now OK for TRANSPORT to send more messages + * for @a pid. + * + * Used to provide flow control, this is our equivalent to + * #GNUNET_SERVICE_client_continue() of an ordinary service. + * + * Note that due to the use of a window, TRANSPORT may send multiple + * messages destined for the same peer even without an intermediate + * call to this function. However, CORE must still call this function + * once per message received, as otherwise eventually the window will + * be full and TRANSPORT will stop providing messages to CORE for @a + * pid. + * + * @param ch core handle + * @param pid which peer was the message from that was fully processed by CORE + */ +void +GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch, + const struct GNUNET_PeerIdentity *pid); + + /** * Checks if a given peer is connected to us and get the message queue. + * Convenience function. * * @param handle connection to transport service * @param peer the peer to check @@ -161,7 +162,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, const struct GNUNET_PeerIdentity *peer); -#if 0 /* keep Emacsens' auto-indent happy */ +#if 0 /* keep Emacsens' auto-indent happy */ { #endif #ifdef __cplusplus @@ -171,6 +172,6 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, /* ifndef GNUNET_TRANSPORT_CORE_SERVICE_H */ #endif -/** @} */ /* end of group */ +/** @} */ /* end of group */ /* end of gnunet_transport_core_service.h */ diff --git a/src/include/gnunet_transport_service.h b/src/include/gnunet_transport_service.h index c5cb10ad8..80949b417 100644 --- a/src/include/gnunet_transport_service.h +++ b/src/include/gnunet_transport_service.h @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -36,9 +36,8 @@ #define GNUNET_TRANSPORT_SERVICE_H #ifdef __cplusplus -extern "C" -{ -#if 0 /* keep Emacsens' auto-indent happy */ +extern "C" { +#if 0 /* keep Emacsens' auto-indent happy */ } #endif #endif @@ -71,8 +70,8 @@ struct GNUNET_TRANSPORT_OfferHelloHandle; * tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail * tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success * @param cont_cls closure for @a cont - * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure, - * in case of failure @a cont will not be called + * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on + * failure, in case of failure @a cont will not be called * */ struct GNUNET_TRANSPORT_OfferHelloHandle * @@ -88,7 +87,8 @@ GNUNET_TRANSPORT_offer_hello (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param ohh the `struct GNUNET_TRANSPORT_OfferHelloHandle` to cancel */ void -GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh); +GNUNET_TRANSPORT_offer_hello_cancel ( + struct GNUNET_TRANSPORT_OfferHelloHandle *ohh); /* *********************** Address to String ******************* */ @@ -115,10 +115,9 @@ struct GNUNET_TRANSPORT_AddressToStringContext; * if #GNUNET_NO: address was invalid (or not supported) * if #GNUNET_SYSERR: communication error (IPC error) */ -typedef void -(*GNUNET_TRANSPORT_AddressToStringCallback) (void *cls, - const char *address, - int res); +typedef void (*GNUNET_TRANSPORT_AddressToStringCallback) (void *cls, + const char *address, + int res); /** @@ -134,12 +133,13 @@ typedef void * @return handle to cancel the operation, NULL on error */ struct GNUNET_TRANSPORT_AddressToStringContext * -GNUNET_TRANSPORT_address_to_string (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_HELLO_Address *address, - int numeric, - struct GNUNET_TIME_Relative timeout, - GNUNET_TRANSPORT_AddressToStringCallback aluc, - void *aluc_cls); +GNUNET_TRANSPORT_address_to_string ( + const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_HELLO_Address *address, + int numeric, + struct GNUNET_TIME_Relative timeout, + GNUNET_TRANSPORT_AddressToStringCallback aluc, + void *aluc_cls); /** @@ -148,14 +148,16 @@ GNUNET_TRANSPORT_address_to_string (const struct GNUNET_CONFIGURATION_Handle *cf * @param alc the context handle */ void -GNUNET_TRANSPORT_address_to_string_cancel (struct GNUNET_TRANSPORT_AddressToStringContext *alc); +GNUNET_TRANSPORT_address_to_string_cancel ( + struct GNUNET_TRANSPORT_AddressToStringContext *alc); /* *********************** Monitoring ************************** */ /** - * Possible state of a neighbour. Initially, we are #GNUNET_TRANSPORT_PS_NOT_CONNECTED. + * Possible state of a neighbour. Initially, we are + * #GNUNET_TRANSPORT_PS_NOT_CONNECTED. * * Then, there are two main paths. If we receive a SYN message, we give * the inbound address to ATS. After the check we ask ATS for a suggestion @@ -174,14 +176,14 @@ GNUNET_TRANSPORT_address_to_string_cancel (struct GNUNET_TRANSPORT_AddressToStri * #GNUNET_TRANSPORT_PS_DISCONNECT. * * If the session is in trouble (i.e. transport-level disconnect or - * timeout), we go to #GNUNET_TRANSPORT_PS_RECONNECT_ATS where we ask ATS for a new - * address (we don't notify anyone about the disconnect yet). Once we - * have a new address, we enter #GNUNET_TRANSPORT_PS_RECONNECT_SENT and send a - * SYN message. If we receive a - * SYN_ACK, we go to #GNUNET_TRANSPORT_PS_CONNECTED and nobody noticed that we had - * trouble; we also send a ACK at this time just in case. If - * the operation times out, we go to #GNUNET_TRANSPORT_PS_DISCONNECT (and notify everyone - * about the lost connection). + * timeout), we go to #GNUNET_TRANSPORT_PS_RECONNECT_ATS where we ask ATS for a + * new address (we don't notify anyone about the disconnect yet). Once we have + * a new address, we enter #GNUNET_TRANSPORT_PS_RECONNECT_SENT and send a SYN + * message. If we receive a SYN_ACK, we go to #GNUNET_TRANSPORT_PS_CONNECTED + * and nobody noticed that we had trouble; we also send a ACK at this time just + * in case. If the operation times out, we go to + * #GNUNET_TRANSPORT_PS_DISCONNECT (and notify everyone about the lost + * connection). * * If ATS decides to switch addresses while we have a normal * connection, we go to #GNUNET_TRANSPORT_PS_CONNECTED_SWITCHING_SYN_SENT @@ -189,13 +191,14 @@ GNUNET_TRANSPORT_address_to_string_cancel (struct GNUNET_TRANSPORT_AddressToStri * primary connection to the suggested alternative from ATS, go back * to #GNUNET_TRANSPORT_PS_CONNECTED and send a ACK to the other peer just to be * sure. If the operation times out - * we go to #GNUNET_TRANSPORT_PS_CONNECTED (and notify ATS that the given alternative - * address is "invalid"). + * we go to #GNUNET_TRANSPORT_PS_CONNECTED (and notify ATS that the given + * alternative address is "invalid"). * - * Once a session is in #GNUNET_TRANSPORT_PS_DISCONNECT, it is cleaned up and then goes - * to (#GNUNET_TRANSPORT_PS_DISCONNECT_FINISHED). If we receive an explicit disconnect - * request, we can go from any state to #GNUNET_TRANSPORT_PS_DISCONNECT, possibly after - * generating disconnect notifications. + * Once a session is in #GNUNET_TRANSPORT_PS_DISCONNECT, it is cleaned up and + * then goes to (#GNUNET_TRANSPORT_PS_DISCONNECT_FINISHED). If we receive an + * explicit disconnect request, we can go from any state to + * #GNUNET_TRANSPORT_PS_DISCONNECT, possibly after generating disconnect + * notifications. * * Note that it is quite possible that while we are in any of these * states, we could receive a 'SYN' request from the other peer. @@ -323,12 +326,12 @@ struct GNUNET_TRANSPORT_PeerMonitoringContext; * @param state current state this peer is in * @param state_timeout timeout for the current state of the peer */ -typedef void -(*GNUNET_TRANSPORT_PeerIterateCallback) (void *cls, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Address *address, - enum GNUNET_TRANSPORT_PeerState state, - struct GNUNET_TIME_Absolute state_timeout); +typedef void (*GNUNET_TRANSPORT_PeerIterateCallback) ( + void *cls, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_HELLO_Address *address, + enum GNUNET_TRANSPORT_PeerState state, + struct GNUNET_TIME_Absolute state_timeout); /** @@ -352,17 +355,18 @@ typedef void * @param cfg configuration to use * @param peer a specific peer identity to obtain information for, * NULL for all peers - * @param one_shot #GNUNET_YES to return the current state and then end (with NULL+NULL), - * #GNUNET_NO to monitor peers continuously + * @param one_shot #GNUNET_YES to return the current state and then end (with + * NULL+NULL), #GNUNET_NO to monitor peers continuously * @param peer_callback function to call with the results * @param peer_callback_cls closure for @a peer_callback */ struct GNUNET_TRANSPORT_PeerMonitoringContext * -GNUNET_TRANSPORT_monitor_peers (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_PeerIdentity *peer, - int one_shot, - GNUNET_TRANSPORT_PeerIterateCallback peer_callback, - void *peer_callback_cls); +GNUNET_TRANSPORT_monitor_peers ( + const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_PeerIdentity *peer, + int one_shot, + GNUNET_TRANSPORT_PeerIterateCallback peer_callback, + void *peer_callback_cls); /** @@ -371,7 +375,8 @@ GNUNET_TRANSPORT_monitor_peers (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param pic handle for the request to cancel */ void -GNUNET_TRANSPORT_monitor_peers_cancel (struct GNUNET_TRANSPORT_PeerMonitoringContext *pic); +GNUNET_TRANSPORT_monitor_peers_cancel ( + struct GNUNET_TRANSPORT_PeerMonitoringContext *pic); /* *********************** Blacklisting ************************ */ @@ -389,9 +394,9 @@ struct GNUNET_TRANSPORT_Blacklist; * @param pid peer to approve or disapproave * @return #GNUNET_OK if the connection is allowed, #GNUNET_SYSERR if not */ -typedef int -(*GNUNET_TRANSPORT_BlacklistCallback) (void *cls, - const struct GNUNET_PeerIdentity *pid); +typedef int (*GNUNET_TRANSPORT_BlacklistCallback) ( + void *cls, + const struct GNUNET_PeerIdentity *pid); /** @@ -539,11 +544,11 @@ struct GNUNET_TRANSPORT_SessionInfo * NULL with @a session being non-NULL if the monitor * was being cancelled while sessions were active */ -typedef void -(*GNUNET_TRANSPORT_SessionMonitorCallback) (void *cls, - struct GNUNET_TRANSPORT_PluginSession *session, - void **session_ctx, - const struct GNUNET_TRANSPORT_SessionInfo *info); +typedef void (*GNUNET_TRANSPORT_SessionMonitorCallback) ( + void *cls, + struct GNUNET_TRANSPORT_PluginSession *session, + void **session_ctx, + const struct GNUNET_TRANSPORT_SessionInfo *info); /** @@ -569,11 +574,122 @@ GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param pm handle of the request that is to be cancelled */ void -GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm); +GNUNET_TRANSPORT_monitor_plugins_cancel ( + struct GNUNET_TRANSPORT_PluginMonitor *pm); + + +/** + * Opaque handle to the service. + */ +struct GNUNET_TRANSPORT_CoreHandle; + + +/** + * Function called to notify transport users that another + * peer connected to us. + * + * @param cls closure + * @param peer the identity of the peer that connected; this + * pointer will remain valid until the disconnect, hence + * applications do not necessarily have to make a copy + * of the value if they only need it until disconnect + * @param mq message queue to use to transmit to @a peer + * @return closure to use in MQ handlers + */ +typedef void *(*GNUNET_TRANSPORT_NotifyConnect) ( + void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Handle *mq); + + +/** + * Function called to notify transport users that another peer + * disconnected from us. The message queue that was given to the + * connect notification will be destroyed and must not be used + * henceforth. + * + * @param cls closure from #GNUNET_TRANSPORT_core_connect + * @param peer the peer that disconnected + * @param handlers_cls closure of the handlers, was returned from the + * connect notification callback + */ +typedef void (*GNUNET_TRANSPORT_NotifyDisconnect) ( + void *cls, + const struct GNUNET_PeerIdentity *peer, + void *handler_cls); + + +/** + * Function called if we have "excess" bandwidth to a peer. + * The notification will happen the first time we have excess + * bandwidth, and then only again after the client has performed + * some transmission to the peer. + * + * Excess bandwidth is defined as being allowed (by ATS) to send + * more data, and us reaching the limit of the capacity build-up + * (which, if we go past it, means we don't use available bandwidth). + * See also the "max carry" in `struct GNUNET_BANDWIDTH_Tracker`. + * + * @param cls the closure + * @param neighbour peer that we have excess bandwidth to + * @param handlers_cls closure of the handlers, was returned from the + * connect notification callback + */ +typedef void (*GNUNET_TRANSPORT_NotifyExcessBandwidth) ( + void *cls, + const struct GNUNET_PeerIdentity *neighbour, + void *handlers_cls); + + +/** + * Connect to the transport service. Note that the connection may + * complete (or fail) asynchronously. + * + * @param cfg configuration to use + * @param self our own identity (API should check that it matches + * the identity found by transport), or NULL (no check) + * @param handlers array of message handlers; note that the + * closures provided will be ignored and replaced + * with the respective return value from @a nc + * @param handlers array with handlers to call when we receive messages, or NULL + * @param cls closure for the @a nc, @a nd and @a neb callbacks + * @param nc function to call on connect events, or NULL + * @param nd function to call on disconnect events, or NULL + * @param neb function to call if we have excess bandwidth to a peer, or NULL + * @return NULL on error + */ +struct GNUNET_TRANSPORT_CoreHandle * +GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_PeerIdentity *self, + const struct GNUNET_MQ_MessageHandler *handlers, + void *cls, + GNUNET_TRANSPORT_NotifyConnect nc, + GNUNET_TRANSPORT_NotifyDisconnect nd, + GNUNET_TRANSPORT_NotifyExcessBandwidth neb); + + +/** + * Disconnect from the transport service. + * + * @param handle handle returned from connect + */ +void +GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle); +/** + * Checks if a given peer is connected to us and get the message queue. + * + * @param handle connection to transport service + * @param peer the peer to check + * @return NULL if disconnected, otherwise message queue for @a peer + */ +struct GNUNET_MQ_Handle * +GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, + const struct GNUNET_PeerIdentity *peer); + -#if 0 /* keep Emacsens' auto-indent happy */ +#if 0 /* keep Emacsens' auto-indent happy */ { #endif #ifdef __cplusplus @@ -583,6 +699,6 @@ GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor * /* ifndef GNUNET_TRANSPORT_SERVICE_H */ #endif -/** @} */ /* end of group */ +/** @} */ /* end of group */ /* end of gnunet_transport_service.h */ diff --git a/src/namestore/namestore_api_monitor.c b/src/namestore/namestore_api_monitor.c index 99e3864c9..2881a28ac 100644 --- a/src/namestore/namestore_api_monitor.c +++ b/src/namestore/namestore_api_monitor.c @@ -88,7 +88,6 @@ struct GNUNET_NAMESTORE_ZoneMonitor * Do we first iterate over all existing records? */ int iterate_first; - }; @@ -108,8 +107,7 @@ reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm); * @param msg the sync message */ static void -handle_sync (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_sync (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; @@ -128,10 +126,8 @@ handle_sync (void *cls, * @param lrm the message from the service. */ static int -check_result (void *cls, - const struct RecordResultMessage *lrm) +check_result (void *cls, const struct RecordResultMessage *lrm) { - static struct GNUNET_CRYPTO_EcdsaPrivateKey zero; struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; size_t lrm_len; size_t exp_lrm_len; @@ -142,9 +138,8 @@ check_result (void *cls, const char *rd_ser_tmp; (void) cls; - if ( (0 != GNUNET_memcmp (&lrm->private_key, - &zm->zone)) && - (0 != GNUNET_is_zero (&zm->zone)) ) + if ((0 != GNUNET_memcmp (&lrm->private_key, &zm->zone)) && + (0 != GNUNET_is_zero (&zm->zone))) { GNUNET_break (0); return GNUNET_SYSERR; @@ -170,7 +165,7 @@ check_result (void *cls, return GNUNET_SYSERR; } name_tmp = (const char *) &lrm[1]; - if (name_tmp[name_len -1] != '\0') + if (name_tmp[name_len - 1] != '\0') { GNUNET_break (0); return GNUNET_SYSERR; @@ -180,10 +175,7 @@ check_result (void *cls, struct GNUNET_GNSRECORD_Data rd[rd_count]; if (GNUNET_OK != - GNUNET_GNSRECORD_records_deserialize (rd_len, - rd_ser_tmp, - rd_count, - rd)) + GNUNET_GNSRECORD_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd)) { GNUNET_break (0); return GNUNET_SYSERR; @@ -201,8 +193,7 @@ check_result (void *cls, * @param lrm the message from the service. */ static void -handle_result (void *cls, - const struct RecordResultMessage *lrm) +handle_result (void *cls, const struct RecordResultMessage *lrm) { struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; size_t name_len; @@ -219,16 +210,10 @@ handle_result (void *cls, { struct GNUNET_GNSRECORD_Data rd[rd_count]; - GNUNET_assert (GNUNET_OK == - GNUNET_GNSRECORD_records_deserialize (rd_len, - rd_ser_tmp, - rd_count, - rd)); - zm->monitor (zm->monitor_cls, - &lrm->private_key, - name_tmp, - rd_count, - rd); + GNUNET_assert ( + GNUNET_OK == + GNUNET_GNSRECORD_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd)); + zm->monitor (zm->monitor_cls, &lrm->private_key, name_tmp, rd_count, rd); } } @@ -242,8 +227,7 @@ handle_result (void *cls, * @param error error code */ static void -mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; @@ -260,17 +244,16 @@ mq_error_handler (void *cls, static void reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_fixed_size (sync, - GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC, - struct GNUNET_MessageHeader, - zm), - GNUNET_MQ_hd_var_size (result, - GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT, - struct RecordResultMessage, - zm), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_fixed_size (sync, + GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC, + struct GNUNET_MessageHeader, + zm), + GNUNET_MQ_hd_var_size (result, + GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT, + struct RecordResultMessage, + zm), + GNUNET_MQ_handler_end ()}; struct GNUNET_MQ_Envelope *env; struct ZoneMonitorStartMessage *sm; @@ -286,12 +269,10 @@ reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) zm); if (NULL == zm->mq) return; - env = GNUNET_MQ_msg (sm, - GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START); + env = GNUNET_MQ_msg (sm, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START); sm->iterate_first = htonl (zm->iterate_first); sm->zone = zm->zone; - GNUNET_MQ_send (zm->mq, - env); + GNUNET_MQ_send (zm->mq, env); } @@ -304,7 +285,8 @@ reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) * @param cfg configuration to use to connect to namestore * @param zone zone to monitor * @param iterate_first #GNUNET_YES to first iterate over all existing records, - * #GNUNET_NO to only return changes that happen from now on + * #GNUNET_NO to only return changes that happen from now + * on * @param error_cb function to call on error (i.e. disconnect); note that * unlike the other error callbacks in this API, a call to this * function does NOT destroy the monitor handle, it merely signals @@ -318,15 +300,16 @@ reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) * @return handle to stop monitoring */ struct GNUNET_NAMESTORE_ZoneMonitor * -GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone, - int iterate_first, - GNUNET_SCHEDULER_TaskCallback error_cb, - void *error_cb_cls, - GNUNET_NAMESTORE_RecordMonitor monitor, - void *monitor_cls, - GNUNET_SCHEDULER_TaskCallback sync_cb, - void *sync_cb_cls) +GNUNET_NAMESTORE_zone_monitor_start ( + const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_CRYPTO_EcdsaPrivateKey *zone, + int iterate_first, + GNUNET_SCHEDULER_TaskCallback error_cb, + void *error_cb_cls, + GNUNET_NAMESTORE_RecordMonitor monitor, + void *monitor_cls, + GNUNET_SCHEDULER_TaskCallback sync_cb, + void *sync_cb_cls) { struct GNUNET_NAMESTORE_ZoneMonitor *zm; @@ -379,11 +362,9 @@ GNUNET_NAMESTORE_zone_monitor_next (struct GNUNET_NAMESTORE_ZoneMonitor *zm, struct GNUNET_MQ_Envelope *env; struct ZoneMonitorNextMessage *nm; - env = GNUNET_MQ_msg (nm, - GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT); + env = GNUNET_MQ_msg (nm, GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_NEXT); nm->limit = GNUNET_htonll (limit); - GNUNET_MQ_send (zm->mq, - env); + GNUNET_MQ_send (zm->mq, env); } diff --git a/src/testbed/gnunet-service-testbed_connectionpool.c b/src/testbed/gnunet-service-testbed_connectionpool.c index d8461f8e2..e173a2349 100644 --- a/src/testbed/gnunet-service-testbed_connectionpool.c +++ b/src/testbed/gnunet-service-testbed_connectionpool.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -27,7 +27,7 @@ #include "gnunet-service-testbed.h" #include "gnunet-service-testbed_connectionpool.h" #include "testbed_api_operations.h" -#include "gnunet_transport_core_service.h" +#include "gnunet_transport_service.h" /** * Redefine LOG with a changed log component string @@ -35,14 +35,14 @@ #ifdef LOG #undef LOG #endif -#define LOG(kind,...) \ +#define LOG(kind, ...) \ GNUNET_log_from (kind, "testbed-connectionpool", __VA_ARGS__) /** * Time to expire a cache entry */ -#define CACHE_EXPIRY \ +#define CACHE_EXPIRY \ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15) @@ -107,8 +107,8 @@ struct PooledConnection struct GNUNET_PeerIdentity *peer_identity; /** - * The configuration of the peer. Should be not NULL as long as the core_handle - * or transport_handle are valid + * The configuration of the peer. Should be not NULL as long as the + * core_handle or transport_handle are valid */ struct GNUNET_CONFIGURATION_Handle *cfg; @@ -137,12 +137,12 @@ struct PooledConnection /** * The task to expire this connection from the connection pool */ - struct GNUNET_SCHEDULER_Task * expire_task; + struct GNUNET_SCHEDULER_Task *expire_task; /** * The task to notify a waiting #GST_ConnectionPool_GetHandle object */ - struct GNUNET_SCHEDULER_Task * notify_task; + struct GNUNET_SCHEDULER_Task *notify_task; /** * Number of active requests using this pooled connection @@ -286,17 +286,16 @@ static void destroy_pooled_connection (struct PooledConnection *entry) { GNUNET_assert ((NULL == entry->head_notify) && (NULL == entry->tail_notify)); - GNUNET_assert ((NULL == entry->head_waiting) && (NULL == - entry->tail_waiting)); + GNUNET_assert ((NULL == entry->head_waiting) && + (NULL == entry->tail_waiting)); GNUNET_assert (0 == entry->demand); expire_task_cancel (entry); if (entry->in_lru) GNUNET_CONTAINER_DLL_remove (head_lru, tail_lru, entry); if (entry->in_pool) - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap32_remove (map, - entry->index, - entry)); + GNUNET_assert ( + GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_remove (map, entry->index, entry)); if (NULL != entry->notify_task) { GNUNET_SCHEDULER_cancel (entry->notify_task); @@ -370,12 +369,12 @@ static void add_to_lru (struct PooledConnection *entry) { GNUNET_assert (0 == entry->demand); - GNUNET_assert (!entry->in_lru); + GNUNET_assert (! entry->in_lru); GNUNET_CONTAINER_DLL_insert_tail (head_lru, tail_lru, entry); entry->in_lru = GNUNET_YES; GNUNET_assert (NULL == entry->expire_task); - entry->expire_task = GNUNET_SCHEDULER_add_delayed (CACHE_EXPIRY, - &expire, entry); + entry->expire_task = + GNUNET_SCHEDULER_add_delayed (CACHE_EXPIRY, &expire, entry); } @@ -404,7 +403,7 @@ search_waiting (const struct PooledConnection *entry, if (NULL == entry->handle_core) continue; if (NULL == entry->peer_identity) - continue; /* CORE connection isn't ready yet */ + continue; /* CORE connection isn't ready yet */ break; case GST_CONNECTIONPOOL_SERVICE_TRANSPORT: if (NULL == entry->handle_transport) @@ -425,7 +424,8 @@ search_waiting (const struct PooledConnection *entry, * A handle in the #PooledConnection object pointed by @a cls is ready and there * is a #GST_ConnectionPool_GetHandle object waiting in the waiting list. This * function retrieves that object and calls the handle ready callback. It - * further schedules itself if there are similar waiting objects which can be notified. + * further schedules itself if there are similar waiting objects which can be + * notified. * * @param cls the #PooledConnection object */ @@ -443,23 +443,18 @@ connection_ready (void *cls) gh_next = NULL; if (NULL != gh->next) gh_next = search_waiting (entry, gh->next); - GNUNET_CONTAINER_DLL_remove (entry->head_waiting, - entry->tail_waiting, - gh); + GNUNET_CONTAINER_DLL_remove (entry->head_waiting, entry->tail_waiting, gh); gh->connection_ready_called = 1; if (NULL != gh_next) - entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready, - entry); - if ( (NULL != gh->target) && - (NULL != gh->connect_notify_cb) ) + entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready, entry); + if ((NULL != gh->target) && (NULL != gh->connect_notify_cb)) { GNUNET_CONTAINER_DLL_insert_tail (entry->head_notify, entry->tail_notify, gh); gh->notify_waiting = 1; } - LOG_DEBUG ("Connection ready for handle type %u\n", - gh->service); + LOG_DEBUG ("Connection ready for handle type %u\n", gh->service); gh->cb (gh->cb_cls, entry->handle_core, entry->handle_transport, @@ -499,9 +494,7 @@ peer_connect_notify_cb (void *cls, gh = gh->next; continue; } - if (0 != memcmp (gh->target, - peer, - sizeof (struct GNUNET_PeerIdentity))) + if (0 != memcmp (gh->target, peer, sizeof (struct GNUNET_PeerIdentity))) { gh = gh->next; continue; @@ -532,13 +525,11 @@ peer_connect_notify_cb (void *cls, static void * transport_peer_connect_notify_cb (void *cls, const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *mq) + struct GNUNET_MQ_Handle *mq) { struct PooledConnection *entry = cls; - peer_connect_notify_cb (entry, - peer, - GST_CONNECTIONPOOL_SERVICE_TRANSPORT); + peer_connect_notify_cb (entry, peer, GST_CONNECTIONPOOL_SERVICE_TRANSPORT); return NULL; } @@ -555,16 +546,15 @@ opstart_get_handle_transport (void *cls) struct PooledConnection *entry = cls; GNUNET_assert (NULL != entry); - LOG_DEBUG ("Opening a transport connection to peer %u\n", - entry->index); + LOG_DEBUG ("Opening a transport connection to peer %u\n", entry->index); entry->handle_transport = - GNUNET_TRANSPORT_core_connect (entry->cfg, - NULL, - NULL, - entry, - &transport_peer_connect_notify_cb, - NULL, - NULL); + GNUNET_TRANSPORT_core_connect (entry->cfg, + NULL, + NULL, + entry, + &transport_peer_connect_notify_cb, + NULL, + NULL); if (NULL == entry->handle_transport) { GNUNET_break (0); @@ -610,14 +600,12 @@ oprelease_get_handle_transport (void *cls) */ static void * core_peer_connect_cb (void *cls, - const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_PeerIdentity *peer, struct GNUNET_MQ_Handle *mq) { struct PooledConnection *entry = cls; - peer_connect_notify_cb (entry, - peer, - GST_CONNECTIONPOOL_SERVICE_CORE); + peer_connect_notify_cb (entry, peer, GST_CONNECTIONPOOL_SERVICE_CORE); return (void *) peer; } @@ -635,8 +623,7 @@ core_peer_connect_cb (void *cls, * @param my_identity ID of this peer, NULL if we failed */ static void -core_startup_cb (void *cls, - const struct GNUNET_PeerIdentity *my_identity) +core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) { struct PooledConnection *entry = cls; @@ -672,15 +659,14 @@ opstart_get_handle_core (void *cls) struct PooledConnection *entry = cls; GNUNET_assert (NULL != entry); - LOG_DEBUG ("Opening a CORE connection to peer %u\n", - entry->index); - entry->handle_core - = GNUNET_CORE_connect (entry->cfg, - entry, /* closure */ - &core_startup_cb, /* core startup notify */ - &core_peer_connect_cb, /* peer connect notify */ - NULL, /* peer disconnect notify */ - NULL); + LOG_DEBUG ("Opening a CORE connection to peer %u\n", entry->index); + entry->handle_core = + GNUNET_CORE_connect (entry->cfg, + entry, /* closure */ + &core_startup_cb, /* core startup notify */ + &core_peer_connect_cb, /* peer connect notify */ + NULL, /* peer disconnect notify */ + NULL); } @@ -715,8 +701,7 @@ opstart_get_handle_ats_connectivity (void *cls) { struct PooledConnection *entry = cls; - entry->handle_ats_connectivity = - GNUNET_ATS_connectivity_init (entry->cfg); + entry->handle_ats_connectivity = GNUNET_ATS_connectivity_init (entry->cfg); } @@ -749,9 +734,7 @@ oprelease_get_handle_ats_connectivity (void *cls) * #GNUNET_NO if not. */ static int -cleanup_iterator (void *cls, - uint32_t key, - void *value) +cleanup_iterator (void *cls, uint32_t key, void *value) { struct PooledConnection *entry = value; @@ -789,10 +772,9 @@ GST_connection_pool_destroy () if (NULL != map) { - GNUNET_assert (GNUNET_SYSERR != - GNUNET_CONTAINER_multihashmap32_iterate (map, - &cleanup_iterator, - NULL)); + GNUNET_assert ( + GNUNET_SYSERR != + GNUNET_CONTAINER_multihashmap32_iterate (map, &cleanup_iterator, NULL)); GNUNET_CONTAINER_multihashmap32_destroy (map); map = NULL; } @@ -817,9 +799,9 @@ GST_connection_pool_destroy () * @note @a connect_notify_cb will not be called if @a target is * already connected @a service level. Use * GNUNET_TRANSPORT_check_peer_connected() or a similar function from the - * respective @a service's API to check if the target peer is already connected or - * not. @a connect_notify_cb will be called only once or never (in case @a target - * cannot be connected or is already connected). + * respective @a service's API to check if the target peer is already connected + * or not. @a connect_notify_cb will be called only once or never (in case @a + * target cannot be connected or is already connected). * * @param peer_id the index of the peer * @param cfg the configuration with which the transport handle has to be @@ -828,7 +810,8 @@ GST_connection_pool_destroy () * @param cb the callback to notify when the transport handle is available * @param cb_cls the closure for @a cb * @param target the peer identify of the peer whose connection to our TRANSPORT - * subsystem will be notified through the @a connect_notify_cb. Can be NULL + * subsystem will be notified through the @a connect_notify_cb. Can be + * NULL * @param connect_notify_cb the callback to call when the @a target peer is * connected. This callback will only be called once or never again (in * case the target peer cannot be connected). Can be NULL @@ -837,14 +820,15 @@ GST_connection_pool_destroy () * longer being used */ struct GST_ConnectionPool_GetHandle * -GST_connection_pool_get_handle (unsigned int peer_id, - const struct GNUNET_CONFIGURATION_Handle *cfg, - enum GST_ConnectionPool_Service service, - GST_connection_pool_connection_ready_cb cb, - void *cb_cls, - const struct GNUNET_PeerIdentity *target, - GST_connection_pool_peer_connect_notify connect_notify_cb, - void *connect_notify_cb_cls) +GST_connection_pool_get_handle ( + unsigned int peer_id, + const struct GNUNET_CONFIGURATION_Handle *cfg, + enum GST_ConnectionPool_Service service, + GST_connection_pool_connection_ready_cb cb, + void *cb_cls, + const struct GNUNET_PeerIdentity *target, + GST_connection_pool_peer_connect_notify connect_notify_cb, + void *connect_notify_cb_cls) { struct GST_ConnectionPool_GetHandle *gh; struct PooledConnection *entry; @@ -871,20 +855,17 @@ GST_connection_pool_get_handle (unsigned int peer_id, case GST_CONNECTIONPOOL_SERVICE_TRANSPORT: handle = entry->handle_transport; if (NULL != handle) - LOG_DEBUG ("Found TRANSPORT handle for peer %u\n", - entry->index); + LOG_DEBUG ("Found TRANSPORT handle for peer %u\n", entry->index); break; case GST_CONNECTIONPOOL_SERVICE_CORE: handle = entry->handle_core; if (NULL != handle) - LOG_DEBUG ("Found CORE handle for peer %u\n", - entry->index); + LOG_DEBUG ("Found CORE handle for peer %u\n", entry->index); break; case GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY: handle = entry->handle_ats_connectivity; if (NULL != handle) - LOG_DEBUG ("Found ATS CONNECTIVITY handle for peer %u\n", - entry->index); + LOG_DEBUG ("Found ATS CONNECTIVITY handle for peer %u\n", entry->index); break; } } @@ -892,14 +873,15 @@ GST_connection_pool_get_handle (unsigned int peer_id, { entry = GNUNET_new (struct PooledConnection); entry->index = peer_id32; - if ((NULL != map) - && (GNUNET_CONTAINER_multihashmap32_size (map) < max_size)) + if ((NULL != map) && + (GNUNET_CONTAINER_multihashmap32_size (map) < max_size)) { GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap32_put (map, - entry->index, - entry, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); + GNUNET_CONTAINER_multihashmap32_put ( + map, + entry->index, + entry, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST)); entry->in_pool = GNUNET_YES; } else @@ -919,16 +901,14 @@ GST_connection_pool_get_handle (unsigned int peer_id, gh->connect_notify_cb = connect_notify_cb; gh->connect_notify_cb_cls = connect_notify_cb_cls; gh->service = service; - GNUNET_CONTAINER_DLL_insert (entry->head_waiting, - entry->tail_waiting, - gh); + GNUNET_CONTAINER_DLL_insert (entry->head_waiting, entry->tail_waiting, gh); if (NULL != handle) { if (NULL == entry->notify_task) { if (NULL != search_waiting (entry, entry->head_waiting)) - entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready, - entry); + entry->notify_task = + GNUNET_SCHEDULER_add_now (&connection_ready, entry); } return gh; } @@ -937,7 +917,7 @@ GST_connection_pool_get_handle (unsigned int peer_id, { case GST_CONNECTIONPOOL_SERVICE_TRANSPORT: if (NULL != entry->op_transport) - return gh; /* Operation pending */ + return gh; /* Operation pending */ op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_transport, &oprelease_get_handle_transport); @@ -945,7 +925,7 @@ GST_connection_pool_get_handle (unsigned int peer_id, break; case GST_CONNECTIONPOOL_SERVICE_CORE: if (NULL != entry->op_core) - return gh; /* Operation pending */ + return gh; /* Operation pending */ op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_core, &oprelease_get_handle_core); @@ -953,15 +933,15 @@ GST_connection_pool_get_handle (unsigned int peer_id, break; case GST_CONNECTIONPOOL_SERVICE_ATS_CONNECTIVITY: if (NULL != entry->op_ats_connectivity) - return gh; /* Operation pending */ - op = GNUNET_TESTBED_operation_create_ (entry, - &opstart_get_handle_ats_connectivity, - &oprelease_get_handle_ats_connectivity); + return gh; /* Operation pending */ + op = + GNUNET_TESTBED_operation_create_ (entry, + &opstart_get_handle_ats_connectivity, + &oprelease_get_handle_ats_connectivity); entry->op_ats_connectivity = op; break; } - GNUNET_TESTBED_operation_queue_insert_ (GST_opq_openfds, - op); + GNUNET_TESTBED_operation_queue_insert_ (GST_opq_openfds, op); GNUNET_TESTBED_operation_begin_wait_ (op); return gh; } @@ -973,9 +953,9 @@ GST_connection_pool_get_handle (unsigned int peer_id, * #GST_ConnectionPool_GetHandle objects, it is left in the connection pool. If * no other objects are using the connection and the connection pool is not full * then it is placed in a LRU queue. If the connection pool is full, then - * connections from the LRU queue are evicted and closed to create place for this - * connection. If the connection pool if full and the LRU queue is empty, then - * the connection is closed. + * connections from the LRU queue are evicted and closed to create place for + * this connection. If the connection pool if full and the LRU queue is empty, + * then the connection is closed. * * @param gh the handle */ @@ -989,14 +969,13 @@ GST_connection_pool_get_handle_done (struct GST_ConnectionPool_GetHandle *gh) entry = gh->entry; LOG_DEBUG ("Cleaning up get handle %p for service %u, peer %u\n", gh, - gh->service, entry->index); + gh->service, + entry->index); if (! gh->connection_ready_called) { - GNUNET_CONTAINER_DLL_remove (entry->head_waiting, - entry->tail_waiting, - gh); - if ( (NULL == search_waiting (entry, entry->head_waiting)) && - (NULL != entry->notify_task) ) + GNUNET_CONTAINER_DLL_remove (entry->head_waiting, entry->tail_waiting, gh); + if ((NULL == search_waiting (entry, entry->head_waiting)) && + (NULL != entry->notify_task)) { GNUNET_SCHEDULER_cancel (entry->notify_task); entry->notify_task = NULL; @@ -1004,22 +983,18 @@ GST_connection_pool_get_handle_done (struct GST_ConnectionPool_GetHandle *gh) } if (gh->notify_waiting) { - GNUNET_CONTAINER_DLL_remove (entry->head_notify, - entry->tail_notify, - gh); + GNUNET_CONTAINER_DLL_remove (entry->head_notify, entry->tail_notify, gh); gh->notify_waiting = 0; } GNUNET_free (gh); gh = NULL; GNUNET_assert (! entry->in_lru); if (! entry->in_pool) - GNUNET_CONTAINER_DLL_remove (head_not_pooled, - tail_not_pooled, - entry); + GNUNET_CONTAINER_DLL_remove (head_not_pooled, tail_not_pooled, entry); if (NULL != map) { - if (GNUNET_YES == GNUNET_CONTAINER_multihashmap32_contains (map, - entry->index)) + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap32_contains (map, entry->index)) goto unallocate; if (GNUNET_CONTAINER_multihashmap32_size (map) == max_size) { @@ -1028,14 +1003,15 @@ GST_connection_pool_get_handle_done (struct GST_ConnectionPool_GetHandle *gh) destroy_pooled_connection (head_lru); } GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap32_put (map, - entry->index, - entry, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_multihashmap32_put ( + map, + entry->index, + entry, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); entry->in_pool = GNUNET_YES; } - unallocate: +unallocate: GNUNET_assert (0 < entry->demand); entry->demand--; if (0 != entry->demand) diff --git a/src/testbed/gnunet-service-testbed_connectionpool.h b/src/testbed/gnunet-service-testbed_connectionpool.h index ca4ea22ce..558918c30 100644 --- a/src/testbed/gnunet-service-testbed_connectionpool.h +++ b/src/testbed/gnunet-service-testbed_connectionpool.h @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -25,7 +25,7 @@ */ #include "gnunet_ats_service.h" #include "gnunet_core_service.h" -#include "gnunet_transport_core_service.h" +#include "gnunet_transport_service.h" /** * The request handle for obtaining a pooled connection @@ -87,13 +87,13 @@ GST_connection_pool_destroy (void); * cases, its value being NULL means that CORE connection has failed. * @param cfg configuration of the peer */ -typedef void -(*GST_connection_pool_connection_ready_cb) (void *cls, - struct GNUNET_CORE_Handle *ch, - struct GNUNET_TRANSPORT_CoreHandle *th, - struct GNUNET_ATS_ConnectivityHandle *ac, - const struct GNUNET_PeerIdentity *peer_id, - const struct GNUNET_CONFIGURATION_Handle *cfg); +typedef void (*GST_connection_pool_connection_ready_cb) ( + void *cls, + struct GNUNET_CORE_Handle *ch, + struct GNUNET_TRANSPORT_CoreHandle *th, + struct GNUNET_ATS_ConnectivityHandle *ac, + const struct GNUNET_PeerIdentity *peer_id, + const struct GNUNET_CONFIGURATION_Handle *cfg); /** @@ -104,9 +104,9 @@ typedef void * callback * @param target the peer identity of the target peer */ -typedef void -(*GST_connection_pool_peer_connect_notify) (void *cls, - const struct GNUNET_PeerIdentity *target); +typedef void (*GST_connection_pool_peer_connect_notify) ( + void *cls, + const struct GNUNET_PeerIdentity *target); /** @@ -121,9 +121,9 @@ typedef void * @note @a connect_notify_cb will not be called if @a target is * already connected @a service level. Use * GNUNET_TRANSPORT_check_peer_connected() or a similar function from the - * respective @a service's API to check if the target peer is already connected or - * not. @a connect_notify_cb will be called only once or never (in case @a target - * cannot be connected or is already connected). + * respective @a service's API to check if the target peer is already connected + * or not. @a connect_notify_cb will be called only once or never (in case @a + * target cannot be connected or is already connected). * * @param peer_id the index of the peer * @param cfg the configuration with which the transport handle has to be @@ -132,7 +132,8 @@ typedef void * @param cb the callback to notify when the transport handle is available * @param cb_cls the closure for @a cb * @param target the peer identify of the peer whose connection to our TRANSPORT - * subsystem will be notified through the @a connect_notify_cb. Can be NULL + * subsystem will be notified through the @a connect_notify_cb. Can be + * NULL * @param connect_notify_cb the callback to call when the @a target peer is * connected. This callback will only be called once or never again (in * case the target peer cannot be connected). Can be NULL @@ -141,14 +142,15 @@ typedef void * longer being used */ struct GST_ConnectionPool_GetHandle * -GST_connection_pool_get_handle (unsigned int peer_id, - const struct GNUNET_CONFIGURATION_Handle *cfg, - enum GST_ConnectionPool_Service service, - GST_connection_pool_connection_ready_cb cb, - void *cb_cls, - const struct GNUNET_PeerIdentity *target, - GST_connection_pool_peer_connect_notify connect_notify_cb, - void *connect_notify_cb_cls); +GST_connection_pool_get_handle ( + unsigned int peer_id, + const struct GNUNET_CONFIGURATION_Handle *cfg, + enum GST_ConnectionPool_Service service, + GST_connection_pool_connection_ready_cb cb, + void *cb_cls, + const struct GNUNET_PeerIdentity *target, + GST_connection_pool_peer_connect_notify connect_notify_cb, + void *connect_notify_cb_cls); /** @@ -157,9 +159,9 @@ GST_connection_pool_get_handle (unsigned int peer_id, * #GST_ConnectionPool_GetHandle objects, it is left in the connection pool. If * no other objects are using the connection and the connection pool is not full * then it is placed in a LRU queue. If the connection pool is full, then - * connections from the LRU queue are evicted and closed to create place for this - * connection. If the connection pool if full and the LRU queue is empty, then - * the connection is closed. + * connections from the LRU queue are evicted and closed to create place for + * this connection. If the connection pool if full and the LRU queue is empty, + * then the connection is closed. * * @param gh the handle */ diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index c2922dd7e..825d45522 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -24,6 +24,11 @@ * * TODO: * Implement next: + * - complete flow control push back from CORE via TRANSPORT to communicators: + * + resume communicators in handle_client_recv_ok (see FIXME) + * + count transmissions to CORE and suspend communicators if window is full + * - check flow control push back from TRANSPROT to CORE: + * + check when to send ACKs * - change transport-core API to provide proper flow control in both * directions, allow multiple messages per peer simultaneously (tag * confirmations with unique message ID), and replace quota-out with @@ -113,6 +118,16 @@ */ #define MAX_DV_DISCOVERY_SELECTION 16 +/** + * Window size. How many messages to the same target do we pass + * to CORE without a RECV_OK in between? Small values limit + * thoughput, large values will increase latency. + * + * FIXME-OPTIMIZE: find out what good values are experimentally, + * maybe set adaptively (i.e. to observed available bandwidth). + */ +#define RECV_WINDOW_SIZE 4 + /** * Minimum number of hops we should forward DV learn messages * even if they are NOT useful for us in hope of looping @@ -1100,6 +1115,48 @@ struct PendingMessage; */ struct DistanceVectorHop; +/** + * A virtual link is another reachable peer that is known to CORE. It + * can be either a `struct Neighbour` with at least one confirmed + * `struct Queue`, or a `struct DistanceVector` with at least one + * confirmed `struct DistanceVectorHop`. With a virtual link we track + * data that is per neighbour that is not specific to how the + * connectivity is established. + */ +struct VirtualLink +{ + /** + * Identity of the peer at the other end of the link. + */ + struct GNUNET_PeerIdentity target; + + /** + * Task scheduled to possibly notfiy core that this peer is no + * longer counting as confirmed. Runs the #core_visibility_check(), + * which checks that some DV-path or a queue exists that is still + * considered confirmed. + */ + struct GNUNET_SCHEDULER_Task *visibility_task; + + /** + * Neighbour used by this virtual link, NULL if @e dv is used. + */ + struct Neighbour *n; + + /** + * Distance vector used by this virtual link, NULL if @e n is used. + */ + struct DistanceVector *dv; + + /** + * How many more messages can we send to core before we exhaust + * the receive window of CORE for this peer? If this hits zero, + * we must tell communicators to stop providing us more messages + * for this peer. + */ + unsigned int core_recv_window; +}; + /** * Data structure kept when we are waiting for an acknowledgement. @@ -1316,31 +1373,10 @@ struct DistanceVector struct GNUNET_SCHEDULER_Task *timeout_task; /** - * Task scheduled to possibly notfiy core that this queue is no longer - * counting as confirmed. Runs the #core_queue_visibility_check(). - */ - struct GNUNET_SCHEDULER_Task *visibility_task; - - /** - * Quota at which CORE is allowed to transmit to this peer - * (note that the value CORE should actually be told is this - * value plus the respective value in `struct Neighbour`). - * Should match the sum of the quotas of all of the paths. - * - * FIXME: not yet set, tricky to get right given multiple paths, - * many of which may be inactive! (=> Idea: measure???) - * FIXME: how do we set this value initially when we tell CORE? - * Options: start at a minimum value or at literally zero? - * (=> Current thought: clean would be zero!) - */ - struct GNUNET_BANDWIDTH_Value32NBO quota_out; - - /** - * Is one of the DV paths in this struct 'confirmed' and thus - * the cause for CORE to see this peer as connected? (Note that - * the same may apply to a `struct Neighbour` at the same time.) + * Do we have a confirmed working queue and are thus visible to + * CORE? If so, this is the virtual link, otherwise NULL. */ - int core_visible; + struct VirtualLink *link; }; @@ -1450,12 +1486,6 @@ struct Queue */ struct GNUNET_SCHEDULER_Task *transmit_task; - /** - * Task scheduled to possibly notfiy core that this queue is no longer - * counting as confirmed. Runs the #core_queue_visibility_check(). - */ - struct GNUNET_SCHEDULER_Task *visibility_task; - /** * How long do *we* consider this @e address to be valid? In the past or * zero if we have not yet validated it. Can be updated based on @@ -1642,11 +1672,6 @@ struct Neighbour */ struct Queue *queue_tail; - /** - * Task run to cleanup pending messages that have exceeded their timeout. - */ - struct GNUNET_SCHEDULER_Task *timeout_task; - /** * Handle for an operation to fetch @e last_dv_learn_monotime information from * the PEERSTORE, or NULL. @@ -1660,18 +1685,10 @@ struct Neighbour struct GNUNET_PEERSTORE_StoreContext *sc; /** - * Quota at which CORE is allowed to transmit to this peer - * (note that the value CORE should actually be told is this - * value plus the respective value in `struct DistanceVector`). - * Should match the sum of the quotas of all of the queues. - * - * FIXME: not yet set, tricky to get right given multiple queues! - * (=> Idea: measure???) - * FIXME: how do we set this value initially when we tell CORE? - * Options: start at a minimum value or at literally zero? - * (=> Current thought: clean would be zero!) + * Do we have a confirmed working queue and are thus visible to + * CORE? If so, this is the virtual link, otherwise NULL. */ - struct GNUNET_BANDWIDTH_Value32NBO quota_out; + struct VirtualLink *link; /** * Latest DVLearn monotonic time seen from this peer. Initialized only @@ -1679,17 +1696,6 @@ struct Neighbour */ struct GNUNET_TIME_Absolute last_dv_learn_monotime; - /** - * What is the earliest timeout of any message in @e pending_msg_tail? - */ - struct GNUNET_TIME_Absolute earliest_timeout; - - /** - * Do we have a confirmed working queue and are thus visible to - * CORE? - */ - int core_visible; - /** * Do we have the lastest value for @e last_dv_learn_monotime from * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE? @@ -2416,6 +2422,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes; */ static struct GNUNET_CONTAINER_MultiPeerMap *validation_map; +/** + * Map from PIDs to `struct VirtualLink` entries describing + * links CORE knows to exist. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *links; + /** * Map from challenges to `struct LearnLaunchEntry` values. */ @@ -2563,6 +2575,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece) } +/** + * Free virtual link. + * + * @param vl link data to free + */ +static void +free_virtual_link (struct VirtualLink *vl) +{ + GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl); + if (NULL != vl->visibility_task) + { + GNUNET_SCHEDULER_cancel (vl->visibility_task); + vl->visibility_task = NULL; + } + GNUNET_break (NULL == vl->n); + GNUNET_break (NULL == vl->dv); + GNUNET_free (vl); +} + + /** * Free validation state. * @@ -2684,8 +2716,6 @@ free_dv_route (struct DistanceVector *dv) GNUNET_assert ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); - if (NULL != dv->visibility_task) - GNUNET_SCHEDULER_cancel (dv->visibility_task); if (NULL != dv->timeout_task) GNUNET_SCHEDULER_cancel (dv->timeout_task); GNUNET_free (dv); @@ -2873,8 +2903,6 @@ free_neighbour (struct Neighbour *neighbour) GNUNET_CONTAINER_multipeermap_remove (neighbours, &neighbour->pid, neighbour)); - if (NULL != neighbour->timeout_task) - GNUNET_SCHEDULER_cancel (neighbour->timeout_task); if (NULL != neighbour->reassembly_map) { GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map, @@ -2917,19 +2945,16 @@ free_neighbour (struct Neighbour *neighbour) * * @param tc client to inform (must be CORE client) * @param pid peer the connection is for - * @param quota_out current quota for the peer */ static void core_send_connect_info (struct TransportClient *tc, - const struct GNUNET_PeerIdentity *pid, - struct GNUNET_BANDWIDTH_Value32NBO quota_out) + const struct GNUNET_PeerIdentity *pid) { struct GNUNET_MQ_Envelope *env; struct ConnectInfoMessage *cim; GNUNET_assert (CT_CORE == tc->type); env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim->quota_out = quota_out; cim->id = *pid; GNUNET_MQ_send (tc->mq, env); } @@ -2939,11 +2964,9 @@ core_send_connect_info (struct TransportClient *tc, * Send message to CORE clients that we gained a connection * * @param pid peer the queue was for - * @param quota_out current quota for the peer */ static void -cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, - struct GNUNET_BANDWIDTH_Value32NBO quota_out) +cores_send_connect_info (const struct GNUNET_PeerIdentity *pid) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Informing CORE clients about connection to %s\n", @@ -2952,7 +2975,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, { if (CT_CORE != tc->type) continue; - core_send_connect_info (tc, pid, quota_out); + core_send_connect_info (tc, pid); } } @@ -3059,13 +3082,43 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) /** - * Check whether the CORE visibility of @a n changed. If so, - * check whether we need to notify CORE. + * Task run to check whether the hops of the @a cls still + * are validated, or if we need to core about disconnection. * - * @param n neighbour to perform the check for + * @param cls a `struct VirtualLink` */ static void -update_neighbour_core_visibility (struct Neighbour *n); +check_link_down (void *cls) +{ + struct VirtualLink *vl = cls; + struct DistanceVector *dv = vl->dv; + struct Neighbour *n = vl->n; + struct GNUNET_TIME_Absolute dvh_timeout; + struct GNUNET_TIME_Absolute q_timeout; + + vl->visibility_task = NULL; + dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS; + for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; + pos = pos->next_dv) + dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until); + if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) + vl->dv = NULL; + q_timeout = GNUNET_TIME_UNIT_ZERO_ABS; + for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) + q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until); + if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) + vl->n = NULL; + if ((NULL == vl->n) && (NULL == vl->dv)) + { + cores_send_disconnect_info (&dv->target); + free_virtual_link (vl); + return; + } + vl->visibility_task = + GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout), + &check_link_down, + vl); +} /** @@ -3083,17 +3136,13 @@ free_queue (struct Queue *queue) struct QueueEntry *qe; int maxxed; struct PendingAcknowledgement *pa; + struct VirtualLink *vl; if (NULL != queue->transmit_task) { GNUNET_SCHEDULER_cancel (queue->transmit_task); queue->transmit_task = NULL; } - if (NULL != queue->visibility_task) - { - GNUNET_SCHEDULER_cancel (queue->visibility_task); - queue->visibility_task = NULL; - } while (NULL != (pa = queue->pa_head)) { GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa); @@ -3139,9 +3188,12 @@ free_queue (struct Queue *queue) notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); GNUNET_free (queue); - update_neighbour_core_visibility (neighbour); - cores_send_disconnect_info (&neighbour->pid); - + vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid); + if ((NULL != vl) && (neighbour == vl->n)) + { + GNUNET_SCHEDULER_cancel (vl->visibility_task); + check_link_down (vl); + } if (NULL == neighbour->queue_head) { free_neighbour (neighbour); @@ -3281,12 +3333,12 @@ notify_client_connect_info (void *cls, void *value) { struct TransportClient *tc = cls; - struct Neighbour *neighbour = value; + (void) value; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Telling new CORE client about existing connection to %s\n", GNUNET_i2s (pid)); - core_send_connect_info (tc, pid, neighbour->quota_out); + core_send_connect_info (tc, pid); return GNUNET_OK; } @@ -3469,9 +3521,6 @@ client_send_response (struct PendingMessage *pm, if (NULL != tc) { env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - som->success = htonl ((uint32_t) success); - som->bytes_msg = htons (pm->bytes_msg); - som->bytes_physical = htonl (bytes_physical); som->peer = target->pid; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Confirming %s transmission of %u/%u bytes to %s\n", @@ -3485,45 +3534,6 @@ client_send_response (struct PendingMessage *pm, } -/** - * Checks the message queue for a neighbour for messages that have timed - * out and purges them. - * - * @param cls a `struct Neighbour` - */ -static void -check_queue_timeouts (void *cls) -{ - struct Neighbour *n = cls; - struct PendingMessage *pm; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Absolute earliest_timeout; - - n->timeout_task = NULL; - earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; - now = GNUNET_TIME_absolute_get (); - for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm) - { - pm = pos->next_neighbour; - if (pos->timeout.abs_value_us <= now.abs_value_us) - { - GNUNET_STATISTICS_update (GST_stats, - "# messages dropped (timeout before confirmation)", - 1, - GNUNET_NO); - client_send_response (pm, GNUNET_NO, 0); - continue; - } - earliest_timeout = - GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout); - } - n->earliest_timeout = earliest_timeout; - if (NULL != n->pending_msg_head) - n->timeout_task = - GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n); -} - - /** * Create a DV Box message. * @@ -3689,30 +3699,18 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) const void *payload; size_t payload_size; struct TransportDVBoxMessage *dvb; + struct VirtualLink *vl; GNUNET_assert (CT_CORE == tc->type); obmm = (const struct GNUNET_MessageHeader *) &obm[1]; bytes_msg = ntohs (obmm->size); - target = lookup_neighbour (&obm->peer); - if (NULL == target) - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer); - else - dv = NULL; - if ((NULL == target) && ((NULL == dv) || (GNUNET_NO == dv->core_visible))) + vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer); + if (NULL == vl) { /* Failure: don't have this peer as a neighbour (anymore). Might have gone down asynchronously, so this is NOT a protocol violation by CORE. Still count the event, as this should be rare. */ - struct GNUNET_MQ_Envelope *env; - struct SendOkMessage *som; - - env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - som->success = htonl (GNUNET_SYSERR); - som->bytes_msg = htonl (bytes_msg); - som->bytes_physical = htonl (0); - som->peer = obm->peer; - GNUNET_MQ_send (tc->mq, env); GNUNET_SERVICE_client_continue (tc->client); GNUNET_STATISTICS_update (GST_stats, "# messages dropped (neighbour unknown)", @@ -3720,6 +3718,12 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) GNUNET_NO); return; } + target = lookup_neighbour (&obm->peer); + if (NULL == target) + dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer); + else + dv = NULL; + GNUNET_assert ((NULL != target) || (NULL != dv)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending %u bytes to %s using %s\n", bytes_msg, @@ -3756,8 +3760,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) pm->client = tc; pm->target = target; pm->bytes_msg = payload_size; - pm->timeout = - GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); memcpy (&pm[1], payload, payload_size); GNUNET_free_non_null (dvb); dvb = NULL; @@ -3777,15 +3779,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); - if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) - { - target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; - if (NULL != target->timeout_task) - GNUNET_SCHEDULER_cancel (target->timeout_task); - target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout, - &check_queue_timeouts, - target); - } if (! was_empty) return; /* all queues must already be busy */ for (struct Queue *queue = target->queue_head; NULL != queue; @@ -3833,6 +3826,47 @@ check_communicator_available ( } +/** + * Client confirms that it is done handling message(s) to a particular + * peer. We may now provide more messages to CORE for this peer. + * + * Notifies the respective queues that more messages can now be received. + * + * @param cls the client + * @param rom the message that was sent + */ +static void +handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) +{ + struct TransportClient *tc = cls; + struct VirtualLink *vl; + uint32_t delta; + + if (CT_CORE != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer); + if (NULL == vl) + { + GNUNET_STATISTICS_update (GST_stats, + "# RECV_OK dropped: virtual link unknown", + 1, + GNUNET_NO); + GNUNET_SERVICE_client_continue (tc->client); + return; + } + delta = ntohl (rom->increase_window_delta); + vl->core_recv_window += delta; + if (delta == vl->core_recv_window) + { + // FIXME: resume communicators! + } +} + + /** * Communicator started. Process the request. * @@ -4090,20 +4124,18 @@ route_via_neighbour (const struct Neighbour *n, for (struct Queue *pos = n->queue_head; NULL != pos; pos = pos->next_neighbour) { - /* Count the queue with the visibility task in all cases, as - otherwise we may end up with no queues just because the - time for the visibility task just expired but the scheduler - just ran this task first */ if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || - (pos->validated_until.abs_value_us > now.abs_value_us) || - (NULL != pos->visibility_task)) + (pos->validated_until.abs_value_us > now.abs_value_us)) candidates++; } if (0 == candidates) { - /* Given that we above check for pos->visibility task, - this should be strictly impossible. */ - GNUNET_break (0); + /* This can happen rarely if the last confirmed queue timed + out just as we were beginning to process this message. */ + GNUNET_STATISTICS_update (GST_stats, + "# route selection failed (all no valid queue)", + 1, + GNUNET_NO); return; } sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates); @@ -4115,12 +4147,8 @@ route_via_neighbour (const struct Neighbour *n, for (struct Queue *pos = n->queue_head; NULL != pos; pos = pos->next_neighbour) { - /* Count the queue with the visibility task in all cases, as - otherwise we may end up with no queues just because the - time for the visibility task just expired but the scheduler - just ran this task first */ - if ((pos->validated_until.abs_value_us > now.abs_value_us) || - (NULL != pos->visibility_task)) + if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || + (pos->validated_until.abs_value_us > now.abs_value_us)) { if ((sel1 == candidates) || (sel2 == candidates)) queue_send_msg (pos, NULL, hdr, ntohs (hdr->size)); @@ -4197,21 +4225,21 @@ route_message (const struct GNUNET_PeerIdentity *target, struct GNUNET_MessageHeader *hdr, enum RouteMessageOptions options) { + struct VirtualLink *vl; struct Neighbour *n; struct DistanceVector *dv; - n = lookup_neighbour (target); - dv = (0 != (options & RMO_DV_ALLOWED)) - ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target) - : NULL; + vl = GNUNET_CONTAINER_multipeermap_get (links, target); + n = vl->n; + dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL; if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) { /* if confirmed is required, and we do not have anything confirmed, drop respective options */ - if ((NULL != n) && (GNUNET_NO == n->core_visible)) - n = NULL; - if ((NULL != dv) && (GNUNET_NO == dv->core_visible)) - dv = NULL; + if (NULL == n) + n = lookup_neighbour (target); + if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED))) + dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target); } if ((NULL == n) && (NULL == dv)) { @@ -5758,40 +5786,6 @@ path_cleanup_cb (void *cls) GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv); } -/** - * Task run to check whether the hops of the @a cls still - * are validated, or if we need to core about disconnection. - * - * @param cls a `struct DistanceVector` (with core_visible set!) - */ -static void -check_dv_path_down (void *cls) -{ - struct DistanceVector *dv = cls; - struct Neighbour *n; - - dv->visibility_task = NULL; - GNUNET_assert (GNUNET_YES == dv->core_visible); - for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; - pos = pos->next_dv) - { - if (0 < - GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us) - { - dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until, - &check_dv_path_down, - dv); - return; - } - } - /* all paths invalid, make dv core-invisible */ - dv->core_visible = GNUNET_NO; - n = lookup_neighbour (&dv->target); - if ((NULL != n) && (GNUNET_YES == n->core_visible)) - return; /* no need to tell core, connection still up! */ - cores_send_disconnect_info (&dv->target); -} - /** * The @a hop is a validated path to the respective target @@ -5804,22 +5798,30 @@ static void activate_core_visible_dv_path (struct DistanceVectorHop *hop) { struct DistanceVector *dv = hop->dv; - struct Neighbour *n; - - GNUNET_assert (GNUNET_NO == dv->core_visible); - GNUNET_assert (NULL == dv->visibility_task); + struct VirtualLink *vl; - dv->core_visible = GNUNET_YES; - dv->visibility_task = - GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv); - n = lookup_neighbour (&dv->target); - if ((NULL != n) && (GNUNET_YES == n->core_visible)) - return; /* no need to tell core, connection already up! */ - cores_send_connect_info (&dv->target, - (NULL != n) - ? GNUNET_BANDWIDTH_value_sum (n->quota_out, - dv->quota_out) - : dv->quota_out); + vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target); + if (NULL != vl) + { + /* Link was already up, remember dv is also now available and we are done */ + vl->dv = dv; + return; + } + vl = GNUNET_new (struct VirtualLink); + vl->target = dv->target; + vl->dv = dv; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + /* We lacked a confirmed connection to the target + before, so tell CORE about it (finally!) */ + cores_send_connect_info (&dv->target); } @@ -5934,9 +5936,8 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until); GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos); GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos); - if ((GNUNET_NO == dv->core_visible) && - (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until) - .rel_value_us)) + if (0 < + GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us) activate_core_visible_dv_path (pos); if (last_timeout.rel_value_us < GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT, @@ -5976,8 +5977,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, next_hop->dv_head, next_hop->dv_tail, hop); - if ((GNUNET_NO == dv->core_visible) && - (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)) + if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us) activate_core_visible_dv_path (hop); return GNUNET_YES; } @@ -6942,75 +6942,6 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address) } -/** - * Task run periodically to check whether the validity of the given queue has - * run its course. If so, finds either another queue to take over, or clears - * the neighbour's `core_visible` flag. In the latter case, gives DV routes a - * chance to take over, and if that fails, notifies CORE about the disconnect. - * - * @param cls a `struct Queue` - */ -static void -core_queue_visibility_check (void *cls) -{ - struct Queue *q = cls; - - q->visibility_task = NULL; - if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) - { - q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - return; - } - update_neighbour_core_visibility (q->neighbour); -} - - -/** - * Check whether the CORE visibility of @a n should change. Finds either a - * queue to preserve the visibility, or clears the neighbour's `core_visible` - * flag. In the latter case, gives DV routes a chance to take over, and if - * that fails, notifies CORE about the disconnect. If so, check whether we - * need to notify CORE. - * - * @param n neighbour to perform the check for - */ -static void -update_neighbour_core_visibility (struct Neighbour *n) -{ - struct DistanceVector *dv; - - GNUNET_assert (GNUNET_YES == n->core_visible); - /* Check if _any_ queue of this neighbour is still valid, if so, schedule - the #core_queue_visibility_check() task for that queue */ - for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) - { - if (0 != - GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) - { - /* found a valid queue, use this one */ - q->visibility_task = - GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - return; - } - } - n->core_visible = GNUNET_NO; - - /* Check if _any_ DV route to this neighbour is currently - valid, if so, do NOT tell core about the loss of direct - connectivity (DV still counts!) */ - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); - if (GNUNET_YES == dv->core_visible) - return; - /* Nothing works anymore, need to tell CORE about the loss of - connectivity! */ - cores_send_disconnect_info (&n->pid); -} - - /** * Communicator gave us a transport address validation response. Process the * request. @@ -7030,8 +6961,8 @@ handle_validation_response ( .vs = NULL}; struct GNUNET_TIME_Absolute origin_time; struct Queue *q; - struct DistanceVector *dv; struct Neighbour *n; + struct VirtualLink *vl; /* check this is one of our challenges */ (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, @@ -7129,24 +7060,28 @@ handle_validation_response ( q->validated_until = vs->validated_until; q->pd.aged_rtt = vs->validation_rtt; n = q->neighbour; - if (GNUNET_NO != n->core_visible) - return; /* nothing changed, we are done here */ - n->core_visible = GNUNET_YES; - q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - /* Check if _any_ DV route to this neighbour is - currently valid, if so, do NOT tell core anything! */ - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); - if ((NULL != dv) && (GNUNET_YES == dv->core_visible)) - return; /* nothing changed, done */ - /* We lacked a confirmed connection to the neighbour + vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid); + if (NULL != vl) + { + /* Link was already up, remember n is also now available and we are done */ + vl->n = n; + return; + } + vl = GNUNET_new (struct VirtualLink); + vl->target = n->pid; + vl->n = n; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + /* We lacked a confirmed connection to the target before, so tell CORE about it (finally!) */ - cores_send_connect_info (&n->pid, - (NULL != dv) - ? GNUNET_BANDWIDTH_value_sum (dv->quota_out, - n->quota_out) - : n->quota_out); + cores_send_connect_info (&n->pid); } @@ -8256,7 +8191,6 @@ handle_add_queue_message (void *cls, if (NULL == neighbour) { neighbour = GNUNET_new (struct Neighbour); - neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; neighbour->pid = aqm->receiver; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put ( @@ -8872,8 +8806,12 @@ do_shutdown (void *cls) NULL); GNUNET_CONTAINER_multishortmap_destroy (pending_acks); pending_acks = NULL; + GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours)); GNUNET_CONTAINER_multipeermap_destroy (neighbours); neighbours = NULL; + GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links)); + GNUNET_CONTAINER_multipeermap_destroy (links); + links = NULL; GNUNET_CONTAINER_multipeermap_iterate (backtalkers, &free_backtalker_cb, NULL); @@ -8926,6 +8864,7 @@ run (void *cls, pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES); ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); + links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES); dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); ephemeral_heap = @@ -8995,6 +8934,10 @@ GNUNET_SERVICE_MAIN ( GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, struct OutboundMessage, NULL), + GNUNET_MQ_hd_fixed_size (client_recv_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK, + struct RecvOkMessage, + NULL), /* communication with communicators */ GNUNET_MQ_hd_var_size (communicator_available, GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, diff --git a/src/transport/gnunet-transport-profiler.c b/src/transport/gnunet-transport-profiler.c index 9160a78b2..89f5b4108 100644 --- a/src/transport/gnunet-transport-profiler.c +++ b/src/transport/gnunet-transport-profiler.c @@ -32,7 +32,6 @@ #include "gnunet_protocols.h" #include "gnunet_ats_service.h" #include "gnunet_transport_service.h" -#include "gnunet_transport_core_service.h" struct Iteration @@ -54,7 +53,8 @@ struct Iteration /** * Timeout for a connections */ -#define CONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +#define CONNECT_TIMEOUT \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) /** * Benchmarking block size in bye @@ -214,15 +214,16 @@ shutdown_task (void *cls) { inext = icur->next; icur->rate = ((benchmark_count * benchmark_size) / 1024) / - ((float) icur->dur.rel_value_us / (1000 * 1000)); + ((float) icur->dur.rel_value_us / (1000 * 1000)); if (verbosity > 0) - FPRINTF (stdout, _("%llu B in %llu ms == %.2f KB/s!\n"), - ((long long unsigned int) benchmark_count * benchmark_size), - ((long long unsigned int) icur->dur.rel_value_us / 1000), - (float) icur->rate); + FPRINTF (stdout, + _ ("%llu B in %llu ms == %.2f KB/s!\n"), + ((long long unsigned int) benchmark_count * benchmark_size), + ((long long unsigned int) icur->dur.rel_value_us / 1000), + (float) icur->rate); avg_duration += icur->dur.rel_value_us / (1000); - avg_rate += icur->rate; + avg_rate += icur->rate; iterations++; } if (0 == iterations) @@ -238,19 +239,17 @@ shutdown_task (void *cls) while (NULL != (icur = inext)) { inext = icur->next; - stddev_rate += ((icur->rate-avg_rate) * - (icur->rate-avg_rate)); + stddev_rate += ((icur->rate - avg_rate) * (icur->rate - avg_rate)); stddev_duration += (((icur->dur.rel_value_us / 1000) - avg_duration) * - ((icur->dur.rel_value_us / 1000) - avg_duration)); - + ((icur->dur.rel_value_us / 1000) - avg_duration)); } /* Calculate standard deviation rate */ stddev_rate = stddev_rate / iterations; - stddev_rate = sqrtf(stddev_rate); + stddev_rate = sqrtf (stddev_rate); /* Calculate standard deviation duration */ stddev_duration = stddev_duration / iterations; - stddev_duration = sqrtf(stddev_duration); + stddev_duration = sqrtf (stddev_duration); /* Output */ FPRINTF (stdout, @@ -266,9 +265,7 @@ shutdown_task (void *cls) while (NULL != (icur = inext)) { inext = icur->next; - GNUNET_CONTAINER_DLL_remove (ihead, - itail, - icur); + GNUNET_CONTAINER_DLL_remove (ihead, itail, icur); FPRINTF (stdout, ";%llu;%.2f", @@ -316,27 +313,19 @@ send_msg (void *cls) if (NULL == mq) return; - env = GNUNET_MQ_msg_extra (m, - benchmark_size, - GNUNET_MESSAGE_TYPE_DUMMY); - memset (&m[1], - 52, - benchmark_size - sizeof(struct GNUNET_MessageHeader)); - + env = GNUNET_MQ_msg_extra (m, benchmark_size, GNUNET_MESSAGE_TYPE_DUMMY); + memset (&m[1], 52, benchmark_size - sizeof (struct GNUNET_MessageHeader)); + if (itail->msgs_sent < benchmark_count) { - GNUNET_MQ_notify_sent (env, - &send_msg, - NULL); + GNUNET_MQ_notify_sent (env, &send_msg, NULL); } else { iteration_done (); } - GNUNET_MQ_send (mq, - env); - if ( (verbosity > 0) && - (0 == itail->msgs_sent % 10) ) + GNUNET_MQ_send (mq, env); + if ((verbosity > 0) && (0 == itail->msgs_sent % 10)) FPRINTF (stdout, "."); } @@ -351,15 +340,14 @@ iteration_start () return; benchmark_running = GNUNET_YES; icur = GNUNET_new (struct Iteration); - GNUNET_CONTAINER_DLL_insert_tail (ihead, - itail, - icur); - icur->start = GNUNET_TIME_absolute_get(); + GNUNET_CONTAINER_DLL_insert_tail (ihead, itail, icur); + icur->start = GNUNET_TIME_absolute_get (); if (verbosity > 0) - FPRINTF (stdout, - "\nStarting benchmark, starting to send %u messages in %u byte blocks\n", - benchmark_count, - benchmark_size); + FPRINTF ( + stdout, + "\nStarting benchmark, starting to send %u messages in %u byte blocks\n", + benchmark_count, + benchmark_size); send_msg (NULL); } @@ -393,22 +381,16 @@ iteration_done () static void * notify_connect (void *cls, const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *m) + struct GNUNET_MQ_Handle *m) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) { - FPRINTF (stdout, - "Connected to different peer `%s'\n", - GNUNET_i2s (&pid)); + FPRINTF (stdout, "Connected to different peer `%s'\n", GNUNET_i2s (&pid)); return NULL; } if (verbosity > 0) - FPRINTF (stdout, - "Successfully connected to `%s'\n", - GNUNET_i2s (&pid)); + FPRINTF (stdout, "Successfully connected to `%s'\n", GNUNET_i2s (&pid)); mq = m; iteration_start (); return NULL; @@ -426,18 +408,16 @@ notify_connect (void *cls, static void notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer, - void *internal_cls) + void *internal_cls) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) return; mq = NULL; if (GNUNET_YES == benchmark_running) { FPRINTF (stdout, "Disconnected from peer `%s' while benchmarking\n", - GNUNET_i2s (&pid)); + GNUNET_i2s (&pid)); return; } } @@ -451,8 +431,7 @@ notify_disconnect (void *cls, * @return #GNUNET_OK */ static int -check_dummy (void *cls, - const struct GNUNET_MessageHeader *message) +check_dummy (void *cls, const struct GNUNET_MessageHeader *message) { return GNUNET_OK; /* all messages are fine */ } @@ -465,30 +444,24 @@ check_dummy (void *cls, * @param message the message */ static void -handle_dummy (void *cls, - const struct GNUNET_MessageHeader *message) +handle_dummy (void *cls, const struct GNUNET_MessageHeader *message) { if (! benchmark_receive) return; if (verbosity > 0) FPRINTF (stdout, - "Received %u bytes\n", - (unsigned int) ntohs (message->size)); + "Received %u bytes\n", + (unsigned int) ntohs (message->size)); } static int -blacklist_cb (void *cls, - const struct GNUNET_PeerIdentity *peer) +blacklist_cb (void *cls, const struct GNUNET_PeerIdentity *peer) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) { if (verbosity > 0) - FPRINTF (stdout, - "Denying connection to `%s'\n", - GNUNET_i2s (peer)); + FPRINTF (stdout, "Denying connection to `%s'\n", GNUNET_i2s (peer)); return GNUNET_SYSERR; } return GNUNET_OK; @@ -509,38 +482,32 @@ run (void *cls, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *mycfg) { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (dummy, - GNUNET_MESSAGE_TYPE_DUMMY, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_handler_end () - }; - + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (dummy, + GNUNET_MESSAGE_TYPE_DUMMY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end ()}; + cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg; ret = 1; if (GNUNET_MAX_MESSAGE_SIZE <= benchmark_size) { - FPRINTF (stderr, - "Message size too big!\n"); + FPRINTF (stderr, "Message size too big!\n"); return; } if (NULL == cpid) { - FPRINTF (stderr, - "No peer identity given\n"); + FPRINTF (stderr, "No peer identity given\n"); return; } - if (GNUNET_OK != - GNUNET_CRYPTO_eddsa_public_key_from_string (cpid, - strlen (cpid), - &pid.public_key)) + if (GNUNET_OK != GNUNET_CRYPTO_eddsa_public_key_from_string (cpid, + strlen (cpid), + &pid.public_key)) { - FPRINTF (stderr, - "Failed to parse peer identity `%s'\n", - cpid); + FPRINTF (stderr, "Failed to parse peer identity `%s'\n", cpid); return; } if (1 == benchmark_send) @@ -548,7 +515,8 @@ run (void *cls, if (verbosity > 0) FPRINTF (stderr, "Trying to send %u messages with size %u to peer `%s'\n", - benchmark_count, benchmark_size, + benchmark_count, + benchmark_size, GNUNET_i2s (&pid)); } else if (1 == benchmark_receive) @@ -559,50 +527,42 @@ run (void *cls, } else { - FPRINTF (stderr, - "No operation given\n"); + FPRINTF (stderr, "No operation given\n"); return; } ats = GNUNET_ATS_connectivity_init (cfg); if (NULL == ats) { - FPRINTF (stderr, - "Failed to connect to ATS service\n"); + FPRINTF (stderr, "Failed to connect to ATS service\n"); ret = 1; return; } handle = GNUNET_TRANSPORT_core_connect (cfg, - NULL, - handlers, - NULL, - ¬ify_connect, - ¬ify_disconnect, - NULL); + NULL, + handlers, + NULL, + ¬ify_connect, + ¬ify_disconnect, + NULL); if (NULL == handle) { - FPRINTF (stderr, - "Failed to connect to transport service\n"); + FPRINTF (stderr, "Failed to connect to transport service\n"); GNUNET_ATS_connectivity_done (ats); ats = NULL; ret = 1; return; } - bl_handle = GNUNET_TRANSPORT_blacklist (cfg, - &blacklist_cb, - NULL); - ats_sh = GNUNET_ATS_connectivity_suggest (ats, - &pid, - 1); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - NULL); + bl_handle = GNUNET_TRANSPORT_blacklist (cfg, &blacklist_cb, NULL); + ats_sh = GNUNET_ATS_connectivity_suggest (ats, &pid, 1); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); } int -main (int argc, char * const *argv) +main (int argc, char *const *argv) { int res; benchmark_count = DEFAULT_MESSAGE_COUNT; @@ -613,46 +573,48 @@ main (int argc, char * const *argv) struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_flag ('s', - "send", - gettext_noop ("send data to peer"), - &benchmark_send), + "send", + gettext_noop ("send data to peer"), + &benchmark_send), GNUNET_GETOPT_option_flag ('r', - "receive", - gettext_noop ("receive data from peer"), - &benchmark_receive), + "receive", + gettext_noop ("receive data from peer"), + &benchmark_receive), GNUNET_GETOPT_option_uint ('i', - "iterations", - NULL, - gettext_noop ("iterations"), - &benchmark_iterations), + "iterations", + NULL, + gettext_noop ("iterations"), + &benchmark_iterations), GNUNET_GETOPT_option_uint ('n', - "number", - NULL, - gettext_noop ("number of messages to send"), - &benchmark_count), + "number", + NULL, + gettext_noop ("number of messages to send"), + &benchmark_count), GNUNET_GETOPT_option_uint ('m', - "messagesize", - NULL, - gettext_noop ("message size to use"), - &benchmark_size), + "messagesize", + NULL, + gettext_noop ("message size to use"), + &benchmark_size), GNUNET_GETOPT_option_string ('p', "peer", "PEER", gettext_noop ("peer identity"), &cpid), GNUNET_GETOPT_option_verbose (&verbosity), - GNUNET_GETOPT_OPTION_END - }; + GNUNET_GETOPT_OPTION_END}; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return 2; - res = GNUNET_PROGRAM_run (argc, argv, - "gnunet-transport", - gettext_noop ("Direct access to transport service."), - options, - &run, NULL); - GNUNET_free((void *) argv); + res = + GNUNET_PROGRAM_run (argc, + argv, + "gnunet-transport", + gettext_noop ("Direct access to transport service."), + options, + &run, + NULL); + GNUNET_free ((void *) argv); if (GNUNET_OK == res) return ret; return 1; diff --git a/src/transport/gnunet-transport.c b/src/transport/gnunet-transport.c index c3c1afc38..36c8fc451 100644 --- a/src/transport/gnunet-transport.c +++ b/src/transport/gnunet-transport.c @@ -29,12 +29,12 @@ #include "gnunet_resolver_service.h" #include "gnunet_protocols.h" #include "gnunet_transport_service.h" -#include "gnunet_transport_core_service.h" /** * Timeout for a name resolution */ -#define RESOLUTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +#define RESOLUTION_TIMEOUT \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) /** * Timeout for an operation @@ -332,16 +332,13 @@ static struct PeerResolutionContext *rc_tail; * @return #GNUNET_OK (continue to iterate) */ static int -destroy_it (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +destroy_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { struct MonitoredPeer *m = value; - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_remove (monitored_peers, - key, - value)); + GNUNET_assert ( + GNUNET_OK == + GNUNET_CONTAINER_multipeermap_remove (monitored_peers, key, value)); GNUNET_free_non_null (m->address); GNUNET_free (value); return GNUNET_OK; @@ -384,18 +381,14 @@ shutdown_task (void *cls) next = cur->next; GNUNET_TRANSPORT_address_to_string_cancel (cur->asc); - GNUNET_CONTAINER_DLL_remove (vc_head, - vc_tail, - cur); + GNUNET_CONTAINER_DLL_remove (vc_head, vc_tail, cur); GNUNET_free (cur->transport); GNUNET_HELLO_address_free (cur->addrcp); GNUNET_free (cur); } while (NULL != (rc = rc_head)) { - GNUNET_CONTAINER_DLL_remove (rc_head, - rc_tail, - rc); + GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc); GNUNET_TRANSPORT_address_to_string_cancel (rc->asc); GNUNET_free (rc->transport); GNUNET_free (rc->addrcp); @@ -410,35 +403,30 @@ shutdown_task (void *cls) { duration = GNUNET_TIME_absolute_get_duration (start_time); FPRINTF (stdout, - _("Transmitted %llu bytes/s (%llu bytes in %s)\n"), + _ ("Transmitted %llu bytes/s (%llu bytes in %s)\n"), 1000LL * 1000LL * traffic_sent / (1 + duration.rel_value_us), traffic_sent, - GNUNET_STRINGS_relative_time_to_string (duration, - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES)); } if (benchmark_receive) { duration = GNUNET_TIME_absolute_get_duration (start_time); FPRINTF (stdout, - _("Received %llu bytes/s (%llu bytes in %s)\n"), + _ ("Received %llu bytes/s (%llu bytes in %s)\n"), 1000LL * 1000LL * traffic_received / (1 + duration.rel_value_us), traffic_received, - GNUNET_STRINGS_relative_time_to_string (duration, - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES)); } if (NULL != monitored_peers) { - GNUNET_CONTAINER_multipeermap_iterate (monitored_peers, - &destroy_it, - NULL); + GNUNET_CONTAINER_multipeermap_iterate (monitored_peers, &destroy_it, NULL); GNUNET_CONTAINER_multipeermap_destroy (monitored_peers); monitored_peers = NULL; } if (NULL != monitored_plugins) { - GNUNET_break (0 == - GNUNET_CONTAINER_multipeermap_size (monitored_plugins)); + GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (monitored_plugins)); GNUNET_CONTAINER_multipeermap_destroy (monitored_plugins); monitored_plugins = NULL; } @@ -463,9 +451,7 @@ operation_timeout (void *cls) op_timeout = NULL; if ((benchmark_send) || (benchmark_receive)) { - FPRINTF (stdout, - _("Failed to connect to `%s'\n"), - GNUNET_i2s_full (&pid)); + FPRINTF (stdout, _ ("Failed to connect to `%s'\n"), GNUNET_i2s_full (&pid)); GNUNET_SCHEDULER_shutdown (); ret = 1; return; @@ -477,21 +463,18 @@ operation_timeout (void *cls) { next = cur->next; FPRINTF (stdout, - _("Failed to resolve address for peer `%s'\n"), + _ ("Failed to resolve address for peer `%s'\n"), GNUNET_i2s (&cur->addrcp->peer)); - GNUNET_CONTAINER_DLL_remove(rc_head, - rc_tail, - cur); + GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, cur); GNUNET_TRANSPORT_address_to_string_cancel (cur->asc); GNUNET_free (cur->transport); GNUNET_free (cur->addrcp); GNUNET_free (cur); - } FPRINTF (stdout, "%s", - _("Failed to list connections, timeout occurred\n")); + _ ("Failed to list connections, timeout occurred\n")); GNUNET_SCHEDULER_shutdown (); ret = 1; return; @@ -512,22 +495,15 @@ do_send (void *cls) struct GNUNET_MessageHeader *m; struct GNUNET_MQ_Envelope *env; - env = GNUNET_MQ_msg_extra (m, - BLOCKSIZE * 1024, - GNUNET_MESSAGE_TYPE_DUMMY); - memset (&m[1], - 52, - BLOCKSIZE * 1024 - sizeof(struct GNUNET_MessageHeader)); + env = GNUNET_MQ_msg_extra (m, BLOCKSIZE * 1024, GNUNET_MESSAGE_TYPE_DUMMY); + memset (&m[1], 52, BLOCKSIZE * 1024 - sizeof (struct GNUNET_MessageHeader)); traffic_sent += BLOCKSIZE * 1024; - GNUNET_MQ_notify_sent (env, - &do_send, - mq); + GNUNET_MQ_notify_sent (env, &do_send, mq); if (verbosity > 0) FPRINTF (stdout, - _("Transmitting %u bytes\n"), - (unsigned int) BLOCKSIZE * 1024); - GNUNET_MQ_send (mq, - env); + _ ("Transmitting %u bytes\n"), + (unsigned int) BLOCKSIZE * 1024); + GNUNET_MQ_send (mq, env); } @@ -542,11 +518,9 @@ do_send (void *cls) static void * notify_connect (void *cls, const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *mq) + struct GNUNET_MQ_Handle *mq) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) return NULL; ret = 0; if (! benchmark_send) @@ -557,10 +531,12 @@ notify_connect (void *cls, op_timeout = NULL; } if (verbosity > 0) - FPRINTF (stdout, - _("Successfully connected to `%s', starting to send benchmark data in %u Kb blocks\n"), - GNUNET_i2s (peer), - BLOCKSIZE); + FPRINTF ( + stdout, + _ ( + "Successfully connected to `%s', starting to send benchmark data in %u Kb blocks\n"), + GNUNET_i2s (peer), + BLOCKSIZE); start_time = GNUNET_TIME_absolute_get (); do_send (mq); return mq; @@ -578,19 +554,17 @@ notify_connect (void *cls, static void notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer, - void *internal_cls) + void *internal_cls) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) return; if (NULL == internal_cls) return; /* not about target peer */ if (! benchmark_send) return; /* not transmitting */ FPRINTF (stdout, - _("Disconnected from peer `%s' while benchmarking\n"), - GNUNET_i2s (&pid)); + _ ("Disconnected from peer `%s' while benchmarking\n"), + GNUNET_i2s (&pid)); } @@ -606,16 +580,16 @@ notify_disconnect (void *cls, static void * monitor_notify_connect (void *cls, const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *mq) + struct GNUNET_MQ_Handle *mq) { struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now); monitor_connect_counter++; FPRINTF (stdout, - _("%24s: %-17s %4s (%u connections in total)\n"), + _ ("%24s: %-17s %4s (%u connections in total)\n"), now_str, - _("Connected to"), + _ ("Connected to"), GNUNET_i2s (peer), monitor_connect_counter); return NULL; @@ -633,18 +607,18 @@ monitor_notify_connect (void *cls, static void monitor_notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer, - void *internal_cls) + void *internal_cls) { struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now); - GNUNET_assert(monitor_connect_counter > 0); + GNUNET_assert (monitor_connect_counter > 0); monitor_connect_counter--; FPRINTF (stdout, - _("%24s: %-17s %4s (%u connections in total)\n"), + _ ("%24s: %-17s %4s (%u connections in total)\n"), now_str, - _("Disconnected from"), + _ ("Disconnected from"), GNUNET_i2s (peer), monitor_connect_counter); } @@ -658,8 +632,7 @@ monitor_notify_disconnect (void *cls, * @return #GNUNET_OK */ static int -check_dummy (void *cls, - const struct GNUNET_MessageHeader *message) +check_dummy (void *cls, const struct GNUNET_MessageHeader *message) { return GNUNET_OK; /* all messages are fine */ } @@ -672,15 +645,14 @@ check_dummy (void *cls, * @param message the message */ static void -handle_dummy (void *cls, - const struct GNUNET_MessageHeader *message) +handle_dummy (void *cls, const struct GNUNET_MessageHeader *message) { if (! benchmark_receive) return; if (verbosity > 0) FPRINTF (stdout, - _("Received %u bytes\n"), - (unsigned int) ntohs (message->size)); + _ ("Received %u bytes\n"), + (unsigned int) ntohs (message->size)); if (0 == traffic_received) start_time = GNUNET_TIME_absolute_get (); traffic_received += ntohs (message->size); @@ -711,24 +683,23 @@ print_info (const struct GNUNET_PeerIdentity *id, struct GNUNET_TIME_Absolute state_timeout) { - if ( ((GNUNET_YES == iterate_connections) && - (GNUNET_YES == iterate_all)) || - (GNUNET_YES == monitor_connections)) + if (((GNUNET_YES == iterate_connections) && (GNUNET_YES == iterate_all)) || + (GNUNET_YES == monitor_connections)) { FPRINTF (stdout, - _("Peer `%s': %s %s in state `%s' until %s\n"), + _ ("Peer `%s': %s %s in state `%s' until %s\n"), GNUNET_i2s (id), (NULL == transport) ? "" : transport, (NULL == transport) ? "" : addr, GNUNET_TRANSPORT_ps2s (state), GNUNET_STRINGS_absolute_time_to_string (state_timeout)); } - else if ( (GNUNET_YES == iterate_connections) && - (GNUNET_TRANSPORT_is_connected(state)) ) + else if ((GNUNET_YES == iterate_connections) && + (GNUNET_TRANSPORT_is_connected (state))) { /* Only connected peers, skip state */ FPRINTF (stdout, - _("Peer `%s': %s %s\n"), + _ ("Peer `%s': %s %s\n"), GNUNET_i2s (id), transport, addr); @@ -753,9 +724,7 @@ print_info (const struct GNUNET_PeerIdentity *id, * if #GNUNET_SYSERR: communication error (IPC error) */ static void -process_peer_string (void *cls, - const char *address, - int res) +process_peer_string (void *cls, const char *address, int res) { struct PeerResolutionContext *rc = cls; @@ -763,11 +732,12 @@ process_peer_string (void *cls, { if (GNUNET_SYSERR == res) { - FPRINTF (stderr, - "Failed to convert address for peer `%s' plugin `%s' length %u to string \n", - GNUNET_i2s (&rc->addrcp->peer), - rc->addrcp->transport_name, - (unsigned int) rc->addrcp->address_length); + FPRINTF ( + stderr, + "Failed to convert address for peer `%s' plugin `%s' length %u to string \n", + GNUNET_i2s (&rc->addrcp->peer), + rc->addrcp->transport_name, + (unsigned int) rc->addrcp->address_length); print_info (&rc->addrcp->peer, rc->transport, NULL, @@ -818,9 +788,7 @@ process_peer_string (void *cls, } GNUNET_free (rc->transport); GNUNET_free (rc->addrcp); - GNUNET_CONTAINER_DLL_remove (rc_head, - rc_tail, - rc); + GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc); GNUNET_free (rc); if ((0 == address_resolutions) && (iterate_connections)) { @@ -854,9 +822,7 @@ resolve_peer_address (const struct GNUNET_HELLO_Address *address, struct PeerResolutionContext *rc; rc = GNUNET_new (struct PeerResolutionContext); - GNUNET_CONTAINER_DLL_insert (rc_head, - rc_tail, - rc); + GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc); address_resolutions++; rc->transport = GNUNET_strdup (address->transport_name); rc->addrcp = GNUNET_HELLO_address_copy (address); @@ -869,7 +835,7 @@ resolve_peer_address (const struct GNUNET_HELLO_Address *address, numeric, RESOLUTION_TIMEOUT, &process_peer_string, - rc); + rc); } @@ -897,15 +863,14 @@ process_peer_iteration_cb (void *cls, return; } - if ( (GNUNET_NO == iterate_all) && - (GNUNET_NO == GNUNET_TRANSPORT_is_connected(state))) - return; /* Display only connected peers */ + if ((GNUNET_NO == iterate_all) && + (GNUNET_NO == GNUNET_TRANSPORT_is_connected (state))) + return; /* Display only connected peers */ if (NULL != op_timeout) GNUNET_SCHEDULER_cancel (op_timeout); - op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, - &operation_timeout, - NULL); + op_timeout = + GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received address for peer `%s': %s\n", @@ -913,16 +878,9 @@ process_peer_iteration_cb (void *cls, address ? address->transport_name : ""); if (NULL != address) - resolve_peer_address (address, - numeric, - state, - state_timeout); + resolve_peer_address (address, numeric, state, state_timeout); else - print_info (peer, - NULL, - NULL, - state, - state_timeout); + print_info (peer, NULL, NULL, state, state_timeout); } @@ -958,7 +916,7 @@ struct PluginMonitorAddress */ static void print_plugin_event_info (struct PluginMonitorAddress *addr, - const struct GNUNET_TRANSPORT_SessionInfo *info) + const struct GNUNET_TRANSPORT_SessionInfo *info) { const char *state; @@ -987,20 +945,22 @@ print_plugin_event_info (struct PluginMonitorAddress *addr, "%s: state %s timeout in %s @ %s%s\n", GNUNET_i2s (&info->address->peer), state, - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (info->session_timeout), - GNUNET_YES), - addr->str, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_remaining (info->session_timeout), + GNUNET_YES), + addr->str, (info->is_inbound == GNUNET_YES) ? " (INBOUND)" : ""); fprintf (stdout, "%s: queue has %3u messages and %6u bytes\n", GNUNET_i2s (&info->address->peer), info->num_msg_pending, info->num_bytes_pending); - if (0 != GNUNET_TIME_absolute_get_remaining (info->receive_delay).rel_value_us) + if (0 != + GNUNET_TIME_absolute_get_remaining (info->receive_delay).rel_value_us) fprintf (stdout, - "%s: receiving blocked until %s\n", - GNUNET_i2s (&info->address->peer), - GNUNET_STRINGS_absolute_time_to_string (info->receive_delay)); + "%s: receiving blocked until %s\n", + GNUNET_i2s (&info->address->peer), + GNUNET_STRINGS_absolute_time_to_string (info->receive_delay)); } @@ -1021,9 +981,7 @@ print_plugin_event_info (struct PluginMonitorAddress *addr, * if #GNUNET_SYSERR: communication error (IPC error) */ static void -address_cb (void *cls, - const char *address, - int res) +address_cb (void *cls, const char *address, int res) { struct PluginMonitorAddress *addr = cls; @@ -1035,8 +993,7 @@ address_cb (void *cls, if (NULL != addr->str) return; addr->str = GNUNET_strdup (address); - print_plugin_event_info (addr, - &addr->si); + print_plugin_event_info (addr, &addr->si); } @@ -1065,8 +1022,7 @@ plugin_monitoring_cb (void *cls, { struct PluginMonitorAddress *addr; - if ( (NULL == info) && - (NULL == session) ) + if ((NULL == info) && (NULL == session)) return; /* in sync with transport service */ addr = *session_ctx; if (NULL == info) @@ -1084,26 +1040,25 @@ plugin_monitoring_cb (void *cls, } return; /* shutdown */ } - if (0 != memcmp (&info->address->peer, - &pid, - sizeof (struct GNUNET_PeerIdentity))) + if (0 != + memcmp (&info->address->peer, &pid, sizeof (struct GNUNET_PeerIdentity))) return; /* filtered */ if (NULL == addr) { addr = GNUNET_new (struct PluginMonitorAddress); - addr->asc = GNUNET_TRANSPORT_address_to_string (cfg, - info->address, - numeric, - GNUNET_TIME_UNIT_FOREVER_REL, - &address_cb, - addr); + addr->asc = + GNUNET_TRANSPORT_address_to_string (cfg, + info->address, + numeric, + GNUNET_TIME_UNIT_FOREVER_REL, + &address_cb, + addr); *session_ctx = addr; } if (NULL == addr->str) addr->si = *info; else - print_plugin_event_info (addr, - info); + print_plugin_event_info (addr, info); if (GNUNET_TRANSPORT_SS_DONE == info->state) { if (NULL != addr->asc) @@ -1141,38 +1096,35 @@ process_peer_monitoring_cb (void *cls, { FPRINTF (stdout, "%s", - _("Monitor disconnected from transport service. Reconnecting.\n")); + _ ( + "Monitor disconnected from transport service. Reconnecting.\n")); return; } if (NULL != op_timeout) GNUNET_SCHEDULER_cancel (op_timeout); - op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, - &operation_timeout, - NULL); + op_timeout = + GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); - if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, - peer))) + if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, peer))) { m = GNUNET_new (struct MonitoredPeer); - GNUNET_CONTAINER_multipeermap_put (monitored_peers, - peer, - m, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + GNUNET_CONTAINER_multipeermap_put ( + monitored_peers, + peer, + m, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } else { - if ( (m->state == state) && - (m->state_timeout.abs_value_us == state_timeout.abs_value_us) && - (NULL == address) && - (NULL == m->address) ) + if ((m->state == state) && + (m->state_timeout.abs_value_us == state_timeout.abs_value_us) && + (NULL == address) && (NULL == m->address)) { return; /* No real change */ } - if ( (m->state == state) && - (NULL != address) && - (NULL != m->address) && - (0 == GNUNET_HELLO_address_cmp(m->address, address)) ) + if ((m->state == state) && (NULL != address) && (NULL != m->address) && + (0 == GNUNET_HELLO_address_cmp (m->address, address))) return; /* No real change */ } @@ -1187,16 +1139,9 @@ process_peer_monitoring_cb (void *cls, m->state_timeout = state_timeout; if (NULL != address) - resolve_peer_address (m->address, - numeric, - m->state, - m->state_timeout); + resolve_peer_address (m->address, numeric, m->state, m->state_timeout); else - print_info (peer, - NULL, - NULL, - m->state, - m->state_timeout); + print_info (peer, NULL, NULL, m->state, m->state_timeout); } @@ -1210,12 +1155,9 @@ process_peer_monitoring_cb (void *cls, * @return #GNUNET_OK if the connection is allowed, #GNUNET_SYSERR if not */ static int -blacklist_cb (void *cls, - const struct GNUNET_PeerIdentity *cpid) +blacklist_cb (void *cls, const struct GNUNET_PeerIdentity *cpid) { - if (0 == memcmp (cpid, - &pid, - sizeof (struct GNUNET_PeerIdentity))) + if (0 == memcmp (cpid, &pid, sizeof (struct GNUNET_PeerIdentity))) return GNUNET_SYSERR; return GNUNET_OK; } @@ -1231,7 +1173,7 @@ blacklist_cb (void *cls, */ static void run (void *cls, - char * const *args, + char *const *args, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *mycfg) { @@ -1241,127 +1183,119 @@ run (void *cls, cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg; - counter = benchmark_send + benchmark_receive + iterate_connections - + monitor_connections + monitor_connects + do_disconnect + - monitor_plugins; + counter = benchmark_send + benchmark_receive + iterate_connections + + monitor_connections + monitor_connects + do_disconnect + + monitor_plugins; if (1 < counter) { - FPRINTF (stderr, - _("Multiple operations given. Please choose only one operation: %s, %s, %s, %s, %s, %s %s\n"), - "disconnect", - "benchmark send", - "benchmark receive", - "information", - "monitor", - "events", - "plugins"); + FPRINTF ( + stderr, + _ ( + "Multiple operations given. Please choose only one operation: %s, %s, %s, %s, %s, %s %s\n"), + "disconnect", + "benchmark send", + "benchmark receive", + "information", + "monitor", + "events", + "plugins"); return; } if (0 == counter) { - FPRINTF (stderr, - _("No operation given. Please choose one operation: %s, %s, %s, %s, %s, %s, %s\n"), - "disconnect", - "benchmark send", - "benchmark receive", - "information", - "monitor", - "events", - "plugins"); + FPRINTF ( + stderr, + _ ( + "No operation given. Please choose one operation: %s, %s, %s, %s, %s, %s, %s\n"), + "disconnect", + "benchmark send", + "benchmark receive", + "information", + "monitor", + "events", + "plugins"); return; } if (do_disconnect) /* -D: Disconnect from peer */ { - if (0 == memcmp (&zero_pid, - &pid, - sizeof (pid))) + if (0 == memcmp (&zero_pid, &pid, sizeof (pid))) { FPRINTF (stderr, - _("Option `%s' makes no sense without option `%s'.\n"), - "-D", "-p"); + _ ("Option `%s' makes no sense without option `%s'.\n"), + "-D", + "-p"); ret = 1; return; } - blacklist = GNUNET_TRANSPORT_blacklist (cfg, - &blacklist_cb, - NULL); + blacklist = GNUNET_TRANSPORT_blacklist (cfg, &blacklist_cb, NULL); if (NULL == blacklist) { FPRINTF (stderr, "%s", - _("Failed to connect to transport service for disconnection\n")); + _ ( + "Failed to connect to transport service for disconnection\n")); ret = 1; return; } FPRINTF (stdout, "%s", - _("Blacklisting request in place, stop with CTRL-C\n")); + _ ("Blacklisting request in place, stop with CTRL-C\n")); } else if (benchmark_send) /* -s: Benchmark sending */ { - if (0 == memcmp (&zero_pid, - &pid, - sizeof (pid))) + if (0 == memcmp (&zero_pid, &pid, sizeof (pid))) { FPRINTF (stderr, - _("Option `%s' makes no sense without option `%s'.\n"), - "-s", "-p"); + _ ("Option `%s' makes no sense without option `%s'.\n"), + "-s", + "-p"); ret = 1; return; } handle = GNUNET_TRANSPORT_core_connect (cfg, - NULL, - NULL, - NULL, - ¬ify_connect, - ¬ify_disconnect, - NULL); + NULL, + NULL, + NULL, + ¬ify_connect, + ¬ify_disconnect, + NULL); if (NULL == handle) { - FPRINTF (stderr, - "%s", - _("Failed to connect to transport service\n")); + FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n")); ret = 1; return; } start_time = GNUNET_TIME_absolute_get (); - op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, - &operation_timeout, - NULL); + op_timeout = + GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); } else if (benchmark_receive) /* -b: Benchmark receiving */ { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (dummy, - GNUNET_MESSAGE_TYPE_DUMMY, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (dummy, + GNUNET_MESSAGE_TYPE_DUMMY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end ()}; handle = GNUNET_TRANSPORT_core_connect (cfg, - NULL, - handlers, - NULL, - NULL, - NULL, - NULL); + NULL, + handlers, + NULL, + NULL, + NULL, + NULL); if (NULL == handle) { - FPRINTF (stderr, - "%s", - _("Failed to connect to transport service\n")); + FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n")); ret = 1; return; } if (verbosity > 0) - FPRINTF (stdout, - "%s", - _("Starting to receive benchmark data\n")); + FPRINTF (stdout, "%s", _ ("Starting to receive benchmark data\n")); start_time = GNUNET_TIME_absolute_get (); - } else if (iterate_connections) /* -i: List information about peers once */ { @@ -1370,42 +1304,38 @@ run (void *cls, GNUNET_YES, &process_peer_iteration_cb, (void *) cfg); - op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, - &operation_timeout, - NULL); + op_timeout = + GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); } - else if (monitor_connections) /* -m: List information about peers continuously */ + else if (monitor_connections) /* -m: List information about peers continuously + */ { - monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, - GNUNET_NO); + monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); pic = GNUNET_TRANSPORT_monitor_peers (cfg, - &pid, + &pid, GNUNET_NO, &process_peer_monitoring_cb, NULL); } - else if (monitor_plugins) /* -P: List information about plugins continuously */ + else if (monitor_plugins) /* -P: List information about plugins continuously + */ { monitored_plugins = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); - pm = GNUNET_TRANSPORT_monitor_plugins (cfg, - &plugin_monitoring_cb, - NULL); + pm = GNUNET_TRANSPORT_monitor_plugins (cfg, &plugin_monitoring_cb, NULL); } else if (monitor_connects) /* -e : Monitor (dis)connect events continuously */ { monitor_connect_counter = 0; handle = GNUNET_TRANSPORT_core_connect (cfg, - NULL, - NULL, - NULL, - &monitor_notify_connect, - &monitor_notify_disconnect, - NULL); + NULL, + NULL, + NULL, + &monitor_notify_connect, + &monitor_notify_disconnect, + NULL); if (NULL == handle) { - FPRINTF (stderr, - "%s", - _("Failed to connect to transport service\n")); + FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n")); ret = 1; return; } @@ -1413,75 +1343,86 @@ run (void *cls, } else { - GNUNET_break(0); + GNUNET_break (0); return; } - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); } int -main (int argc, - char * const *argv) +main (int argc, char *const *argv) { int res; - struct GNUNET_GETOPT_CommandLineOption options[] = { - GNUNET_GETOPT_option_flag ('a', - "all", - gettext_noop ("print information for all peers (instead of only connected peers)"), - &iterate_all), - GNUNET_GETOPT_option_flag ('b', - "benchmark", - gettext_noop ("measure how fast we are receiving data from all peers (until CTRL-C)"), - &benchmark_receive), - GNUNET_GETOPT_option_flag ('D', - "disconnect", - gettext_noop ("disconnect from a peer"), - &do_disconnect), - GNUNET_GETOPT_option_flag ('i', - "information", - gettext_noop ("provide information about all current connections (once)"), - &iterate_connections), - GNUNET_GETOPT_option_flag ('m', - "monitor", - gettext_noop ("provide information about all current connections (continuously)"), - &monitor_connections), - GNUNET_GETOPT_option_flag ('e', - "events", - gettext_noop ("provide information about all connects and disconnect events (continuously)"), - &monitor_connects), - GNUNET_GETOPT_option_flag ('n', - "numeric", - gettext_noop ("do not resolve hostnames"), - &numeric), - GNUNET_GETOPT_option_base32_auto ('p', - "peer", - "PEER", - gettext_noop ("peer identity"), - &pid), - GNUNET_GETOPT_option_flag ('P', - "plugins", - gettext_noop ("monitor plugin sessions"), - &monitor_plugins), - GNUNET_GETOPT_option_flag ('s', - "send", - gettext_noop - ("send data for benchmarking to the other peer (until CTRL-C)"), - &benchmark_send), - GNUNET_GETOPT_option_verbose (&verbosity), - GNUNET_GETOPT_OPTION_END - }; + struct GNUNET_GETOPT_CommandLineOption options[] = + {GNUNET_GETOPT_option_flag ( + 'a', + "all", + gettext_noop ( + "print information for all peers (instead of only connected peers)"), + &iterate_all), + GNUNET_GETOPT_option_flag ( + 'b', + "benchmark", + gettext_noop ( + "measure how fast we are receiving data from all peers (until CTRL-C)"), + &benchmark_receive), + GNUNET_GETOPT_option_flag ('D', + "disconnect", + gettext_noop ("disconnect from a peer"), + &do_disconnect), + GNUNET_GETOPT_option_flag ( + 'i', + "information", + gettext_noop ( + "provide information about all current connections (once)"), + &iterate_connections), + GNUNET_GETOPT_option_flag ( + 'm', + "monitor", + gettext_noop ( + "provide information about all current connections (continuously)"), + &monitor_connections), + GNUNET_GETOPT_option_flag ( + 'e', + "events", + gettext_noop ( + "provide information about all connects and disconnect events (continuously)"), + &monitor_connects), + GNUNET_GETOPT_option_flag ('n', + "numeric", + gettext_noop ("do not resolve hostnames"), + &numeric), + GNUNET_GETOPT_option_base32_auto ('p', + "peer", + "PEER", + gettext_noop ("peer identity"), + &pid), + GNUNET_GETOPT_option_flag ('P', + "plugins", + gettext_noop ("monitor plugin sessions"), + &monitor_plugins), + GNUNET_GETOPT_option_flag ( + 's', + "send", + gettext_noop ( + "send data for benchmarking to the other peer (until CTRL-C)"), + &benchmark_send), + GNUNET_GETOPT_option_verbose (&verbosity), + GNUNET_GETOPT_OPTION_END}; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return 2; - res = GNUNET_PROGRAM_run (argc, argv, - "gnunet-transport", - gettext_noop ("Direct access to transport service."), - options, - &run, NULL); + res = + GNUNET_PROGRAM_run (argc, + argv, + "gnunet-transport", + gettext_noop ("Direct access to transport service."), + options, + &run, + NULL); GNUNET_free ((void *) argv); if (GNUNET_OK == res) return ret; diff --git a/src/transport/transport-testing.h b/src/transport/transport-testing.h index 4629d6125..83bbf277b 100644 --- a/src/transport/transport-testing.h +++ b/src/transport/transport-testing.h @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -30,7 +30,6 @@ #include "gnunet_util_lib.h" #include "gnunet_hello_lib.h" #include "gnunet_transport_service.h" -#include "gnunet_transport_core_service.h" #include "gnunet_transport_hello_service.h" #include "gnunet_transport_manipulation_service.h" #include "gnunet_testing_lib.h" @@ -143,7 +142,7 @@ struct GNUNET_TRANSPORT_TESTING_PeerContext * Closure for @e start_cb. */ void *start_cb_cls; - + /** * An unique number to identify the peer */ @@ -207,12 +206,12 @@ struct GNUNET_TRANSPORT_TESTING_ConnectRequest */ struct GNUNET_MQ_Handle *mq; - /** + /** * Set if peer1 says the connection is up to peer2. */ int p1_c; - /** + /** * Set if peer2 says the connection is up to peer1. */ int p2_c; @@ -289,15 +288,16 @@ GNUNET_TRANSPORT_TESTING_done (struct GNUNET_TRANSPORT_TESTING_Handle *tth); * @return the peer context */ struct GNUNET_TRANSPORT_TESTING_PeerContext * -GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth, - const char *cfgname, - int peer_id, - const struct GNUNET_MQ_MessageHandler *handlers, - GNUNET_TRANSPORT_NotifyConnect nc, - GNUNET_TRANSPORT_NotifyDisconnect nd, - void *cb_cls, - GNUNET_SCHEDULER_TaskCallback start_cb, - void *start_cb_cls); +GNUNET_TRANSPORT_TESTING_start_peer ( + struct GNUNET_TRANSPORT_TESTING_Handle *tth, + const char *cfgname, + int peer_id, + const struct GNUNET_MQ_MessageHandler *handlers, + GNUNET_TRANSPORT_NotifyConnect nc, + GNUNET_TRANSPORT_NotifyDisconnect nd, + void *cb_cls, + GNUNET_SCHEDULER_TaskCallback start_cb, + void *start_cb_cls); /** @@ -306,7 +306,8 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth * @param p the peer */ void -GNUNET_TRANSPORT_TESTING_stop_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext *pc); +GNUNET_TRANSPORT_TESTING_stop_peer ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *pc); /** @@ -318,10 +319,10 @@ GNUNET_TRANSPORT_TESTING_stop_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext * @return #GNUNET_OK in success otherwise #GNUNET_SYSERR */ int -GNUNET_TRANSPORT_TESTING_restart_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext *p, - GNUNET_SCHEDULER_TaskCallback restart_cb, - void *restart_cb_cls); - +GNUNET_TRANSPORT_TESTING_restart_peer ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *p, + GNUNET_SCHEDULER_TaskCallback restart_cb, + void *restart_cb_cls); /** @@ -331,15 +332,17 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct GNUNET_TRANSPORT_TESTING_PeerConte * * @param p1 peer 1 * @param p2 peer 2 - * @param cb the callback to call when both peers notified that they are connected + * @param cb the callback to call when both peers notified that they are + * connected * @param cls callback cls * @return a connect request handle */ struct GNUNET_TRANSPORT_TESTING_ConnectRequest * -GNUNET_TRANSPORT_TESTING_connect_peers (struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, - struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, - GNUNET_SCHEDULER_TaskCallback cb, - void *cls); +GNUNET_TRANSPORT_TESTING_connect_peers ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, + struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, + GNUNET_SCHEDULER_TaskCallback cb, + void *cls); /** @@ -350,7 +353,8 @@ GNUNET_TRANSPORT_TESTING_connect_peers (struct GNUNET_TRANSPORT_TESTING_PeerCont * @param cc a connect request handle */ void -GNUNET_TRANSPORT_TESTING_connect_peers_cancel (struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); +GNUNET_TRANSPORT_TESTING_connect_peers_cancel ( + struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); /** @@ -359,9 +363,9 @@ GNUNET_TRANSPORT_TESTING_connect_peers_cancel (struct GNUNET_TRANSPORT_TESTING_C * @param cls closure * @param cc request matching the query */ -typedef void -(*GNUNET_TRANSPORT_TESTING_ConnectContextCallback)(void *cls, - struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); +typedef void (*GNUNET_TRANSPORT_TESTING_ConnectContextCallback) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); /** @@ -369,14 +373,15 @@ typedef void * * @param p1 first peer * @param p2 second peer - * @param cb function to call + * @param cb function to call * @param cb_cls closure for @a cb */ void -GNUNET_TRANSPORT_TESTING_find_connecting_context (struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, - struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, - GNUNET_TRANSPORT_TESTING_ConnectContextCallback cb, - void *cb_cls); +GNUNET_TRANSPORT_TESTING_find_connecting_context ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, + struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, + GNUNET_TRANSPORT_TESTING_ConnectContextCallback cb, + void *cb_cls); /* ********************** high-level process functions *************** */ @@ -390,10 +395,10 @@ GNUNET_TRANSPORT_TESTING_find_connecting_context (struct GNUNET_TRANSPORT_TESTIN * @param num_peers size of the @a p array * @param p the peers that were launched */ -typedef void -(*GNUNET_TRANSPORT_TESTING_ConnectContinuation)(void *cls, - unsigned int num_peers, - struct GNUNET_TRANSPORT_TESTING_PeerContext *p[]); +typedef void (*GNUNET_TRANSPORT_TESTING_ConnectContinuation) ( + void *cls, + unsigned int num_peers, + struct GNUNET_TRANSPORT_TESTING_PeerContext *p[]); /** @@ -423,7 +428,6 @@ struct GNUNET_TRANSPORT_TESTING_TestMessage GNUNET_NETWORK_STRUCT_END - /** * Function called by the transport for each received message. * @@ -432,11 +436,11 @@ GNUNET_NETWORK_STRUCT_END * @param sender sender of the message * @param message the message */ -typedef void -(*GNUNET_TRANSPORT_TESTING_ReceiveCallback) (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_TRANSPORT_TESTING_TestMessage *message); +typedef void (*GNUNET_TRANSPORT_TESTING_ReceiveCallback) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_TRANSPORT_TESTING_TestMessage *message); /** @@ -447,10 +451,10 @@ typedef void * @param me peer experiencing the event * @param other peer that connected to @a me */ -typedef void -(*GNUNET_TRANSPORT_TESTING_NotifyConnect) (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *me, - const struct GNUNET_PeerIdentity *other); +typedef void (*GNUNET_TRANSPORT_TESTING_NotifyConnect) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *me, + const struct GNUNET_PeerIdentity *other); /** @@ -461,10 +465,10 @@ typedef void * @param me peer experiencing the event * @param other peer that disconnected from @a me */ -typedef void -(*GNUNET_TRANSPORT_TESTING_NotifyDisconnect) (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *me, - const struct GNUNET_PeerIdentity *other); +typedef void (*GNUNET_TRANSPORT_TESTING_NotifyDisconnect) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *me, + const struct GNUNET_PeerIdentity *other); /** @@ -593,7 +597,7 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext * message. */ uint32_t send_num_gen; - + /* ******* internal state, clients should not mess with this **** */ /** @@ -625,7 +629,6 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext * Array with @e num_peers entries. */ struct GNUNET_TRANSPORT_TESTING_InternalPeerContext *ip; - }; @@ -637,8 +640,9 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext * @return NULL if @a peer was not found */ struct GNUNET_TRANSPORT_TESTING_PeerContext * -GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext *ccc, - const struct GNUNET_PeerIdentity *peer); +GNUNET_TRANSPORT_TESTING_find_peer ( + struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext *ccc, + const struct GNUNET_PeerIdentity *peer); /** @@ -648,7 +652,8 @@ GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheck * abort the test, and a shutdown handler to clean up properly * on exit. * - * @param cls closure of type `struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext` + * @param cls closure of type `struct + * GNUNET_TRANSPORT_TESTING_ConnectCheckContext` * @param tth_ initialized testing handle * @param test_plugin_ name of the plugin * @param test_name_ name of the test @@ -657,12 +662,13 @@ GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheck * @return #GNUNET_SYSERR on error */ int -GNUNET_TRANSPORT_TESTING_connect_check (void *cls, - struct GNUNET_TRANSPORT_TESTING_Handle *tth_, - const char *test_plugin_, - const char *test_name_, - unsigned int num_peers, - char *cfg_files[]); +GNUNET_TRANSPORT_TESTING_connect_check ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_Handle *tth_, + const char *test_plugin_, + const char *test_name_, + unsigned int num_peers, + char *cfg_files[]); /** @@ -677,13 +683,13 @@ GNUNET_TRANSPORT_TESTING_connect_check (void *cls, * @param cfg_files array of names of configuration files for the peers * @return #GNUNET_SYSERR on error */ -typedef int -(*GNUNET_TRANSPORT_TESTING_CheckCallback)(void *cls, - struct GNUNET_TRANSPORT_TESTING_Handle *tth_, - const char *test_plugin_, - const char *test_name_, - unsigned int num_peers, - char *cfg_files[]); +typedef int (*GNUNET_TRANSPORT_TESTING_CheckCallback) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_Handle *tth_, + const char *test_plugin_, + const char *test_name_, + unsigned int num_peers, + char *cfg_files[]); /** @@ -712,8 +718,12 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0, * @param check_cls closure for @a check * @return #GNUNET_OK on success */ -#define GNUNET_TRANSPORT_TESTING_main(num_peers,check,check_cls) \ - GNUNET_TRANSPORT_TESTING_main_ (argv[0], __FILE__, num_peers, check, check_cls) +#define GNUNET_TRANSPORT_TESTING_main(num_peers, check, check_cls) \ + GNUNET_TRANSPORT_TESTING_main_ (argv[0], \ + __FILE__, \ + num_peers, \ + check, \ + check_cls) /* ***************** Convenience functions for sending ********* */ @@ -725,7 +735,8 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0, * @param sender the sending peer * @param receiver the receiving peer * @param mtype message type to use - * @param msize size of the message, at least `sizeof (struct GNUNET_TRANSPORT_TESTING_TestMessage)` + * @param msize size of the message, at least `sizeof (struct + * GNUNET_TRANSPORT_TESTING_TestMessage)` * @param num unique message number * @param cont continuation to call after transmission * @param cont_cls closure for @a cont @@ -734,13 +745,14 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0, * #GNUNET_SYSERR if @a msize is illegal */ int -GNUNET_TRANSPORT_TESTING_send (struct GNUNET_TRANSPORT_TESTING_PeerContext *sender, - struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, - uint16_t mtype, - uint16_t msize, - uint32_t num, - GNUNET_SCHEDULER_TaskCallback cont, - void *cont_cls); +GNUNET_TRANSPORT_TESTING_send ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *sender, + struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, + uint16_t mtype, + uint16_t msize, + uint32_t num, + GNUNET_SCHEDULER_TaskCallback cont, + void *cont_cls); /** @@ -771,14 +783,14 @@ struct GNUNET_TRANSPORT_TESTING_SendClosure * the message size, can be NULL in which case the message * size is the default. */ - size_t (*get_size_cb)(unsigned int n); - + size_t (*get_size_cb) (unsigned int n); + /** * Number of messages to be transmitted in a loop. * Use zero for "forever" (until external shutdown). */ unsigned int num_messages; - + /** * Function to call after all transmissions, can be NULL. */ @@ -788,12 +800,11 @@ struct GNUNET_TRANSPORT_TESTING_SendClosure * Closure for @e cont. */ void *cont_cls; - }; /** - * Task that sends a minimalistic test message from the + * Task that sends a minimalistic test message from the * first peer to the second peer. * * @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure` @@ -804,14 +815,14 @@ void GNUNET_TRANSPORT_TESTING_simple_send (void *cls); /** - * Size of a message sent with + * Size of a message sent with * #GNUNET_TRANSPORT_TESTING_large_send(). Big enough * to usually force defragmentation. */ #define GNUNET_TRANSPORT_TESTING_LARGE_MESSAGE_SIZE 2600 /** - * Task that sends a large test message from the + * Task that sends a large test message from the * first peer to the second peer. * * @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure` @@ -833,9 +844,10 @@ GNUNET_TRANSPORT_TESTING_large_send (void *cls); * @param other peer that connected. */ void -GNUNET_TRANSPORT_TESTING_log_connect (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *me, - const struct GNUNET_PeerIdentity *other); +GNUNET_TRANSPORT_TESTING_log_connect ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *me, + const struct GNUNET_PeerIdentity *other); /** @@ -846,10 +858,10 @@ GNUNET_TRANSPORT_TESTING_log_connect (void *cls, * @param other peer that disconnected. */ void -GNUNET_TRANSPORT_TESTING_log_disconnect (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *me, - const struct GNUNET_PeerIdentity *other); - +GNUNET_TRANSPORT_TESTING_log_disconnect ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *me, + const struct GNUNET_PeerIdentity *other); /* ********************** low-level filename functions *************** */ @@ -875,8 +887,7 @@ GNUNET_TRANSPORT_TESTING_get_test_name (const char *file); * @return configuration name to use */ char * -GNUNET_TRANSPORT_TESTING_get_config_name (const char *file, - int count); +GNUNET_TRANSPORT_TESTING_get_config_name (const char *file, int count); /** diff --git a/src/transport/transport.h b/src/transport/transport.h index d2a3a262b..ed89940cc 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -123,10 +123,21 @@ struct ConnectInfoMessage */ struct GNUNET_MessageHeader header; +#if (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \ + defined(GNUNET_TRANSPORT_CORE_VERSION)) + + /** + * Always zero, for alignment. + */ + uint32_t reserved GNUNET_PACKED; + +#else + /** * Current outbound quota for this peer */ struct GNUNET_BANDWIDTH_Value32NBO quota_out; +#endif /** * Identity of the new neighbour. @@ -163,6 +174,8 @@ struct DisconnectInfoMessage * Message used to set a particular bandwidth quota. Sent TO the * service to set an incoming quota, sent FROM the service to update * an outgoing quota. + * + * NOTE: no longer used in TNG! */ struct QuotaSetMessage { @@ -215,6 +228,13 @@ struct SendOkMessage */ struct GNUNET_MessageHeader header; +#if (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \ + defined(GNUNET_TRANSPORT_CORE_VERSION)) + + uint32_t reserved GNUNET_PACKED; + +#else + /** * #GNUNET_OK if the transmission succeeded, * #GNUNET_SYSERR if it failed (i.e. network disconnect); @@ -229,11 +249,13 @@ struct SendOkMessage uint16_t bytes_msg GNUNET_PACKED; /** - * Size of message sent over wire - * Includes plugin and protocol specific overhead + * Size of message sent over wire. + * Includes plugin and protocol specific overheads. */ uint32_t bytes_physical GNUNET_PACKED; +#endif + /** * Which peer can send more now? */ @@ -241,6 +263,32 @@ struct SendOkMessage }; +/** + * Message used to notify the transport API that it can + * send another message to the transport service. + * (Used to implement flow control.) + */ +struct RecvOkMessage +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK + */ + struct GNUNET_MessageHeader header; + + /** + * Number of messages by which to increase the window, greater or + * equal to one. + */ + uint32_t increase_window_delta GNUNET_PACKED; + + /** + * Which peer can CORE handle more from now? + */ + struct GNUNET_PeerIdentity peer; +}; + + /** * Message used to notify the transport service about a message * to be transmitted to another peer. The actual message follows. @@ -258,10 +306,14 @@ struct OutboundMessage */ uint32_t reserved GNUNET_PACKED; +#if ! (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \ + defined(GNUNET_TRANSPORT_CORE_VERSION)) + /** * Allowed delay. */ struct GNUNET_TIME_RelativeNBO timeout; +#endif /** * Which peer should receive the message? diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c index f00d00a44..a3c49e94f 100644 --- a/src/transport/transport_api2_core.c +++ b/src/transport/transport_api2_core.c @@ -32,13 +32,23 @@ #include "gnunet_transport_core_service.h" #include "transport.h" -#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) +#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__) /** * How large to start with for the hashmap of neighbours. */ #define STARTING_NEIGHBOURS_SIZE 16 +/** + * Window size. How many messages to the same target do we pass + * to TRANSPORT without a SEND_OK in between? Small values limit + * thoughput, large values will increase latency. + * + * FIXME-OPTIMIZE: find out what good values are experimentally, + * maybe set adaptively (i.e. to observed available bandwidth). + */ +#define SEND_WINDOW_SIZE 4 + /** * Entry in hash table of all of our current (connected) neighbours. @@ -72,46 +82,27 @@ struct Neighbour void *handlers_cls; /** - * Entry in our readyness heap (which is sorted by @e next_ready - * value). NULL if there is no pending transmission request for - * this neighbour or if we're waiting for @e is_ready to become - * true AFTER the @e out_tracker suggested that this peer's quota - * has been satisfied (so once @e is_ready goes to #GNUNET_YES, - * we should immediately go back into the heap). + * How many messages can we still send to this peer before we should + * throttle? */ - struct GNUNET_CONTAINER_HeapNode *hn; + unsigned int ready_window; /** - * Task to trigger MQ when we have enough bandwidth for the - * next transmission. + * Used to indicate our status if @e env is non-NULL. Set to + * #GNUNET_YES if we did pass a message to the MQ and are waiting + * for the call to #notify_send_done(). Set to #GNUNET_NO if the @e + * ready_window is 0 and @e env is waiting for a + * #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK? */ - struct GNUNET_SCHEDULER_Task *timeout_task; - - /** - * Outbound bandwidh tracker. - */ - struct GNUNET_BANDWIDTH_Tracker out_tracker; - - /** - * Sending consumed more bytes on wire than payload was announced - * This overhead is added to the delay of next sending operation - */ - unsigned long long traffic_overhead; - - /** - * Is this peer currently ready to receive a message? - */ - int is_ready; + int16_t awaiting_done; /** * Size of the message in @e env. */ uint16_t env_size; - }; - /** * Handle for the transport service (includes all of the * state for the transport service). @@ -140,11 +131,6 @@ struct GNUNET_TRANSPORT_CoreHandle */ GNUNET_TRANSPORT_NotifyDisconnect nd_cb; - /** - * function to call on excess bandwidth events - */ - GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb; - /** * My client connection to the transport service. */ @@ -181,7 +167,6 @@ struct GNUNET_TRANSPORT_CoreHandle * (if #GNUNET_NO, then @e self is all zeros!). */ int check_self; - }; @@ -206,31 +191,7 @@ static struct Neighbour * neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multipeermap_get (h->neighbours, - peer); -} - - -/** - * Function called by the bandwidth tracker if we have excess - * bandwidth. - * - * @param cls the `struct Neighbour` that has excess bandwidth - */ -static void -notify_excess_cb (void *cls) -{ - struct Neighbour *n = cls; - struct GNUNET_TRANSPORT_CoreHandle *h = n->h; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Notifying CORE that more bandwidth is available for %s\n", - GNUNET_i2s (&n->id)); - - if (NULL != h->neb_cb) - h->neb_cb (h->cls, - &n->id, - n->handlers_cls); + return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer); } @@ -245,9 +206,7 @@ notify_excess_cb (void *cls) * #GNUNET_NO if not. */ static int -neighbour_delete (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { struct GNUNET_TRANSPORT_CoreHandle *handle = cls; struct Neighbour *n = value; @@ -255,16 +214,8 @@ neighbour_delete (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping entry for neighbour `%s'.\n", GNUNET_i2s (key)); - GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); if (NULL != handle->nd_cb) - handle->nd_cb (handle->cls, - &n->id, - n->handlers_cls); - if (NULL != n->timeout_task) - { - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = NULL; - } + handle->nd_cb (handle->cls, &n->id, n->handlers_cls); if (NULL != n->env) { GNUNET_MQ_send_cancel (n->env); @@ -272,10 +223,9 @@ neighbour_delete (void *cls, } GNUNET_MQ_destroy (n->mq); GNUNET_assert (NULL == n->mq); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, - key, - n)); + GNUNET_assert ( + GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n)); GNUNET_free (n); return GNUNET_YES; } @@ -291,8 +241,7 @@ neighbour_delete (void *cls, * @param error error code */ static void -mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; @@ -306,57 +255,42 @@ mq_error_handler (void *cls, * A message from the handler's message queue to a neighbour was * transmitted. Now trigger (possibly delayed) notification of the * neighbour's message queue that we are done and thus ready for - * the next message. + * the next message. Note that the MQ being ready is independent + * of the send window, as we may queue many messages and simply + * not pass them to TRANSPORT if the send window is insufficient. * * @param cls the `struct Neighbour` where the message was sent */ static void -notify_send_done_fin (void *cls) +notify_send_done (void *cls) { struct Neighbour *n = cls; - n->timeout_task = NULL; - n->is_ready = GNUNET_YES; + n->awaiting_done = GNUNET_NO; + n->env = NULL; GNUNET_MQ_impl_send_continue (n->mq); } /** - * A message from the handler's message queue to a neighbour was - * transmitted. Now trigger (possibly delayed) notification of the - * neighbour's message queue that we are done and thus ready for - * the next message. + * We have an envelope waiting for transmission at @a n, and + * our transmission window is positive. Perform the transmission. * - * @param cls the `struct Neighbour` where the message was sent + * @param n neighbour to perform transmission for */ static void -notify_send_done (void *cls) +do_send (struct Neighbour *n) { - struct Neighbour *n = cls; - struct GNUNET_TIME_Relative delay; - - n->timeout_task = NULL; - if (NULL != n->env) - { - GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, - n->env_size + n->traffic_overhead); - n->env = NULL; - n->traffic_overhead = 0; - } - delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - 128); - if (0 == delay.rel_value_us) - { - n->is_ready = GNUNET_YES; - GNUNET_MQ_impl_send_continue (n->mq); - return; - } - GNUNET_MQ_impl_send_in_flight (n->mq); - /* cannot send even a small message without violating - quota, wait a before allowing MQ to send next message */ - n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done_fin, - n); + GNUNET_assert (0 < n->ready_window); + GNUNET_assert (NULL != n->env); + n->ready_window--; + n->awaiting_done = GNUNET_YES; + GNUNET_MQ_notify_sent (n->env, ¬ify_send_done, n); + GNUNET_MQ_send (n->h->mq, n->env); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Passed message of type %u for neighbour `%s' to TRANSPORT.\n", + ntohs (GNUNET_MQ_env_get_msg (n->env)->type), + GNUNET_i2s (&n->id)); } @@ -376,11 +310,9 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; - struct GNUNET_TRANSPORT_CoreHandle *h = n->h; struct OutboundMessage *obm; uint16_t msize; - GNUNET_assert (GNUNET_YES == n->is_ready); msize = ntohs (msg->size); if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm)) { @@ -388,25 +320,24 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, GNUNET_MQ_impl_send_continue (mq); return; } - GNUNET_assert (NULL == n->env); - n->env = GNUNET_MQ_msg_nested_mh (obm, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, - msg); - obm->reserved = htonl (0); - obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ - obm->peer = n->id; - GNUNET_assert (NULL == n->timeout_task); - n->is_ready = GNUNET_NO; - n->env_size = ntohs (msg->size); - GNUNET_MQ_notify_sent (n->env, - ¬ify_send_done, - n); - GNUNET_MQ_send (h->mq, - n->env); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queued message of type %u for neighbour `%s'.\n", + "CORE requested transmission of message of type %u to neighbour `%s'.\n", ntohs (msg->type), GNUNET_i2s (&n->id)); + + GNUNET_assert (NULL == n->env); + n->env = + GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg); + n->env_size = ntohs (msg->size); + obm->reserved = htonl (0); + obm->peer = n->id; + if (0 == n->ready_window) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Flow control delays transmission to CORE until we see SEND_OK.\n"); + return; /* can't send yet, need to wait for SEND_OK */ + } + do_send (n); } @@ -418,8 +349,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, * @param impl_state state of the implementation */ static void -mq_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) +mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; @@ -436,19 +366,22 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq, * @param impl_state state specific to the implementation */ static void -mq_cancel_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) +mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; - GNUNET_assert (GNUNET_NO == n->is_ready); - if (NULL != n->env) + n->ready_window++; + if (GNUNET_YES == n->awaiting_done) { GNUNET_MQ_send_cancel (n->env); n->env = NULL; + n->awaiting_done = GNUNET_NO; + } + else + { + GNUNET_assert (0 == n->ready_window); + n->env = NULL; } - - n->is_ready = GNUNET_YES; } @@ -461,8 +394,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq, * @param error error code */ static void -peer_mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { /* struct Neighbour *n = cls; */ @@ -470,29 +402,6 @@ peer_mq_error_handler (void *cls, } -/** - * The outbound quota has changed in a way that may require - * us to reset the timeout. Update the timeout. - * - * @param cls the `struct Neighbour` for which the timeout changed - */ -static void -outbound_bw_tracker_update (void *cls) -{ - struct Neighbour *n = cls; - struct GNUNET_TIME_Relative delay; - - if (NULL == n->timeout_task) - return; - delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - 128); - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done, - n); -} - - /** * Function we use for handling incoming connect messages. * @@ -500,18 +409,15 @@ outbound_bw_tracker_update (void *cls) * @param cim message received */ static void -handle_connect (void *cls, - const struct ConnectInfoMessage *cim) +handle_connect (void *cls, const struct ConnectInfoMessage *cim) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving CONNECT message for `%s' with quota %u\n", - GNUNET_i2s (&cim->id), - ntohl (cim->quota_out.value__)); - n = neighbour_find (h, - &cim->id); + "Receiving CONNECT message for `%s'\n", + GNUNET_i2s (&cim->id)); + n = neighbour_find (h, &cim->id); if (NULL != n) { GNUNET_break (0); @@ -521,23 +427,14 @@ handle_connect (void *cls, n = GNUNET_new (struct Neighbour); n->id = cim->id; n->h = h; - n->is_ready = GNUNET_YES; - n->traffic_overhead = 0; - GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker, - &outbound_bw_tracker_update, - n, - GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, - MAX_BANDWIDTH_CARRY_S, - ¬ify_excess_cb, - n); + n->ready_window = SEND_WINDOW_SIZE; GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (h->neighbours, - &n->id, - n, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_multipeermap_put ( + h->neighbours, + &n->id, + n, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - cim->quota_out); n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, &mq_destroy_impl, &mq_cancel_impl, @@ -547,11 +444,8 @@ handle_connect (void *cls, n); if (NULL != h->nc_cb) { - n->handlers_cls = h->nc_cb (h->cls, - &n->id, - n->mq); - GNUNET_MQ_set_handlers_closure (n->mq, - n->handlers_cls); + n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq); + GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls); } } @@ -563,8 +457,7 @@ handle_connect (void *cls, * @param dim message received */ static void -handle_disconnect (void *cls, - const struct DisconnectInfoMessage *dim) +handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -573,18 +466,14 @@ handle_disconnect (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving DISCONNECT message for `%s'.\n", GNUNET_i2s (&dim->peer)); - n = neighbour_find (h, - &dim->peer); + n = neighbour_find (h, &dim->peer); if (NULL == n) { GNUNET_break (0); disconnect_and_schedule_reconnect (h); return; } - GNUNET_assert (GNUNET_YES == - neighbour_delete (h, - &dim->peer, - n)); + GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n)); } @@ -595,24 +484,15 @@ handle_disconnect (void *cls, * @param okm message received */ static void -handle_send_ok (void *cls, - const struct SendOkMessage *okm) +handle_send_ok (void *cls, const struct SendOkMessage *okm) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; - uint16_t bytes_msg; - uint32_t bytes_physical; - bytes_msg = ntohs (okm->bytes_msg); - bytes_physical = ntohl (okm->bytes_physical); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving SEND_OK message, transmission to %s %s.\n", - GNUNET_i2s (&okm->peer), - (GNUNET_OK == ntohs (okm->success)) - ? "succeeded" - : "failed"); - n = neighbour_find (h, - &okm->peer); + "Receiving SEND_OK message for transmission to %s\n", + GNUNET_i2s (&okm->peer)); + n = neighbour_find (h, &okm->peer); if (NULL == n) { /* We should never get a 'SEND_OK' for a peer that we are not @@ -621,14 +501,9 @@ handle_send_ok (void *cls, disconnect_and_schedule_reconnect (h); return; } - if (bytes_physical > bytes_msg) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Overhead for %u byte message was %u\n", - (unsigned int) bytes_msg, - (unsigned int) (bytes_physical - bytes_msg)); - n->traffic_overhead += bytes_physical - bytes_msg; - } + n->ready_window++; + if ((NULL != n->env) && (1 == n->ready_window)) + do_send (n); } @@ -639,8 +514,7 @@ handle_send_ok (void *cls, * @param im message received */ static int -check_recv (void *cls, - const struct InboundMessage *im) +check_recv (void *cls, const struct InboundMessage *im) { const struct GNUNET_MessageHeader *imm; uint16_t size; @@ -668,12 +542,11 @@ check_recv (void *cls, * @param im message received */ static void -handle_recv (void *cls, - const struct InboundMessage *im) +handle_recv (void *cls, const struct InboundMessage *im) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; - const struct GNUNET_MessageHeader *imm - = (const struct GNUNET_MessageHeader *) &im[1]; + const struct GNUNET_MessageHeader *imm = + (const struct GNUNET_MessageHeader *) &im[1]; struct Neighbour *n; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -681,46 +554,14 @@ handle_recv (void *cls, (unsigned int) ntohs (imm->type), (unsigned int) ntohs (imm->size), GNUNET_i2s (&im->peer)); - n = neighbour_find (h, - &im->peer); - if (NULL == n) - { - GNUNET_break (0); - disconnect_and_schedule_reconnect (h); - return; - } - GNUNET_MQ_inject_message (n->mq, - imm); -} - - -/** - * Function we use for handling incoming set quota messages. - * - * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` - * @param msg message received - */ -static void -handle_set_quota (void *cls, - const struct QuotaSetMessage *qm) -{ - struct GNUNET_TRANSPORT_CoreHandle *h = cls; - struct Neighbour *n; - - n = neighbour_find (h, - &qm->peer); + n = neighbour_find (h, &im->peer); if (NULL == n) { GNUNET_break (0); disconnect_and_schedule_reconnect (h); return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving SET_QUOTA message for `%s' with quota %u\n", - GNUNET_i2s (&qm->peer), - (unsigned int) ntohl (qm->quota.value__)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - qm->quota); + GNUNET_MQ_inject_message (n->mq, imm); } @@ -733,46 +574,36 @@ static void reconnect (void *cls) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_fixed_size (connect, - GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, - struct ConnectInfoMessage, - h), - GNUNET_MQ_hd_fixed_size (disconnect, - GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, - struct DisconnectInfoMessage, - h), - GNUNET_MQ_hd_fixed_size (send_ok, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, - struct SendOkMessage, - h), - GNUNET_MQ_hd_var_size (recv, - GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, - struct InboundMessage, - h), - GNUNET_MQ_hd_fixed_size (set_quota, - GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, - struct QuotaSetMessage, - h), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_fixed_size (connect, + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, + struct ConnectInfoMessage, + h), + GNUNET_MQ_hd_fixed_size (disconnect, + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, + struct DisconnectInfoMessage, + h), + GNUNET_MQ_hd_fixed_size (send_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, + struct SendOkMessage, + h), + GNUNET_MQ_hd_var_size (recv, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, + struct InboundMessage, + h), + GNUNET_MQ_handler_end ()}; struct GNUNET_MQ_Envelope *env; struct StartMessage *s; uint32_t options; h->reconnect_task = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to transport service.\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); GNUNET_assert (NULL == h->mq); - h->mq = GNUNET_CLIENT_connect (h->cfg, - "transport", - handlers, - &mq_error_handler, - h); + h->mq = + GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h); if (NULL == h->mq) return; - env = GNUNET_MQ_msg (s, - GNUNET_MESSAGE_TYPE_TRANSPORT_START); + env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START); options = 0; if (h->check_self) options |= 1; @@ -780,8 +611,7 @@ reconnect (void *cls) options |= 2; s->options = htonl (options); s->self = h->self; - GNUNET_MQ_send (h->mq, - env); + GNUNET_MQ_send (h->mq, env); } @@ -793,9 +623,7 @@ reconnect (void *cls) static void disconnect (struct GNUNET_TRANSPORT_CoreHandle *h) { - GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, - &neighbour_delete, - h); + GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h); if (NULL != h->mq) { GNUNET_MQ_destroy (h->mq); @@ -817,12 +645,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h) disconnect (h); LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduling task to reconnect to transport service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, - &reconnect, - h); + GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); } @@ -840,14 +665,52 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, { struct Neighbour *n; - n = neighbour_find (handle, - peer); + n = neighbour_find (handle, peer); if (NULL == n) return NULL; return n->mq; } +/** + * Notification from the CORE service to the TRANSPORT service + * that the CORE service has finished processing a message from + * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect()) + * and that it is thus now OK for TRANSPORT to send more messages + * for @a pid. + * + * Used to provide flow control, this is our equivalent to + * #GNUNET_SERVICE_client_continue() of an ordinary service. + * + * Note that due to the use of a window, TRANSPORT may send multiple + * messages destined for the same peer even without an intermediate + * call to this function. However, CORE must still call this function + * once per message received, as otherwise eventually the window will + * be full and TRANSPORT will stop providing messages to CORE for @a + * pid. + * + * @param ch core handle + * @param pid which peer was the message from that was fully processed by CORE + */ +void +GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch, + const struct GNUNET_PeerIdentity *pid) +{ + struct GNUNET_MQ_Envelope *env; + struct RecvOkMessage *rok; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message for %s finished CORE processing, sending RECV_OK.\n", + GNUNET_i2s (pid)); + if (NULL == ch->mq) + return; + env = GNUNET_MQ_msg (rok, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK); + rok->increase_window_delta = htonl (1); + rok->peer = *pid; + GNUNET_MQ_send (ch->mq, env); +} + + /** * Connect to the transport service. Note that the connection may * complete (or fail) asynchronously. @@ -859,17 +722,15 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, * @param rec receive function to call * @param nc function to call on connect events * @param nd function to call on disconnect events - * @param neb function to call if we have excess bandwidth to a peer * @return NULL on error */ struct GNUNET_TRANSPORT_CoreHandle * GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_PeerIdentity *self, - const struct GNUNET_MQ_MessageHandler *handlers, - void *cls, - GNUNET_TRANSPORT_NotifyConnect nc, - GNUNET_TRANSPORT_NotifyDisconnect nd, - GNUNET_TRANSPORT_NotifyExcessBandwidth neb) + const struct GNUNET_PeerIdentity *self, + const struct GNUNET_MQ_MessageHandler *handlers, + void *cls, + GNUNET_TRANSPORT_NotifyConnect nc, + GNUNET_TRANSPORT_NotifyDisconnect nd) { struct GNUNET_TRANSPORT_CoreHandle *h; unsigned int i; @@ -884,19 +745,17 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, h->cls = cls; h->nc_cb = nc; h->nd_cb = nd; - h->neb_cb = neb; h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; if (NULL != handlers) { - for (i=0;NULL != handlers[i].cb; i++) ; - h->handlers = GNUNET_new_array (i + 1, - struct GNUNET_MQ_MessageHandler); + for (i = 0; NULL != handlers[i].cb; i++) + ; + h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler); GNUNET_memcpy (h->handlers, - handlers, - i * sizeof (struct GNUNET_MQ_MessageHandler)); + handlers, + i * sizeof (struct GNUNET_MQ_MessageHandler)); } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to transport service\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n"); reconnect (h); if (NULL == h->mq) { @@ -905,8 +764,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, return NULL; } h->neighbours = - GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, - GNUNET_YES); + GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES); return h; } @@ -914,13 +772,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Disconnect from the transport service. * - * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() + * @param handle handle to the service as returned from + * #GNUNET_TRANSPORT_core_connect() */ void GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transport disconnect called!\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); /* this disconnects all neighbours... */ disconnect (handle); /* and now we stop trying to connect again... */ diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c index e86499173..a163d7ccf 100644 --- a/src/transport/transport_api_core.c +++ b/src/transport/transport_api_core.c @@ -29,11 +29,10 @@ #include "gnunet_arm_service.h" #include "gnunet_hello_lib.h" #include "gnunet_protocols.h" -#include "gnunet_transport_core_service.h" #include "gnunet_transport_service.h" #include "transport.h" -#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) +#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__) /** * If we could not send any payload to a peer for this amount of @@ -113,11 +112,9 @@ struct Neighbour * Size of the message in @e env. */ uint16_t env_size; - }; - /** * Handle for the transport service (includes all of the * state for the transport service). @@ -187,7 +184,6 @@ struct GNUNET_TRANSPORT_CoreHandle * (if #GNUNET_NO, then @e self is all zeros!). */ int check_self; - }; @@ -212,8 +208,7 @@ static struct Neighbour * neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multipeermap_get (h->neighbours, - peer); + return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer); } @@ -234,9 +229,7 @@ notify_excess_cb (void *cls) GNUNET_i2s (&n->id)); if (NULL != h->neb_cb) - h->neb_cb (h->cls, - &n->id, - n->handlers_cls); + h->neb_cb (h->cls, &n->id, n->handlers_cls); } @@ -251,9 +244,7 @@ notify_excess_cb (void *cls) * #GNUNET_NO if not. */ static int -neighbour_delete (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { struct GNUNET_TRANSPORT_CoreHandle *handle = cls; struct Neighbour *n = value; @@ -263,9 +254,7 @@ neighbour_delete (void *cls, GNUNET_i2s (key)); GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); if (NULL != handle->nd_cb) - handle->nd_cb (handle->cls, - &n->id, - n->handlers_cls); + handle->nd_cb (handle->cls, &n->id, n->handlers_cls); if (NULL != n->timeout_task) { GNUNET_SCHEDULER_cancel (n->timeout_task); @@ -278,10 +267,9 @@ neighbour_delete (void *cls, } GNUNET_MQ_destroy (n->mq); GNUNET_assert (NULL == n->mq); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, - key, - n)); + GNUNET_assert ( + GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n)); GNUNET_free (n); return GNUNET_YES; } @@ -297,8 +285,7 @@ neighbour_delete (void *cls, * @param error error code */ static void -mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; @@ -317,14 +304,12 @@ mq_error_handler (void *cls, * @return #GNUNET_OK if message is well-formed */ static int -check_hello (void *cls, - const struct GNUNET_MessageHeader *msg) +check_hello (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_PeerIdentity me; if (GNUNET_OK != - GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, - &me)) + GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me)) { GNUNET_break (0); return GNUNET_SYSERR; @@ -340,8 +325,7 @@ check_hello (void *cls, * @param msg message received */ static void -handle_hello (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_hello (void *cls, const struct GNUNET_MessageHeader *msg) { /* we do not care => FIXME: signal in options to NEVER send HELLOs! */ } @@ -388,8 +372,7 @@ notify_send_done (void *cls) n->env = NULL; n->traffic_overhead = 0; } - delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - 128); + delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128); if (0 == delay.rel_value_us) { n->is_ready = GNUNET_YES; @@ -399,9 +382,8 @@ notify_send_done (void *cls) GNUNET_MQ_impl_send_in_flight (n->mq); /* cannot send even a small message without violating quota, wait a before allowing MQ to send next message */ - n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done_fin, - n); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (delay, ¬ify_send_done_fin, n); } @@ -434,20 +416,17 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, return; } GNUNET_assert (NULL == n->env); - n->env = GNUNET_MQ_msg_nested_mh (obm, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, - msg); + n->env = + GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg); obm->reserved = htonl (0); - obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ + obm->timeout = GNUNET_TIME_relative_hton ( + GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ obm->peer = n->id; GNUNET_assert (NULL == n->timeout_task); n->is_ready = GNUNET_NO; n->env_size = ntohs (msg->size); - GNUNET_MQ_notify_sent (n->env, - ¬ify_send_done, - n); - GNUNET_MQ_send (h->mq, - n->env); + GNUNET_MQ_notify_sent (n->env, ¬ify_send_done, n); + GNUNET_MQ_send (h->mq, n->env); LOG (GNUNET_ERROR_TYPE_DEBUG, "Queued message of type %u for neighbour `%s'.\n", ntohs (msg->type), @@ -463,8 +442,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, * @param impl_state state of the implementation */ static void -mq_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) +mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; @@ -481,8 +459,7 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq, * @param impl_state state specific to the implementation */ static void -mq_cancel_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) +mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; @@ -506,8 +483,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq, * @param error error code */ static void -peer_mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { /* struct Neighbour *n = cls; */ @@ -529,12 +505,9 @@ outbound_bw_tracker_update (void *cls) if (NULL == n->timeout_task) return; - delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - 128); + delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128); GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done, - n); + n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, ¬ify_send_done, n); } @@ -545,8 +518,7 @@ outbound_bw_tracker_update (void *cls) * @param cim message received */ static void -handle_connect (void *cls, - const struct ConnectInfoMessage *cim) +handle_connect (void *cls, const struct ConnectInfoMessage *cim) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -555,8 +527,7 @@ handle_connect (void *cls, "Receiving CONNECT message for `%s' with quota %u\n", GNUNET_i2s (&cim->id), ntohl (cim->quota_out.value__)); - n = neighbour_find (h, - &cim->id); + n = neighbour_find (h, &cim->id); if (NULL != n) { GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */ @@ -576,13 +547,13 @@ handle_connect (void *cls, ¬ify_excess_cb, n); GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (h->neighbours, - &n->id, - n, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_multipeermap_put ( + h->neighbours, + &n->id, + n, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - cim->quota_out); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, cim->quota_out); n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, &mq_destroy_impl, &mq_cancel_impl, @@ -592,11 +563,8 @@ handle_connect (void *cls, n); if (NULL != h->nc_cb) { - n->handlers_cls = h->nc_cb (h->cls, - &n->id, - n->mq); - GNUNET_MQ_set_handlers_closure (n->mq, - n->handlers_cls); + n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq); + GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls); } } @@ -608,8 +576,7 @@ handle_connect (void *cls, * @param dim message received */ static void -handle_disconnect (void *cls, - const struct DisconnectInfoMessage *dim) +handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -625,10 +592,7 @@ handle_disconnect (void *cls, disconnect_and_schedule_reconnect (h); return; } - GNUNET_assert (GNUNET_YES == - neighbour_delete (h, - &dim->peer, - n)); + GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n)); } @@ -639,8 +603,7 @@ handle_disconnect (void *cls, * @param okm message received */ static void -handle_send_ok (void *cls, - const struct SendOkMessage *okm) +handle_send_ok (void *cls, const struct SendOkMessage *okm) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -653,8 +616,7 @@ handle_send_ok (void *cls, "Receiving SEND_OK message, transmission to %s %s.\n", GNUNET_i2s (&okm->peer), ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); - n = neighbour_find (h, - &okm->peer); + n = neighbour_find (h, &okm->peer); if (NULL == n) { /* We should never get a 'SEND_OK' for a peer that we are not @@ -681,8 +643,7 @@ handle_send_ok (void *cls, * @param im message received */ static int -check_recv (void *cls, - const struct InboundMessage *im) +check_recv (void *cls, const struct InboundMessage *im) { const struct GNUNET_MessageHeader *imm; uint16_t size; @@ -710,12 +671,11 @@ check_recv (void *cls, * @param im message received */ static void -handle_recv (void *cls, - const struct InboundMessage *im) +handle_recv (void *cls, const struct InboundMessage *im) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; - const struct GNUNET_MessageHeader *imm - = (const struct GNUNET_MessageHeader *) &im[1]; + const struct GNUNET_MessageHeader *imm = + (const struct GNUNET_MessageHeader *) &im[1]; struct Neighbour *n; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -730,8 +690,7 @@ handle_recv (void *cls, disconnect_and_schedule_reconnect (h); return; } - GNUNET_MQ_inject_message (n->mq, - imm); + GNUNET_MQ_inject_message (n->mq, imm); } @@ -742,8 +701,7 @@ handle_recv (void *cls, * @param msg message received */ static void -handle_set_quota (void *cls, - const struct QuotaSetMessage *qm) +handle_set_quota (void *cls, const struct QuotaSetMessage *qm) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -752,16 +710,15 @@ handle_set_quota (void *cls, "Receiving SET_QUOTA message for `%s' with quota %u\n", GNUNET_i2s (&qm->peer), ntohl (qm->quota.value__)); - n = neighbour_find (h, - &qm->peer); + n = neighbour_find (h, &qm->peer); if (NULL == n) { - GNUNET_break (0); /* FIXME: julius reports this assertion fails sometimes? */ + GNUNET_break ( + 0); /* FIXME: julius reports this assertion fails sometimes? */ disconnect_and_schedule_reconnect (h); return; } - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - qm->quota); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, qm->quota); } @@ -774,50 +731,44 @@ static void reconnect (void *cls) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (hello, - GNUNET_MESSAGE_TYPE_HELLO, - struct GNUNET_MessageHeader, - h), - GNUNET_MQ_hd_fixed_size (connect, - GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, - struct ConnectInfoMessage, - h), - GNUNET_MQ_hd_fixed_size (disconnect, - GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, - struct DisconnectInfoMessage, - h), - GNUNET_MQ_hd_fixed_size (send_ok, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, - struct SendOkMessage, - h), - GNUNET_MQ_hd_var_size (recv, - GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, - struct InboundMessage, - h), - GNUNET_MQ_hd_fixed_size (set_quota, - GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, - struct QuotaSetMessage, - h), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (hello, + GNUNET_MESSAGE_TYPE_HELLO, + struct GNUNET_MessageHeader, + h), + GNUNET_MQ_hd_fixed_size (connect, + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, + struct ConnectInfoMessage, + h), + GNUNET_MQ_hd_fixed_size (disconnect, + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, + struct DisconnectInfoMessage, + h), + GNUNET_MQ_hd_fixed_size (send_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, + struct SendOkMessage, + h), + GNUNET_MQ_hd_var_size (recv, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, + struct InboundMessage, + h), + GNUNET_MQ_hd_fixed_size (set_quota, + GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, + struct QuotaSetMessage, + h), + GNUNET_MQ_handler_end ()}; struct GNUNET_MQ_Envelope *env; struct StartMessage *s; uint32_t options; h->reconnect_task = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to transport service.\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); GNUNET_assert (NULL == h->mq); - h->mq = GNUNET_CLIENT_connect (h->cfg, - "transport", - handlers, - &mq_error_handler, - h); + h->mq = + GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h); if (NULL == h->mq) return; - env = GNUNET_MQ_msg (s, - GNUNET_MESSAGE_TYPE_TRANSPORT_START); + env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START); options = 0; if (h->check_self) options |= 1; @@ -825,8 +776,7 @@ reconnect (void *cls) options |= 2; s->options = htonl (options); s->self = h->self; - GNUNET_MQ_send (h->mq, - env); + GNUNET_MQ_send (h->mq, env); } @@ -841,9 +791,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h) { GNUNET_assert (NULL == h->reconnect_task); /* Forget about all neighbours that we used to be connected to */ - GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, - &neighbour_delete, - h); + GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h); if (NULL != h->mq) { GNUNET_MQ_destroy (h->mq); @@ -851,12 +799,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h) } LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduling task to reconnect to transport service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, - &reconnect, - h); + GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); } @@ -874,8 +819,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, { struct Neighbour *n; - n = neighbour_find (handle, - peer); + n = neighbour_find (handle, peer); if (NULL == n) return NULL; return n->mq; @@ -898,12 +842,12 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, */ struct GNUNET_TRANSPORT_CoreHandle * GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_PeerIdentity *self, - const struct GNUNET_MQ_MessageHandler *handlers, - void *cls, - GNUNET_TRANSPORT_NotifyConnect nc, - GNUNET_TRANSPORT_NotifyDisconnect nd, - GNUNET_TRANSPORT_NotifyExcessBandwidth neb) + const struct GNUNET_PeerIdentity *self, + const struct GNUNET_MQ_MessageHandler *handlers, + void *cls, + GNUNET_TRANSPORT_NotifyConnect nc, + GNUNET_TRANSPORT_NotifyDisconnect nd, + GNUNET_TRANSPORT_NotifyExcessBandwidth neb) { struct GNUNET_TRANSPORT_CoreHandle *h; unsigned int i; @@ -922,15 +866,14 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; if (NULL != handlers) { - for (i=0;NULL != handlers[i].cb; i++) ; - h->handlers = GNUNET_new_array (i + 1, - struct GNUNET_MQ_MessageHandler); + for (i = 0; NULL != handlers[i].cb; i++) + ; + h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler); GNUNET_memcpy (h->handlers, - handlers, - i * sizeof (struct GNUNET_MQ_MessageHandler)); + handlers, + i * sizeof (struct GNUNET_MQ_MessageHandler)); } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to transport service\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n"); reconnect (h); if (NULL == h->mq) { @@ -939,8 +882,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, return NULL; } h->neighbours = - GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, - GNUNET_YES); + GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES); return h; } @@ -948,13 +890,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Disconnect from the transport service. * - * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() + * @param handle handle to the service as returned from + * #GNUNET_TRANSPORT_core_connect() */ void GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transport disconnect called!\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); /* this disconnects all neighbours... */ if (NULL == handle->reconnect_task) disconnect_and_schedule_reconnect (handle); -- 2.25.1