X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcore%2Fgnunet-service-core_sessions.c;h=973ef9c2faaa7d5592efa9c6df1e4ce478aed866;hb=4b766fd267ca83a8faa4e22353d5942074d6f2b7;hp=a168eeed3f67af8f6a5fdee0437be4e2a99ab0d4;hpb=f1f603c7d0b3f03dca46a4f313472288eb080eb1;p=oweals%2Fgnunet.git diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index a168eeed3..973ef9c2f 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009-2014 Christian Grothoff (and other contributing authors) + Copyright (C) 2009-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -25,15 +25,20 @@ */ #include "platform.h" #include "gnunet-service-core.h" -#include "gnunet-service-core_neighbours.h" #include "gnunet-service-core_kx.h" #include "gnunet-service-core_typemap.h" #include "gnunet-service-core_sessions.h" -#include "gnunet-service-core_clients.h" #include "gnunet_constants.h" #include "core.h" +/** + * How many encrypted messages do we queue at most? + * Needed to bound memory consumption. + */ +#define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4 + + /** * Message ready for encryption. This struct is followed by the * actual content of the message. @@ -51,6 +56,21 @@ struct SessionMessageEntry */ struct SessionMessageEntry *prev; + /** + * How important is this message. + */ + enum GNUNET_CORE_Priority priority; + + /** + * Flag set to #GNUNET_YES if this is a typemap message. + */ + int is_typemap; + + /** + * Flag set to #GNUNET_YES if this is a typemap confirmation message. + */ + int is_typemap_confirm; + /** * Deadline for transmission, 1s after we received it (if we * are not corking), otherwise "now". Note that this message @@ -65,11 +85,6 @@ struct SessionMessageEntry */ size_t size; - /** - * How important is this message. - */ - enum GNUNET_CORE_Priority priority; - }; @@ -81,7 +96,12 @@ struct Session /** * Identity of the other peer. */ - struct GNUNET_PeerIdentity peer; + const struct GNUNET_PeerIdentity *peer; + + /** + * Key exchange state for this peer. + */ + struct GSC_KeyExchangeInfo *kx; /** * Head of list of requests from clients for transmission to @@ -105,11 +125,6 @@ struct Session */ struct SessionMessageEntry *sme_tail; - /** - * Information about the key exchange with the other peer. - */ - struct GSC_KeyExchangeInfo *kxinfo; - /** * Current type map for this peer. */ @@ -118,12 +133,12 @@ struct Session /** * Task to transmit corked messages with a delay. */ - struct GNUNET_SCHEDULER_Task * cork_task; + struct GNUNET_SCHEDULER_Task *cork_task; /** * Task to transmit our type map. */ - struct GNUNET_SCHEDULER_Task * typemap_task; + struct GNUNET_SCHEDULER_Task *typemap_task; /** * Retransmission delay we currently use for the typemap @@ -131,12 +146,6 @@ struct Session */ struct GNUNET_TIME_Relative typemap_delay; - /** - * Is the neighbour queue empty and thus ready for us - * to transmit an encrypted message? - */ - int ready_to_transmit; - /** * Is this the first time we're sending the typemap? If so, * we want to send it a bit faster the second time. 0 if @@ -190,7 +199,10 @@ static struct GNUNET_CONTAINER_MultiPeerMap *sessions; static struct Session * find_session (const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multipeermap_get (sessions, peer); + if (NULL == sessions) + return NULL; + return GNUNET_CONTAINER_multipeermap_get (sessions, + peer); } @@ -211,8 +223,8 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) if (NULL == session) return; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying session for peer `%4s'\n", - GNUNET_i2s (&session->peer)); + "Destroying session for peer `%s'\n", + GNUNET_i2s (session->peer)); if (NULL != session->cork_task) { GNUNET_SCHEDULER_cancel (session->cork_task); @@ -222,7 +234,8 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) { GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, session->active_client_request_tail, car); - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_NO); } while (NULL != (sme = session->sme_head)) { @@ -236,13 +249,15 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) GNUNET_SCHEDULER_cancel (session->typemap_task); session->typemap_task = NULL; } - GSC_CLIENTS_notify_clients_about_neighbour (&session->peer, - session->tmap, NULL); + GSC_CLIENTS_notify_clients_about_neighbour (session->peer, + session->tmap, + NULL); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (sessions, - &session->peer, + session->peer, session)); - GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"), + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), GNUNET_CONTAINER_multipeermap_size (sessions), GNUNET_NO); GSC_TYPEMAP_destroy (session->tmap); @@ -256,29 +271,35 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) * (Done periodically until the typemap is confirmed). * * @param cls the `struct Session *` - * @param tc unused */ static void -transmit_typemap_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +transmit_typemap_task (void *cls) { struct Session *session = cls; struct GNUNET_MessageHeader *hdr; struct GNUNET_TIME_Relative delay; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending TYPEMAP to %s\n", + GNUNET_i2s (session->peer)); session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay); delay = session->typemap_delay; /* randomize a bit to avoid spont. sync */ delay.rel_value_us += - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000); + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + 1000 * 1000); session->typemap_task = GNUNET_SCHEDULER_add_delayed (delay, - &transmit_typemap_task, session); + &transmit_typemap_task, + session); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# type map refreshes sent"), 1, + gettext_noop ("# type map refreshes sent"), + 1, GNUNET_NO); hdr = GSC_TYPEMAP_compute_type_map_message (); - GSC_KX_encrypt_and_transmit (session->kxinfo, hdr, ntohs (hdr->size)); + GSC_KX_encrypt_and_transmit (session->kx, + hdr, + ntohs (hdr->size)); GNUNET_free (hdr); } @@ -314,18 +335,19 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, struct Session *session; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating session for peer `%4s'\n", + "Creating session for peer `%s'\n", GNUNET_i2s (peer)); session = GNUNET_new (struct Session); session->tmap = GSC_TYPEMAP_create (); - session->peer = *peer; - session->kxinfo = kx; + session->peer = peer; + session->kx = kx; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put (sessions, - &session->peer, + session->peer, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"), + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), GNUNET_CONTAINER_multipeermap_size (sessions), GNUNET_NO); GSC_CLIENTS_notify_clients_about_neighbour (peer, @@ -393,8 +415,14 @@ GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer, gettext_noop ("# outdated typemap confirmations received"), 1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got outdated typemap confirmated from peer `%s'\n", + GNUNET_i2s (session->peer)); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got typemap confirmation from peer `%s'\n", + GNUNET_i2s (session->peer)); if (NULL != session->typemap_task) { GNUNET_SCHEDULER_cancel (session->typemap_task); @@ -424,7 +452,7 @@ notify_client_about_session (void *cls, struct Session *session = value; GSC_CLIENTS_notify_client_about_neighbour (client, - &session->peer, + session->peer, NULL, /* old TMAP: none */ session->tmap); return GNUNET_OK; @@ -476,19 +504,22 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropped client request for transmission (am disconnected)\n"); GNUNET_break (0); /* should have been rejected earlier */ - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_NO); return; } if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { GNUNET_break (0); - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_YES); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received client transmission request. queueing\n"); - GNUNET_CONTAINER_DLL_insert (session->active_client_request_head, - session->active_client_request_tail, car); + GNUNET_CONTAINER_DLL_insert_tail (session->active_client_request_head, + session->active_client_request_tail, + car); try_transmission (session); } @@ -505,7 +536,8 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) struct Session *session; if (0 == - memcmp (&car->target, &GSC_my_identity, + memcmp (&car->target, + &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) return; session = find_session (&car->target); @@ -513,41 +545,10 @@ GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, session->active_client_request_tail, car); -} - - -/** - * Discard all expired active transmission requests from clients. - * - * @param session session to clean up - */ -static void -discard_expired_requests (struct Session *session) -{ - struct GSC_ClientActiveRequest *pos; - struct GSC_ClientActiveRequest *nxt; - struct GNUNET_TIME_Absolute now; - - now = GNUNET_TIME_absolute_get (); - pos = NULL; - nxt = session->active_client_request_head; - while (NULL != nxt) - { - pos = nxt; - nxt = pos->next; - if ( (pos->deadline.abs_value_us < now.abs_value_us) && - (GNUNET_YES != pos->was_solicited) ) - { - GNUNET_STATISTICS_update (GSC_stats, - gettext_noop - ("# messages discarded (expired prior to transmission)"), - 1, GNUNET_NO); - GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, - session->active_client_request_tail, - pos); - GSC_CLIENTS_reject_request (pos); - } - } + /* dequeueing of 'high' priority messages may unblock + transmission for lower-priority messages, so we also + need to try in this case. */ + try_transmission (session); } @@ -567,7 +568,6 @@ solicit_messages (struct Session *session, size_t so_size; enum GNUNET_CORE_Priority pmax; - discard_expired_requests (session); so_size = msize; pmax = GNUNET_CORE_PRIO_BACKGROUND; for (car = session->active_client_request_head; NULL != car; car = car->next) @@ -588,7 +588,15 @@ solicit_messages (struct Session *session, if (GNUNET_YES == car->was_solicited) continue; car->was_solicited = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting message with priority %u\n", + car->priority); GSC_CLIENTS_solicit_request (car); + /* The above call may *dequeue* requests and thereby + clobber 'nxt'. Hence we need to restart from the + head of the list. */ + nxt = session->active_client_request_head; + so_size = msize; } } @@ -598,11 +606,9 @@ solicit_messages (struct Session *session, * Send them now. * * @param cls `struct Session` with the messages to transmit now - * @param tc scheduler context (unused) */ static void -pop_cork_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +pop_cork_task (void *cls) { struct Session *session = cls; @@ -629,14 +635,19 @@ try_transmission (struct Session *session) enum GNUNET_CORE_Priority maxpc; struct GSC_ClientActiveRequest *car; int excess; - - if (GNUNET_YES != session->ready_to_transmit) - return; + msize = 0; min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; /* if the peer has excess bandwidth, background traffic is allowed, otherwise not */ - excess = GSC_NEIGHBOURS_check_excess_bandwidth (&session->peer); + if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <= + GSC_NEIGHBOURS_get_queue_length (session->kx)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission queue already very long, waiting...\n"); + return; /* queue already too long */ + } + excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx); if (GNUNET_YES == excess) maxp = GNUNET_CORE_PRIO_BACKGROUND; else @@ -672,7 +683,8 @@ try_transmission (struct Session *session) { if (GNUNET_YES == car->was_solicited) continue; - maxpc = GNUNET_MAX (maxpc, car->priority); + maxpc = GNUNET_MAX (maxpc, + car->priority); } if (maxpc > maxp) { @@ -707,7 +719,7 @@ try_transmission (struct Session *session) "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n", excess, maxpc, - msize, + (unsigned int) msize, GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), GNUNET_YES)); solicit_messages (session, @@ -722,10 +734,15 @@ try_transmission (struct Session *session) GNUNET_YES)); if (NULL != session->cork_task) GNUNET_SCHEDULER_cancel (session->cork_task); - session->cork_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (min_deadline), - &pop_cork_task, - session); + session->cork_task + = GNUNET_SCHEDULER_add_at (min_deadline, + &pop_cork_task, + session); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queue empty, waiting for solicitations\n"); } return; } @@ -743,7 +760,15 @@ try_transmission (struct Session *session) while ( (NULL != (pos = session->sme_head)) && (used + pos->size <= msize) ) { - memcpy (&pbuf[used], &pos[1], pos->size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding message of type %d (%d/%d) to payload for %s\n", + ntohs (((const struct GNUNET_MessageHeader *)&pos[1])->type), + pos->is_typemap, + pos->is_typemap_confirm, + GNUNET_i2s (session->peer)); + GNUNET_memcpy (&pbuf[used], + &pos[1], + pos->size); used += pos->size; GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, @@ -761,10 +786,13 @@ try_transmission (struct Session *session) } GNUNET_STATISTICS_set (GSC_stats, "# avg payload per encrypted message", - total_bytes / total_msgs, GNUNET_NO); + total_bytes / total_msgs, + GNUNET_NO); /* now actually transmit... */ session->ready_to_transmit = GNUNET_NO; - GSC_KX_encrypt_and_transmit (session->kxinfo, pbuf, used); + GSC_KX_encrypt_and_transmit (session->kx, + pbuf, + used); } } @@ -788,9 +816,26 @@ do_restart_typemap_message (void *cls, struct SessionMessageEntry *sme; uint16_t size; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Restarting sending TYPEMAP to %s\n", + GNUNET_i2s (session->peer)); size = ntohs (hdr->size); + for (sme = session->sme_head; NULL != sme; sme = sme->next) + { + if (GNUNET_YES == sme->is_typemap) + { + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); + GNUNET_free (sme); + break; + } + } sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); - memcpy (&sme[1], hdr, size); + sme->is_typemap = GNUNET_YES; + GNUNET_memcpy (&sme[1], + hdr, + size); sme->size = size; sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; GNUNET_CONTAINER_DLL_insert (session->sme_head, @@ -831,10 +876,12 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) { struct Session *session; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transport solicits for %s\n", + GNUNET_i2s (pid)); session = find_session (pid); if (NULL == session) return; - session->ready_to_transmit = GNUNET_YES; try_transmission (session); } @@ -864,15 +911,21 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, return; msize = ntohs (msg->size); sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); - memcpy (&sme[1], msg, msize); + GNUNET_memcpy (&sme[1], + msg, + msize); sme->size = msize; sme->priority = priority; if (GNUNET_YES == cork) + { sme->deadline = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Mesage corked, delaying transmission\n"); + } pos = session->sme_head; while ( (NULL != pos) && - (pos->priority > sme->priority) ) + (pos->priority >= sme->priority) ) pos = pos->next; if (NULL == pos) GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, @@ -905,18 +958,37 @@ GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer, nmap = GSC_TYPEMAP_get_from_message (msg); if (NULL == nmap) + { + GNUNET_break_op (0); return; /* malformed */ + } session = find_session (peer); if (NULL == session) { + GSC_TYPEMAP_destroy (nmap); GNUNET_break (0); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received TYPEMAP from %s\n", + GNUNET_i2s (session->peer)); + for (sme = session->sme_head; NULL != sme; sme = sme->next) + { + if (GNUNET_YES == sme->is_typemap_confirm) + { + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); + GNUNET_free (sme); + break; + } + } sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + sizeof (struct TypeMapConfirmationMessage)); sme->deadline = GNUNET_TIME_absolute_get (); sme->size = sizeof (struct TypeMapConfirmationMessage); sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + sme->is_typemap_confirm = GNUNET_YES; tmc = (struct TypeMapConfirmationMessage *) &sme[1]; tmc->header.size = htons (sizeof (struct TypeMapConfirmationMessage)); tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP); @@ -956,11 +1028,15 @@ GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer, return; session = find_session (peer); GNUNET_assert (NULL != session); - if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1)) + if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, + &type, 1)) return; /* already in it */ - nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1); + nmap = GSC_TYPEMAP_extend (session->tmap, + &type, + 1); GSC_CLIENTS_notify_clients_about_neighbour (peer, - session->tmap, nmap); + session->tmap, + nmap); GSC_TYPEMAP_destroy (session->tmap); session->tmap = nmap; } @@ -991,9 +1067,9 @@ free_session_helper (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { - struct Session *session = value; + /* struct Session *session = value; */ - GSC_SESSIONS_end (&session->peer); + GSC_SESSIONS_end (key); return GNUNET_OK; } @@ -1007,7 +1083,8 @@ GSC_SESSIONS_done () if (NULL != sessions) { GNUNET_CONTAINER_multipeermap_iterate (sessions, - &free_session_helper, NULL); + &free_session_helper, + NULL); GNUNET_CONTAINER_multipeermap_destroy (sessions); sessions = NULL; }