From 51ac8f0d836806a499bdb8988d80380a78e736de Mon Sep 17 00:00:00 2001 From: t3sserakt Date: Tue, 25 Jun 2019 18:06:44 +0200 Subject: [PATCH] added per message GNUNET_MQ_PriorityPreferences --- src/cadet/gnunet-service-cadet_core.c | 39 ++++++++++--------- src/cadet/gnunet-service-cadet_tunnels.c | 3 -- src/fs/gnunet-service-fs_cadet_client.c | 4 +- src/include/gnunet_mq_lib.h | 17 +++++++- src/pt/gnunet-daemon-pt.c | 1 - .../gnunet-service-scalarproduct-ecc_alice.c | 5 ++- .../gnunet-service-scalarproduct_alice.c | 5 ++- src/set/gnunet-service-set.c | 3 +- src/vpn/gnunet-service-vpn.c | 3 +- 9 files changed, 51 insertions(+), 29 deletions(-) diff --git a/src/cadet/gnunet-service-cadet_core.c b/src/cadet/gnunet-service-cadet_core.c index c5d2f195a..220a2b3cd 100644 --- a/src/cadet/gnunet-service-cadet_core.c +++ b/src/cadet/gnunet-service-cadet_core.c @@ -39,7 +39,6 @@ #include "gnunet_statistics_service.h" #include "cadet_protocol.h" - #define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__) /** @@ -47,7 +46,6 @@ */ struct RouteDirection; - /** * Set of CadetRoutes that have exactly the same number of messages * in their buffer. Used so we can efficiently find all of those @@ -346,7 +344,8 @@ discard_all_from_rung_tail () static void route_message (struct CadetPeer *prev, const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, - const struct GNUNET_MessageHeader *msg) + const struct GNUNET_MessageHeader *msg, + const enum GNUNET_MQ_PriorityPreferences priority) { struct CadetRoute *route; struct RouteDirection *dir; @@ -399,7 +398,7 @@ route_message (struct CadetPeer *prev, } /* Check if buffering is disallowed, and if so, make sure we only queue one message per direction. */ - if ( (0 != (route->options & GNUNET_CADET_OPTION_NOBUFFER)) && + if ( (0 != (priority & GNUNET_MQ_PREF_NO_BUFFER)) && (NULL != dir->env_head) ) discard_buffer (dir, dir->env_head); @@ -793,9 +792,7 @@ handle_connection_create (void *cls, uint16_t size = ntohs (msg->header.size) - sizeof (*msg); unsigned int path_length; unsigned int off; - enum GNUNET_CADET_ChannelOption options; - options = (enum GNUNET_CADET_ChannelOption) ntohl (msg->options); path_length = size / sizeof (struct GNUNET_PeerIdentity); if (0 == path_length) { @@ -869,7 +866,8 @@ handle_connection_create (void *cls, GNUNET_sh2s (&msg->cid.connection_of_tunnel)); route_message (sender, &msg->cid, - &msg->header); + &msg->header, + GNUNET_MQ_PRIO_CRITICAL_CONTROL); return; } if (off == path_length - 1) @@ -901,7 +899,6 @@ handle_connection_create (void *cls, GCT_add_inbound_connection (GCP_get_tunnel (origin, GNUNET_YES), &msg->cid, - (enum GNUNET_CADET_ChannelOption) ntohl (msg->options), path)) { /* Send back BROKEN: duplicate connection on the same path, @@ -956,7 +953,6 @@ handle_connection_create (void *cls, GNUNET_i2s (&pids[off + 1]), off + 1); route = GNUNET_new (struct CadetRoute); - route->options = options; route->cid = msg->cid; route->last_use = GNUNET_TIME_absolute_get (); dir_init (&route->prev, @@ -985,7 +981,8 @@ handle_connection_create (void *cls, /* also pass CREATE message along to next hop */ route_message (sender, &msg->cid, - &msg->header); + &msg->header, + GNUNET_MQ_PRIO_CRITICAL_CONTROL); } @@ -1029,7 +1026,8 @@ handle_connection_create_ack (void *cls, /* We're just an intermediary peer, route the message along its path */ route_message (peer, &msg->cid, - &msg->header); + &msg->header, + GNUNET_MQ_PRIO_CRITICAL_CONTROL); } @@ -1077,7 +1075,8 @@ handle_connection_broken (void *cls, /* We're just an intermediary peer, route the message along its path */ route_message (peer, &msg->cid, - &msg->header); + &msg->header, + GNUNET_MQ_PREF_NO_BUFFER); route = get_route (&msg->cid); if (NULL != route) destroy_route (route); @@ -1130,7 +1129,8 @@ handle_connection_destroy (void *cls, GNUNET_sh2s (&msg->cid.connection_of_tunnel)); route_message (peer, &msg->cid, - &msg->header); + &msg->header, + GNUNET_MQ_PREF_NO_BUFFER); route = get_route (&msg->cid); if (NULL != route) destroy_route (route); @@ -1181,7 +1181,8 @@ handle_tunnel_kx (void *cls, /* We're just an intermediary peer, route the message along its path */ route_message (peer, &msg->cid, - &msg->header); + &msg->header, + GNUNET_MQ_PRIO_CRITICAL_CONTROL); } @@ -1223,7 +1224,8 @@ handle_tunnel_kx_auth (void *cls, /* We're just an intermediary peer, route the message along its path */ route_message (peer, &msg->kx.cid, - &msg->kx.header); + &msg->kx.header, + GNUNET_MQ_PRIO_CRITICAL_CONTROL); } @@ -1280,7 +1282,8 @@ handle_tunnel_encrypted (void *cls, /* We're just an intermediary peer, route the message along its path */ route_message (peer, &msg->cid, - &msg->header); + &msg->header, + GNUNET_MQ_PRIO_CRITICAL_CONTROL); } @@ -1401,13 +1404,13 @@ GCO_init (const struct GNUNET_CONFIGURATION_Handle *c) "CADET", "MAX_ROUTES", &max_routes)) - max_routes = 5000; + max_routes = 5000; if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, "CADET", "MAX_MSGS_QUEUE", &max_buffers)) - max_buffers = 10000; + max_buffers = 10000; routes = GNUNET_CONTAINER_multishortmap_create (1024, GNUNET_NO); route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); diff --git a/src/cadet/gnunet-service-cadet_tunnels.c b/src/cadet/gnunet-service-cadet_tunnels.c index a2a493ebd..11be2bce0 100644 --- a/src/cadet/gnunet-service-cadet_tunnels.c +++ b/src/cadet/gnunet-service-cadet_tunnels.c @@ -2731,7 +2731,6 @@ consider_path_cb (void *cls, ct->cc = GCC_create (t->destination, path, off, - GNUNET_CADET_OPTION_DEFAULT, /* FIXME: set based on what channels want/need! */ ct, &connection_ready_cb, ct); @@ -3204,7 +3203,6 @@ GCT_create_tunnel (struct CadetPeer *destination) int GCT_add_inbound_connection (struct CadetTunnel *t, const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, - enum GNUNET_CADET_ChannelOption options, struct CadetPeerPath *path) { struct CadetTConnection *ct; @@ -3214,7 +3212,6 @@ GCT_add_inbound_connection (struct CadetTunnel *t, ct->t = t; ct->cc = GCC_create_inbound (t->destination, path, - options, ct, cid, &connection_ready_cb, diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c index 9ba250dfa..9f654849d 100644 --- a/src/fs/gnunet-service-fs_cadet_client.c +++ b/src/fs/gnunet-service-fs_cadet_client.c @@ -494,7 +494,6 @@ reset_cadet (struct CadetHandle *mh) mh, &mh->target, &port, - GNUNET_CADET_OPTION_RELIABLE, &window_change_cb, &disconnect_cb, handlers); @@ -572,6 +571,8 @@ transmit_pending (void *cls) GNUNET_i2s (&mh->target)); env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); + GNUNET_MQ_env_set_options(env, + GNUNET_MQ_PREF_RELIABLE); sqm->type = htonl (sr->type); sqm->query = sr->query; GNUNET_MQ_notify_sent (env, @@ -634,7 +635,6 @@ get_cadet (const struct GNUNET_PeerIdentity *target) mh, &mh->target, &port, - GNUNET_CADET_OPTION_RELIABLE, &window_change_cb, &disconnect_cb, handlers); diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h index 81100d7d8..d3f4c9717 100644 --- a/src/include/gnunet_mq_lib.h +++ b/src/include/gnunet_mq_lib.h @@ -296,7 +296,22 @@ enum GNUNET_MQ_PriorityPreferences /** * Flag to indicate that out-of-order delivery is OK. */ - GNUNET_MQ_PREF_OUT_OF_ORDER = 256 + GNUNET_MQ_PREF_OUT_OF_ORDER = 256, + + /** + * Flag to indicate no buffering. + */ + GNUNET_MQ_PREF_NO_BUFFER = 512, + + /** + * Flag to indicate default + */ + GNUNET_MQ_PREF_DEFAULT = 1024, + + /** + * Flag to indicate reliable + */ + GNUNET_MQ_PREF_RELIABLE = 2048 }; diff --git a/src/pt/gnunet-daemon-pt.c b/src/pt/gnunet-daemon-pt.c index cd17c2b7b..99cbfebf8 100644 --- a/src/pt/gnunet-daemon-pt.c +++ b/src/pt/gnunet-daemon-pt.c @@ -1088,7 +1088,6 @@ try_open_exit () pos, &pos->peer, &port, - GNUNET_CADET_OPTION_DEFAULT, &channel_idle_notify_cb, &cadet_channel_end_cb, cadet_handlers); diff --git a/src/scalarproduct/gnunet-service-scalarproduct-ecc_alice.c b/src/scalarproduct/gnunet-service-scalarproduct-ecc_alice.c index 4b1a09e50..123a02188 100644 --- a/src/scalarproduct/gnunet-service-scalarproduct-ecc_alice.c +++ b/src/scalarproduct/gnunet-service-scalarproduct-ecc_alice.c @@ -634,6 +634,8 @@ send_alices_cryptodata_message (struct AliceServiceSession *s) e = GNUNET_MQ_msg_extra (msg, todo_count * 2 * sizeof (struct GNUNET_CRYPTO_EccPoint), GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ECC_ALICE_CRYPTODATA); + GNUNET_MQ_env_set_options(e, + GNUNET_MQ_PREF_RELIABLE); msg->contained_element_count = htonl (todo_count); payload = (struct GNUNET_CRYPTO_EccPoint *) &msg[1]; r_ia = gcry_mpi_new (0); @@ -837,7 +839,6 @@ client_request_complete_alice (struct AliceServiceSession *s) s, &s->peer, &s->session_id, - GNUNET_CADET_OPTION_RELIABLE, NULL, &cb_channel_destruction, cadet_handlers); @@ -865,6 +866,8 @@ client_request_complete_alice (struct AliceServiceSession *s) e = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ECC_SESSION_INITIALIZATION); + GNUNET_MQ_env_set_options(e, + GNUNET_MQ_PREF_RELIABLE); msg->session_id = s->session_id; GNUNET_MQ_send (s->cadet_mq, e); diff --git a/src/scalarproduct/gnunet-service-scalarproduct_alice.c b/src/scalarproduct/gnunet-service-scalarproduct_alice.c index 393a1951b..18c5cc49a 100644 --- a/src/scalarproduct/gnunet-service-scalarproduct_alice.c +++ b/src/scalarproduct/gnunet-service-scalarproduct_alice.c @@ -893,6 +893,8 @@ send_alices_cryptodata_message (struct AliceServiceSession *s) e = GNUNET_MQ_msg_extra (msg, todo_count * sizeof (struct GNUNET_CRYPTO_PaillierCiphertext), GNUNET_MESSAGE_TYPE_SCALARPRODUCT_ALICE_CRYPTODATA); + GNUNET_MQ_env_set_options(e, + GNUNET_MQ_PREF_RELIABLE); msg->contained_element_count = htonl (todo_count); payload = (struct GNUNET_CRYPTO_PaillierCiphertext *) &msg[1]; a = gcry_mpi_new (0); @@ -1073,7 +1075,6 @@ client_request_complete_alice (struct AliceServiceSession *s) s, &s->peer, &s->session_id, - GNUNET_CADET_OPTION_RELIABLE, NULL, &cb_channel_destruction, cadet_handlers); @@ -1101,6 +1102,8 @@ client_request_complete_alice (struct AliceServiceSession *s) e = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SESSION_INITIALIZATION); + GNUNET_MQ_env_set_options(e, + GNUNET_MQ_PREF_RELIABLE); msg->session_id = s->session_id; msg->public_key = my_pubkey; GNUNET_MQ_send (s->cadet_mq, diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 71f74594f..f1d94bef7 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -741,6 +741,8 @@ handle_incoming_msg (void *cls, env = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, op->context_msg); + GNUNET_MQ_env_set_options(env, + GNUNET_MQ_PREF_RELIABLE); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Suggesting incoming request with accept id %u to listener %p of client %p\n", op->suggest_id, @@ -1628,7 +1630,6 @@ handle_client_evaluate (void *cls, op, &msg->target_peer, &msg->app_id, - GNUNET_CADET_OPTION_RELIABLE, &channel_window_cb, &channel_end_cb, cadet_handlers); diff --git a/src/vpn/gnunet-service-vpn.c b/src/vpn/gnunet-service-vpn.c index 91bc13fd8..7bd26798e 100644 --- a/src/vpn/gnunet-service-vpn.c +++ b/src/vpn/gnunet-service-vpn.c @@ -589,6 +589,8 @@ send_to_channel (struct ChannelState *ts, GNUNET_assert (NULL != ts->channel); mq = GNUNET_CADET_get_mq (ts->channel); + GNUNET_MQ_env_set_options(env, + GNUNET_MQ_PREF_DEFAULT); GNUNET_MQ_send (mq, env); if (GNUNET_MQ_get_length (mq) > MAX_MESSAGE_QUEUE_SIZE) @@ -1388,7 +1390,6 @@ create_channel (struct ChannelState *ts, ts, target, port, - GNUNET_CADET_OPTION_DEFAULT, NULL, &channel_cleaner, cadet_handlers); -- 2.25.1