From 6daf13eaa64b5b041edce219f30ab8dcfe38cdf5 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 30 Jan 2014 19:22:23 +0000 Subject: [PATCH] -towards fixing #3295 (core traffic prioritization) --- src/core/core.h | 2 +- src/core/core_api.c | 11 +- src/core/gnunet-service-core_clients.c | 22 +++- src/core/gnunet-service-core_sessions.c | 149 ++++++++++++++++++------ src/core/gnunet-service-core_sessions.h | 5 +- src/fs/gnunet-service-fs_cp.c | 2 +- src/include/gnunet_core_service.h | 19 +-- 7 files changed, 152 insertions(+), 58 deletions(-) diff --git a/src/core/core.h b/src/core/core.h index 17014d6ec..1c6e0bc72 100644 --- a/src/core/core.h +++ b/src/core/core.h @@ -286,7 +286,7 @@ struct SendMessage /** * Always 0. */ - uint64_t reserved GNUNET_PACKED; + uint32_t reserved GNUNET_PACKED; }; diff --git a/src/core/core_api.c b/src/core/core_api.c index 56bd29df3..7818a60a3 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -700,7 +700,8 @@ transmit_message (void *cls, size_t size, void *buf) * @param ignore_currently_down transmit message even if not initialized? */ static void -trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down) +trigger_next_request (struct GNUNET_CORE_Handle *h, + int ignore_currently_down) { uint16_t msize; @@ -742,7 +743,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down) * @param msg the message received from the core service */ static void -main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) +main_notify_handler (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_CORE_Handle *h = cls; const struct InitReplyMessage *m; @@ -1265,7 +1267,7 @@ run_request_next_transmission (void *cls, * @param handle connection to core service * @param cork is corking allowed for this transmission? * @param priority how important is the message? - * @param maxdelay how long can the message wait? + * @param maxdelay how long can the message wait? Only effective if @a cork is #GNUNET_YES * @param target who should receive the message, never NULL (can be this peer's identity for loopback) * @param notify_size how many bytes of buffer space does @a notify want? * @param notify function to call when buffer space is available; @@ -1278,7 +1280,8 @@ run_request_next_transmission (void *cls, * if NULL is returned, @a notify will NOT be called. */ struct GNUNET_CORE_TransmitHandle * -GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, +GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, + int cork, enum GNUNET_CORE_Priority priority, struct GNUNET_TIME_Relative maxdelay, const struct GNUNET_PeerIdentity *target, diff --git a/src/core/gnunet-service-core_clients.c b/src/core/gnunet-service-core_clients.c index f153ac397..fa87f82e6 100644 --- a/src/core/gnunet-service-core_clients.c +++ b/src/core/gnunet-service-core_clients.c @@ -418,6 +418,11 @@ struct TokenizerContext */ struct GSC_ClientActiveRequest *car; + /** + * How important is this message. + */ + enum GNUNET_CORE_Priority priority; + /** * Is corking allowed (set only once we have the real message). */ @@ -454,7 +459,7 @@ handle_client_send (void *cls, struct GNUNET_SERVER_Client *client, msize -= sizeof (struct SendMessage); GNUNET_break (0 == ntohl (sm->reserved)); c = find_client (client); - if (c == NULL) + if (NULL == c) { /* client did not send INIT first! */ GNUNET_break (0); @@ -482,10 +487,13 @@ handle_client_send (void *cls, struct GNUNET_SERVER_Client *client, &sm->peer, tc.car)); tc.cork = ntohl (sm->cork); + tc.priority = (enum GNUNET_CORE_Priority) ntohl (sm->priority); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client asked for transmission of %u bytes to `%s' %s\n", msize, + "Client asked for transmission of %u bytes to `%s' %s\n", + msize, GNUNET_i2s (&sm->peer), tc.cork ? "now" : ""); - GNUNET_SERVER_mst_receive (client_mst, &tc, (const char *) &sm[1], msize, + GNUNET_SERVER_mst_receive (client_mst, &tc, + (const char *) &sm[1], msize, GNUNET_YES, GNUNET_NO); if (0 != memcmp (&tc.car->target, &GSC_my_identity, @@ -541,7 +549,8 @@ client_tokenizer_callback (void *cls, void *client, else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Delivering message of type %u to %s\n", ntohs (message->type), + "Delivering message of type %u to %s\n", + ntohs (message->type), GNUNET_i2s (&car->target)); GSC_CLIENTS_deliver_message (&car->target, message, ntohs (message->size), @@ -549,7 +558,10 @@ client_tokenizer_callback (void *cls, void *client, GSC_CLIENTS_deliver_message (&car->target, message, sizeof (struct GNUNET_MessageHeader), GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND); - GSC_SESSIONS_transmit (car, message, tc->cork); + GSC_SESSIONS_transmit (car, + message, + tc->cork, + tc->priority); } return GNUNET_OK; } diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index eb42d6fbc..080bbf88b 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009-2013 Christian Grothoff (and other contributing authors) + (C) 2009-2014 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -74,6 +74,11 @@ struct SessionMessageEntry */ size_t size; + /** + * How important is this message. + */ + enum GNUNET_CORE_Priority priority; + }; @@ -145,7 +150,7 @@ struct Session /** - * Map of peer identities to 'struct Session'. + * Map of peer identities to `struct Session`. */ static struct GNUNET_CONTAINER_MultiPeerMap *sessions; @@ -180,7 +185,8 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) session = find_session (pid); if (NULL == session) return; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying session for peer `%4s'\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying session for peer `%4s'\n", GNUNET_i2s (&session->peer)); if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) { @@ -218,11 +224,12 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) * Transmit our current typemap message to the other peer. * (Done periodically in case an update got lost). * - * @param cls the 'struct Session*' + * @param cls the `struct Session *` * @param tc unused */ static void -transmit_typemap_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +transmit_typemap_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Session *session = cls; struct GNUNET_MessageHeader *hdr; @@ -263,7 +270,8 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, { struct Session *session; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating session for peer `%4s'\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating session for peer `%4s'\n", GNUNET_i2s (peer)); session = GNUNET_new (struct Session); session->tmap = GSC_TYPEMAP_create (); @@ -286,13 +294,14 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, /** * Notify the given client about the session (client is new). * - * @param cls the 'struct GSC_Client' + * @param cls the `struct GSC_Client` * @param key peer identity - * @param value the 'struct Session' - * @return GNUNET_OK (continue to iterate) + * @param value the `struct Session` + * @return #GNUNET_OK (continue to iterate) */ static int -notify_client_about_session (void *cls, const struct GNUNET_PeerIdentity * key, +notify_client_about_session (void *cls, + const struct GNUNET_PeerIdentity *key, void *value) { struct GSC_Client *client = cls; @@ -334,8 +343,8 @@ try_transmission (struct Session *session); * * @param car request to queue; this handle is then shared between * the caller (CLIENTS subsystem) and SESSIONS and must not - * be released by either until either 'GNUNET_SESSIONS_dequeue', - * 'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed' + * be released by either until either #GSC_SESSIONS_dequeue(), + * #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed() * have been invoked on it */ void @@ -344,7 +353,7 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) struct Session *session; session = find_session (&car->target); - if (session == NULL) + if (NULL == session) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropped client request for transmission (am disconnected)\n"); @@ -423,27 +432,40 @@ discard_expired_requests (struct Session *session) /** - * Solicit messages for transmission. + * Solicit messages for transmission, starting with those of the highest + * priority. * * @param session session to solict messages for + * @param msize how many bytes do we have already */ static void -solicit_messages (struct Session *session) +solicit_messages (struct Session *session, + size_t msize) { struct GSC_ClientActiveRequest *car; struct GSC_ClientActiveRequest *nxt; size_t so_size; + enum GNUNET_CORE_Priority pmax; discard_expired_requests (session); - so_size = 0; + so_size = msize; + pmax = GNUNET_CORE_PRIO_BACKGROUND; + for (car = session->active_client_request_head; NULL != car; car = car->next) + { + if (GNUNET_YES == car->was_solicited) + continue; + pmax = GNUNET_MAX (pmax, car->priority); + } nxt = session->active_client_request_head; while (NULL != (car = nxt)) { nxt = car->next; + if (car->priority < pmax) + continue; if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) break; so_size += car->msize; - if (car->was_solicited == GNUNET_YES) + if (GNUNET_YES == car->was_solicited) continue; car->was_solicited = GNUNET_YES; GSC_CLIENTS_solicit_request (car); @@ -455,11 +477,12 @@ solicit_messages (struct Session *session) * Some messages were delayed (corked), but the timeout has now expired. * Send them now. * - * @param cls 'struct Session' with the messages to transmit now + * @param cls `struct Session` with the messages to transmit now * @param tc scheduler context (unused) */ static void -pop_cork_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +pop_cork_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Session *session = cls; @@ -470,7 +493,8 @@ pop_cork_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** * Try to perform a transmission on the given session. Will solicit - * additional messages if the 'sme' queue is not full enough. + * additional messages if the 'sme' queue is not full enough or has + * only low-priority messages. * * @param session session to transmit messages from */ @@ -481,33 +505,58 @@ try_transmission (struct Session *session) size_t msize; struct GNUNET_TIME_Absolute now; struct GNUNET_TIME_Absolute min_deadline; + enum GNUNET_CORE_Priority maxp; + enum GNUNET_CORE_Priority maxpc; + struct GSC_ClientActiveRequest *car; if (GNUNET_YES != session->ready_to_transmit) return; msize = 0; min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; /* check 'ready' messages */ + maxp = GNUNET_CORE_PRIO_BACKGROUND; pos = session->sme_head; while ((NULL != pos) && (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)) { GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); msize += pos->size; + maxp = GNUNET_MAX (maxp, pos->priority); min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline); pos = pos->next; } + if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL) + { + maxpc = GNUNET_CORE_PRIO_BACKGROUND; + for (car = session->active_client_request_head; NULL != car; car = car->next) + { + if (GNUNET_YES == car->was_solicited) + continue; + maxpc = GNUNET_MAX (maxpc, car->priority); + } + if (maxpc > maxp) + { + /* we have messages waiting for solicitation that have a higher + priority than those that we already accepted; solicit the + high-priority messages first */ + solicit_messages (session, 0); + return; + } + } + now = GNUNET_TIME_absolute_get (); - if ((msize == 0) || + if ((0 == msize) || ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && (min_deadline.abs_value_us > now.abs_value_us))) { /* not enough ready yet, try to solicit more */ - solicit_messages (session); + solicit_messages (session, + msize); if (msize > 0) { /* if there is data to send, just not yet, make sure we do transmit * it once the deadline is reached */ - if (session->cork_task != GNUNET_SCHEDULER_NO_TASK) + if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) GNUNET_SCHEDULER_cancel (session->cork_task); session->cork_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining @@ -540,7 +589,8 @@ try_transmission (struct Session *session) total_msgs = 1; total_bytes = used; } - GNUNET_STATISTICS_set (GSC_stats, "# avg payload per encrypted message", + GNUNET_STATISTICS_set (GSC_stats, + "# avg payload per encrypted message", total_bytes / total_msgs, GNUNET_NO); /* now actually transmit... */ session->ready_to_transmit = GNUNET_NO; @@ -554,11 +604,13 @@ try_transmission (struct Session *session) * * @param cls the message * @param key neighbour's identity - * @param value 'struct Neighbour' of the target - * @return always GNUNET_OK + * @param value `struct Neighbour` of the target + * @return always #GNUNET_OK */ static int -do_send_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value) +do_send_message (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { const struct GNUNET_MessageHeader *hdr = cls; struct Session *session = value; @@ -569,7 +621,8 @@ do_send_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value) m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); memcpy (&m[1], hdr, size); m->size = size; - GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, m); + m->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, m); try_transmission (session); return GNUNET_OK; } @@ -585,7 +638,8 @@ GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg) { if (NULL == sessions) return; - GNUNET_CONTAINER_multipeermap_iterate (sessions, &do_send_message, + GNUNET_CONTAINER_multipeermap_iterate (sessions, + &do_send_message, (void *) msg); } @@ -617,13 +671,17 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) * this handle will now be 'owned' by the SESSIONS subsystem * @param msg message to transmit * @param cork is corking allowed? + * @param priority how important is this message */ void GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg, int cork) + const struct GNUNET_MessageHeader *msg, + int cork, + enum GNUNET_CORE_Priority priority) { struct Session *session; struct SessionMessageEntry *sme; + struct SessionMessageEntry *pos; size_t msize; session = find_session (&car->target); @@ -633,10 +691,23 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); memcpy (&sme[1], msg, msize); sme->size = msize; + sme->priority = priority; if (GNUNET_YES == cork) sme->deadline = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); - GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, sme); + pos = session->sme_head; + while ( (NULL != pos) && + (pos->priority > sme->priority) ) + pos = pos->next; + if (NULL == pos) + GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, + session->sme_tail, + sme); + else + GNUNET_CONTAINER_DLL_insert_after (session->sme_head, + session->sme_tail, + pos->prev, + sme); try_transmission (session); } @@ -644,14 +715,16 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, /** * Helper function for GSC_SESSIONS_handle_client_iterate_peers. * - * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies + * @param cls the `struct GNUNET_SERVER_TransmitContext` to queue replies * @param key identity of the connected peer - * @param value the 'struct Neighbour' for the peer + * @param value the `struct Neighbour` for the peer * @return GNUNET_OK (continue to iterate) */ #include "core.h" static int -queue_connect_message (void *cls, const struct GNUNET_PeerIdentity * key, void *value) +queue_connect_message (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { struct GNUNET_SERVER_TransmitContext *tc = cls; struct Session *session = value; @@ -768,11 +841,13 @@ GSC_SESSIONS_init () * * @param cls NULL * @param key identity of the connected peer - * @param value the 'struct Session' for the peer - * @return GNUNET_OK (continue to iterate) + * @param value the `struct Session` for the peer + * @return #GNUNET_OK (continue to iterate) */ static int -free_session_helper (void *cls, const struct GNUNET_PeerIdentity * key, void *value) +free_session_helper (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { struct Session *session = value; diff --git a/src/core/gnunet-service-core_sessions.h b/src/core/gnunet-service-core_sessions.h index d578e3d72..cc88548ec 100644 --- a/src/core/gnunet-service-core_sessions.h +++ b/src/core/gnunet-service-core_sessions.h @@ -92,10 +92,13 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car); * ownership does not change (dequeue will be called soon). * @param msg message to transmit * @param cork is corking allowed? + * @param priority how important is this message */ void GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg, int cork); + const struct GNUNET_MessageHeader *msg, + int cork, + enum GNUNET_CORE_Priority priority); /** diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 82b1f5867..087376218 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -268,7 +268,7 @@ struct GSF_ConnectedPeer /** * Set to 1 if we're currently in the process of calling - * #GNUNET_CORE_notify_transmit_ready() (so while cth is + * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is * NULL, we should not call notify_transmit_ready for this * handle right now). */ diff --git a/src/include/gnunet_core_service.h b/src/include/gnunet_core_service.h index 6c68a6833..b0248323a 100644 --- a/src/include/gnunet_core_service.h +++ b/src/include/gnunet_core_service.h @@ -53,24 +53,25 @@ enum GNUNET_CORE_Priority { /** - * Highest priority, control traffic (i.e. NSE, Core/Mesh KX). + * Lowest priority, i.e. background traffic (i.e. fs) */ - GNUNET_CORE_PRIO_CRITICAL_CONTROL = 0, + GNUNET_CORE_PRIO_BACKGROUND = 0, /** - * Urgent traffic (local peer, i.e. conversation). + * Normal traffic (i.e. mesh/dv relay, DHT) */ - GNUNET_CORE_PRIO_URGENT = 1, + GNUNET_CORE_PRIO_BEST_EFFORT = 1, /** - * Normal traffic (i.e. mesh/dv relay, DHT) + * Urgent traffic (local peer, i.e. conversation). */ - GNUNET_CORE_PRIO_BEST_EFFORT = 2, + GNUNET_CORE_PRIO_URGENT = 2, /** - * Background traffic (i.e. fs) + * Highest priority, control traffic (i.e. NSE, Core/Mesh KX). */ - GNUNET_CORE_PRIO_BACKGROUND = 3 + GNUNET_CORE_PRIO_CRITICAL_CONTROL = 3 + }; @@ -250,7 +251,7 @@ struct GNUNET_CORE_TransmitHandle; * @param handle connection to core service * @param cork is corking allowed for this transmission? * @param priority how important is the message? - * @param maxdelay how long can the message wait? + * @param maxdelay how long can the message wait? Only effective if @a cork is #GNUNET_YES * @param target who should receive the message, never NULL (can be this peer's identity for loopback) * @param notify_size how many bytes of buffer space does notify want? * @param notify function to call when buffer space is available; -- 2.25.1