X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcore%2Fgnunet-service-core_sessions.c;h=036fd1425ff0c2bfd6b5fdc78117cb8c87cf9aa1;hb=4a99ca1c3eaa4587c9dfbb01790b306014347bce;hp=7cb88005387d7388c312a0e284101da95a2d1807;hpb=9fac6b6eefdc9144053f736fd388cb2199a97046;p=oweals%2Fgnunet.git diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index 7cb880053..036fd1425 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2009-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -25,17 +25,18 @@ */ #include "platform.h" #include "gnunet-service-core.h" -#include "gnunet-service-core_neighbours.h" #include "gnunet-service-core_kx.h" #include "gnunet-service-core_typemap.h" #include "gnunet-service-core_sessions.h" -#include "gnunet-service-core_clients.h" #include "gnunet_constants.h" +#include "core.h" + /** - * How often do we transmit our typemap? + * How many encrypted messages do we queue at most? + * Needed to bound memory consumption. */ -#define TYPEMAP_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) +#define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4 /** @@ -63,12 +64,17 @@ struct SessionMessageEntry struct GNUNET_TIME_Absolute deadline; /** - * How long is the message? (number of bytes following the "struct - * MessageEntry", but not including the size of "struct - * MessageEntry" itself!) + * How long is the message? (number of bytes following the `struct + * MessageEntry`, but not including the size of `struct + * MessageEntry` itself!) */ size_t size; + /** + * How important is this message. + */ + enum GNUNET_CORE_Priority priority; + }; @@ -80,8 +86,13 @@ struct Session /** * Identity of the other peer. */ - struct GNUNET_PeerIdentity peer; + const struct GNUNET_PeerIdentity *peer; + /** + * Key exchange state for this peer. + */ + struct GSC_KeyExchangeInfo *kx; + /** * Head of list of requests from clients for transmission to * this peer. @@ -104,32 +115,26 @@ struct Session */ struct SessionMessageEntry *sme_tail; - /** - * Information about the key exchange with the other peer. - */ - struct GSC_KeyExchangeInfo *kxinfo; - /** * Current type map for this peer. */ struct GSC_TypeMap *tmap; /** - * At what time did we initially establish this session? - * (currently unused, should be integrated with ATS in the - * future...). + * Task to transmit corked messages with a delay. */ - struct GNUNET_TIME_Absolute time_established; + struct GNUNET_SCHEDULER_Task *cork_task; /** - * Task to transmit corked messages with a delay. + * Task to transmit our type map. */ - GNUNET_SCHEDULER_TaskIdentifier cork_task; + struct GNUNET_SCHEDULER_Task *typemap_task; /** - * Task to transmit our type map. + * Retransmission delay we currently use for the typemap + * transmissions (if not confirmed). */ - GNUNET_SCHEDULER_TaskIdentifier typemap_task; + struct GNUNET_TIME_Relative typemap_delay; /** * Is the neighbour queue empty and thus ready for us @@ -137,13 +142,47 @@ struct Session */ int ready_to_transmit; + /** + * Is this the first time we're sending the typemap? If so, + * we want to send it a bit faster the second time. 0 if + * we are sending for the first time, 1 if not. + */ + int first_typemap; +}; + + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Message sent to confirm that a typemap was received. + */ +struct TypeMapConfirmationMessage +{ + + /** + * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP. + */ + struct GNUNET_MessageHeader header; + + /** + * Reserved, always zero. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * Hash of the (decompressed) type map that was received. + */ + struct GNUNET_HashCode tm_hash; + }; +GNUNET_NETWORK_STRUCT_END + /** - * Map of peer identities to 'struct Session'. + * Map of peer identities to `struct Session`. */ -static struct GNUNET_CONTAINER_MultiHashMap *sessions; +static struct GNUNET_CONTAINER_MultiPeerMap *sessions; /** @@ -156,7 +195,10 @@ static struct GNUNET_CONTAINER_MultiHashMap *sessions; static struct Session * find_session (const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multihashmap_get (sessions, &peer->hashPubKey); + if (NULL == sessions) + return NULL; + return GNUNET_CONTAINER_multipeermap_get (sessions, + peer); } @@ -176,35 +218,43 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) session = find_session (pid); if (NULL == session) return; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying session for peer `%4s'\n", - GNUNET_i2s (&session->peer)); - if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying session for peer `%s'\n", + GNUNET_i2s (session->peer)); + if (NULL != session->cork_task) { GNUNET_SCHEDULER_cancel (session->cork_task); - session->cork_task = GNUNET_SCHEDULER_NO_TASK; + session->cork_task = NULL; } while (NULL != (car = session->active_client_request_head)) { GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, session->active_client_request_tail, car); - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_NO); } while (NULL != (sme = session->sme_head)) { - GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme); + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); GNUNET_free (sme); } - GNUNET_SCHEDULER_cancel (session->typemap_task); - GSC_CLIENTS_notify_clients_about_neighbour (&session->peer, NULL, - 0 /* FIXME: ATSI */ , - session->tmap, NULL); + if (NULL != session->typemap_task) + { + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_task = NULL; + } + GSC_CLIENTS_notify_clients_about_neighbour (session->peer, + session->tmap, + NULL); GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (sessions, - &session-> - peer.hashPubKey, + GNUNET_CONTAINER_multipeermap_remove (sessions, + session->peer, session)); - GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# entries in session map"), - GNUNET_CONTAINER_multihashmap_size (sessions), + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), + GNUNET_CONTAINER_multipeermap_size (sessions), GNUNET_NO); GSC_TYPEMAP_destroy (session->tmap); session->tmap = NULL; @@ -214,33 +264,55 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) /** * Transmit our current typemap message to the other peer. - * (Done periodically in case an update got lost). + * (Done periodically until the typemap is confirmed). * - * @param cls the 'struct Session*' - * @param tc unused + * @param cls the `struct Session *` */ static void -transmit_typemap_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +transmit_typemap_task (void *cls) { struct Session *session = cls; struct GNUNET_MessageHeader *hdr; struct GNUNET_TIME_Relative delay; - delay = TYPEMAP_FREQUENCY; + session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay); + delay = session->typemap_delay; /* randomize a bit to avoid spont. sync */ - delay.rel_value += - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000); + delay.rel_value_us += + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000); session->typemap_task = - GNUNET_SCHEDULER_add_delayed (delay, &transmit_typemap_task, session); + GNUNET_SCHEDULER_add_delayed (delay, + &transmit_typemap_task, session); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# type map refreshes sent"), 1, + gettext_noop ("# type map refreshes sent"), + 1, GNUNET_NO); hdr = GSC_TYPEMAP_compute_type_map_message (); - GSC_KX_encrypt_and_transmit (session->kxinfo, hdr, ntohs (hdr->size)); + GSC_KX_encrypt_and_transmit (session->kx, + hdr, + ntohs (hdr->size)); GNUNET_free (hdr); } +/** + * Restart the typemap task for the given session. + * + * @param session session to restart typemap transmission for + */ +static void +start_typemap_task (struct Session *session) +{ + if (NULL != session->typemap_task) + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_delay = GNUNET_TIME_UNIT_SECONDS; + session->typemap_task = + GNUNET_SCHEDULER_add_delayed (session->typemap_delay, + &transmit_typemap_task, + session); +} + + /** * Create a session, a key exchange was just completed. * @@ -253,43 +325,119 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, { struct Session *session; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating session for peer `%4s'\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating session for peer `%4s'\n", GNUNET_i2s (peer)); - session = GNUNET_malloc (sizeof (struct Session)); + session = GNUNET_new (struct Session); session->tmap = GSC_TYPEMAP_create (); - session->peer = *peer; - session->kxinfo = kx; - session->time_established = GNUNET_TIME_absolute_get (); - session->typemap_task = - GNUNET_SCHEDULER_add_now (&transmit_typemap_task, session); + session->peer = peer; + session->kx = kx; GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (sessions, &peer->hashPubKey, + GNUNET_CONTAINER_multipeermap_put (sessions, + session->peer, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# entries in session map"), - GNUNET_CONTAINER_multihashmap_size (sessions), + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), + GNUNET_CONTAINER_multipeermap_size (sessions), GNUNET_NO); - GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, 0 /* FIXME: ATSI */ , - NULL, session->tmap); + GSC_CLIENTS_notify_clients_about_neighbour (peer, + NULL, + session->tmap); + start_typemap_task (session); +} + + +/** + * The other peer has indicated that he 'lost' the session + * (KX down), reinitialize the session on our end, in particular + * this means to restart the typemap transmission. + * + * @param peer peer that is now connected + */ +void +GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer) +{ + struct Session *session; + + session = find_session (peer); + if (NULL == session) + { + /* KX/session is new for both sides; thus no need to restart what + has not yet begun */ + return; + } + start_typemap_task (session); +} + + +/** + * The other peer has confirmed receiving our type map, + * check if it is current and if so, stop retransmitting it. + * + * @param peer peer that confirmed the type map + * @param msg confirmation message we received + */ +void +GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) +{ + const struct TypeMapConfirmationMessage *cmsg; + struct Session *session; + + session = find_session (peer); + if (NULL == session) + { + GNUNET_break (0); + return; + } + if (ntohs (msg->size) != sizeof (struct TypeMapConfirmationMessage)) + { + GNUNET_break_op (0); + return; + } + cmsg = (const struct TypeMapConfirmationMessage *) msg; + if (GNUNET_YES != + GSC_TYPEMAP_check_hash (&cmsg->tm_hash)) + { + /* our typemap has changed in the meantime, do not + accept confirmation */ + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# outdated typemap confirmations received"), + 1, GNUNET_NO); + return; + } + if (NULL != session->typemap_task) + { + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_task = NULL; + } + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# valid typemap confirmations received"), + 1, GNUNET_NO); } /** * Notify the given client about the session (client is new). * - * @param cls the 'struct GSC_Client' + * @param cls the `struct GSC_Client` * @param key peer identity - * @param value the 'struct Session' - * @return GNUNET_OK (continue to iterate) + * @param value the `struct Session` + * @return #GNUNET_OK (continue to iterate) */ static int -notify_client_about_session (void *cls, const GNUNET_HashCode * key, +notify_client_about_session (void *cls, + const struct GNUNET_PeerIdentity *key, void *value) { struct GSC_Client *client = cls; struct Session *session = value; - GSC_CLIENTS_notify_client_about_neighbour (client, &session->peer, NULL, 0, /* FIXME: ATS!? */ + GSC_CLIENTS_notify_client_about_neighbour (client, + session->peer, NULL, /* old TMAP: none */ session->tmap); return GNUNET_OK; @@ -305,7 +453,8 @@ void GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) { /* notify new client about existing sessions */ - GNUNET_CONTAINER_multihashmap_iterate (sessions, ¬ify_client_about_session, + GNUNET_CONTAINER_multipeermap_iterate (sessions, + ¬ify_client_about_session, client); } @@ -325,8 +474,8 @@ try_transmission (struct Session *session); * * @param car request to queue; this handle is then shared between * the caller (CLIENTS subsystem) and SESSIONS and must not - * be released by either until either 'GNUNET_SESSIONS_dequeue', - * 'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed' + * be released by either until either #GSC_SESSIONS_dequeue(), + * #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed() * have been invoked on it */ void @@ -335,24 +484,27 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) struct Session *session; session = find_session (&car->target); - if (session == NULL) + if (NULL == session) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropped client request for transmission (am disconnected)\n"); GNUNET_break (0); /* should have been rejected earlier */ - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_NO); return; } if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { GNUNET_break (0); - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_YES); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received client transmission request. queueing\n"); GNUNET_CONTAINER_DLL_insert (session->active_client_request_head, - session->active_client_request_tail, car); + session->active_client_request_tail, + car); try_transmission (session); } @@ -366,78 +518,70 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) void GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) { - struct Session *s; + struct Session *session; if (0 == - memcmp (&car->target, &GSC_my_identity, + memcmp (&car->target, + &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) return; - s = find_session (&car->target); - GNUNET_assert (NULL != s); - GNUNET_CONTAINER_DLL_remove (s->active_client_request_head, - s->active_client_request_tail, car); -} - - -/** - * Discard all expired active transmission requests from clients. - * - * @param session session to clean up - */ -static void -discard_expired_requests (struct Session *session) -{ - struct GSC_ClientActiveRequest *pos; - struct GSC_ClientActiveRequest *nxt; - struct GNUNET_TIME_Absolute now; - - now = GNUNET_TIME_absolute_get (); - pos = NULL; - nxt = session->active_client_request_head; - while (NULL != nxt) - { - pos = nxt; - nxt = pos->next; - if ((pos->deadline.abs_value < now.abs_value) && - (GNUNET_YES != pos->was_solicited)) - { - GNUNET_STATISTICS_update (GSC_stats, - gettext_noop - ("# messages discarded (expired prior to transmission)"), - 1, GNUNET_NO); - GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, - session->active_client_request_tail, pos); - GSC_CLIENTS_reject_request (pos); - } - } + session = find_session (&car->target); + GNUNET_assert (NULL != session); + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, + car); + /* dequeueing of 'high' priority messages may unblock + transmission for lower-priority messages, so we also + need to try in this case. */ + try_transmission (session); } /** - * Solicit messages for transmission. + * Solicit messages for transmission, starting with those of the highest + * priority. * * @param session session to solict messages for + * @param msize how many bytes do we have already */ static void -solicit_messages (struct Session *session) +solicit_messages (struct Session *session, + size_t msize) { struct GSC_ClientActiveRequest *car; struct GSC_ClientActiveRequest *nxt; size_t so_size; + enum GNUNET_CORE_Priority pmax; - discard_expired_requests (session); - so_size = 0; + so_size = msize; + pmax = GNUNET_CORE_PRIO_BACKGROUND; + for (car = session->active_client_request_head; NULL != car; car = car->next) + { + if (GNUNET_YES == car->was_solicited) + continue; + pmax = GNUNET_MAX (pmax, car->priority); + } nxt = session->active_client_request_head; while (NULL != (car = nxt)) { nxt = car->next; + if (car->priority < pmax) + continue; if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) break; so_size += car->msize; - if (car->was_solicited == GNUNET_YES) + if (GNUNET_YES == car->was_solicited) continue; car->was_solicited = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting message with priority %u\n", + car->priority); GSC_CLIENTS_solicit_request (car); + /* The above call may *dequeue* requests and thereby + clobber 'nxt'. Hence we need to restart from the + head of the list. */ + nxt = session->active_client_request_head; + so_size = msize; } } @@ -446,22 +590,22 @@ solicit_messages (struct Session *session) * Some messages were delayed (corked), but the timeout has now expired. * Send them now. * - * @param cls 'struct Session' with the messages to transmit now - * @param tc scheduler context (unused) + * @param cls `struct Session` with the messages to transmit now */ static void -pop_cork_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +pop_cork_task (void *cls) { struct Session *session = cls; - session->cork_task = GNUNET_SCHEDULER_NO_TASK; + session->cork_task = NULL; try_transmission (session); } /** * Try to perform a transmission on the given session. Will solicit - * additional messages if the 'sme' queue is not full enough. + * additional messages if the 'sme' queue is not full enough or has + * only low-priority messages. * * @param session session to transmit messages from */ @@ -472,42 +616,131 @@ try_transmission (struct Session *session) size_t msize; struct GNUNET_TIME_Absolute now; struct GNUNET_TIME_Absolute min_deadline; + enum GNUNET_CORE_Priority maxp; + enum GNUNET_CORE_Priority maxpc; + struct GSC_ClientActiveRequest *car; + int excess; if (GNUNET_YES != session->ready_to_transmit) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Not yet ready to transmit, not evaluating queue\n"); return; + } msize = 0; min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - /* check 'ready' messages */ + /* if the peer has excess bandwidth, background traffic is allowed, + otherwise not */ + if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <= + GSC_NEIGHBOURS_get_queue_length (session->kx)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission queue already very long, waiting...\n"); + return; /* queue already too long */ + } + excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx); + if (GNUNET_YES == excess) + maxp = GNUNET_CORE_PRIO_BACKGROUND; + else + maxp = GNUNET_CORE_PRIO_BEST_EFFORT; + /* determine highest priority of 'ready' messages we already solicited from clients */ pos = session->sme_head; while ((NULL != pos) && (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)) { GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); msize += pos->size; - min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline); + maxp = GNUNET_MAX (maxp, pos->priority); + min_deadline = GNUNET_TIME_absolute_min (min_deadline, + pos->deadline); pos = pos->next; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calculating transmission set with %u priority (%s) and %s earliest deadline\n", + maxp, + (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + + if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL) + { + /* if highest already solicited priority from clients is not critical, + check if there are higher-priority messages to be solicited from clients */ + if (GNUNET_YES == excess) + maxpc = GNUNET_CORE_PRIO_BACKGROUND; + else + maxpc = GNUNET_CORE_PRIO_BEST_EFFORT; + for (car = session->active_client_request_head; NULL != car; car = car->next) + { + if (GNUNET_YES == car->was_solicited) + continue; + maxpc = GNUNET_MAX (maxpc, + car->priority); + } + if (maxpc > maxp) + { + /* we have messages waiting for solicitation that have a higher + priority than those that we already accepted; solicit the + high-priority messages first */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting messages based on priority (%u > %u)\n", + maxpc, + maxp); + solicit_messages (session, 0); + return; + } + } + else + { + /* never solicit more, we have critical messages to process */ + excess = GNUNET_NO; + maxpc = GNUNET_CORE_PRIO_BACKGROUND; + } now = GNUNET_TIME_absolute_get (); - if ((msize == 0) || - ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && - (min_deadline.abs_value > now.abs_value))) + if ( ( (GNUNET_YES == excess) || + (maxpc >= GNUNET_CORE_PRIO_BEST_EFFORT) ) && + ( (0 == msize) || + ( (msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && + (min_deadline.abs_value_us > now.abs_value_us))) ) { - /* not enough ready yet, try to solicit more */ - solicit_messages (session); + /* not enough ready yet (tiny message & cork possible), or no messages at all, + and either excess bandwidth or best-effort or higher message waiting at + client; in this case, we try to solicit more */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n", + excess, + maxpc, + (unsigned int) msize, + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + solicit_messages (session, + msize); if (msize > 0) { /* if there is data to send, just not yet, make sure we do transmit * it once the deadline is reached */ - if (session->cork_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Corking until %s\n", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + if (NULL != session->cork_task) GNUNET_SCHEDULER_cancel (session->cork_task); session->cork_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining - (min_deadline), &pop_cork_task, + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (min_deadline), + &pop_cork_task, session); } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queue empty, waiting for solicitations\n"); + } return; } - /* create plaintext buffer of all messages, encrypt and transmit */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Building combined plaintext buffer to transmit message!\n"); + /* create plaintext buffer of all messages (that fit), encrypt and + transmit */ { static unsigned long long total_bytes; static unsigned int total_msgs; @@ -515,11 +748,14 @@ try_transmission (struct Session *session) size_t used; used = 0; - while ((NULL != (pos = session->sme_head)) && (used + pos->size <= msize)) + while ( (NULL != (pos = session->sme_head)) && + (used + pos->size <= msize) ) { - memcpy (&pbuf[used], &pos[1], pos->size); + GNUNET_memcpy (&pbuf[used], &pos[1], pos->size); used += pos->size; - GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, pos); + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + pos); GNUNET_free (pos); } /* compute average payload size */ @@ -531,52 +767,67 @@ try_transmission (struct Session *session) total_msgs = 1; total_bytes = used; } - GNUNET_STATISTICS_set (GSC_stats, "# avg payload per encrypted message", - total_bytes / total_msgs, GNUNET_NO); + GNUNET_STATISTICS_set (GSC_stats, + "# avg payload per encrypted message", + total_bytes / total_msgs, + GNUNET_NO); /* now actually transmit... */ session->ready_to_transmit = GNUNET_NO; - GSC_KX_encrypt_and_transmit (session->kxinfo, pbuf, used); + GSC_KX_encrypt_and_transmit (session->kx, + pbuf, + used); } } /** - * Send a message to the neighbour now. + * Send an updated typemap message to the neighbour now, + * and restart typemap transmissions. * * @param cls the message * @param key neighbour's identity - * @param value 'struct Neighbour' of the target - * @return always GNUNET_OK + * @param value `struct Neighbour` of the target + * @return always #GNUNET_OK */ static int -do_send_message (void *cls, const GNUNET_HashCode * key, void *value) +do_restart_typemap_message (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { const struct GNUNET_MessageHeader *hdr = cls; struct Session *session = value; - struct SessionMessageEntry *m; + struct SessionMessageEntry *sme; uint16_t size; size = ntohs (hdr->size); - m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); - memcpy (&m[1], hdr, size); - m->size = size; - GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, m); + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); + GNUNET_memcpy (&sme[1], + hdr, + size); + sme->size = size; + sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + GNUNET_CONTAINER_DLL_insert (session->sme_head, + session->sme_tail, + sme); try_transmission (session); + start_typemap_task (session); return GNUNET_OK; } /** - * Broadcast a message to all neighbours. + * Broadcast an updated typemap message to all neighbours. + * Restarts the retransmissions until the typemaps are confirmed. * * @param msg message to transmit */ void -GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg) +GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg) { if (NULL == sessions) return; - GNUNET_CONTAINER_multihashmap_iterate (sessions, &do_send_message, + GNUNET_CONTAINER_multipeermap_iterate (sessions, + &do_restart_typemap_message, (void *) msg); } @@ -608,13 +859,17 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) * this handle will now be 'owned' by the SESSIONS subsystem * @param msg message to transmit * @param cork is corking allowed? + * @param priority how important is this message */ void GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg, int cork) + const struct GNUNET_MessageHeader *msg, + int cork, + enum GNUNET_CORE_Priority priority) { struct Session *session; struct SessionMessageEntry *sme; + struct SessionMessageEntry *pos; size_t msize; session = find_session (&car->target); @@ -622,103 +877,37 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, return; msize = ntohs (msg->size); sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); - memcpy (&sme[1], msg, msize); + GNUNET_memcpy (&sme[1], + msg, + msize); sme->size = msize; + sme->priority = priority; if (GNUNET_YES == cork) + { sme->deadline = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); - GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, sme); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Mesage corked, delaying transmission\n"); + } + pos = session->sme_head; + while ( (NULL != pos) && + (pos->priority >= sme->priority) ) + pos = pos->next; + if (NULL == pos) + GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, + session->sme_tail, + sme); + else + GNUNET_CONTAINER_DLL_insert_after (session->sme_head, + session->sme_tail, + pos->prev, + sme); try_transmission (session); } /** - * Helper function for GSC_SESSIONS_handle_client_iterate_peers. - * - * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies - * @param key identity of the connected peer - * @param value the 'struct Neighbour' for the peer - * @return GNUNET_OK (continue to iterate) - */ -#include "core.h" -static int -queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value) -{ - struct GNUNET_SERVER_TransmitContext *tc = cls; - struct Session *session = value; - struct ConnectNotifyMessage cnm; - - /* FIXME: code duplication with clients... */ - cnm.header.size = htons (sizeof (struct ConnectNotifyMessage)); - cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); - // FIXME: full ats... - cnm.ats_count = htonl (0); - cnm.peer = session->peer; - GNUNET_SERVER_transmit_context_append_message (tc, &cnm.header); - return GNUNET_OK; -} - - -/** - * Handle CORE_ITERATE_PEERS request. For this request type, the client - * does not have to have transmitted an INIT request. All current peers - * are returned, regardless of which message types they accept. - * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message - */ -void -GSC_SESSIONS_handle_client_iterate_peers (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader - *message) -{ - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multihashmap_iterate (sessions, &queue_connect_message, tc); - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * Handle CORE_PEER_CONNECTED request. Notify client about connection - * to the given neighbour. For this request type, the client does not - * have to have transmitted an INIT request. All current peers are - * returned, regardless of which message types they accept. - * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message - */ -void -GSC_SESSIONS_handle_client_have_peer (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader - *message) -{ - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - const struct GNUNET_PeerIdentity *peer; - - peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK! - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multihashmap_get_multiple (sessions, &peer->hashPubKey, - &queue_connect_message, tc); - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * We've received a typemap message from a peer, update ours. + * We have received a typemap message from a peer, update ours. * Notifies clients about the session. * * @param peer peer this is about @@ -730,6 +919,8 @@ GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer, { struct Session *session; struct GSC_TypeMap *nmap; + struct SessionMessageEntry *sme; + struct TypeMapConfirmationMessage *tmc; nmap = GSC_TYPEMAP_get_from_message (msg); if (NULL == nmap) @@ -740,8 +931,24 @@ GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer, GNUNET_break (0); return; } - GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, 0, /* FIXME: ATS */ - session->tmap, nmap); + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + + sizeof (struct TypeMapConfirmationMessage)); + sme->deadline = GNUNET_TIME_absolute_get (); + sme->size = sizeof (struct TypeMapConfirmationMessage); + sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + tmc = (struct TypeMapConfirmationMessage *) &sme[1]; + tmc->header.size = htons (sizeof (struct TypeMapConfirmationMessage)); + tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP); + tmc->reserved = htonl (0); + GSC_TYPEMAP_hash (nmap, + &tmc->tm_hash); + GNUNET_CONTAINER_DLL_insert (session->sme_head, + session->sme_tail, + sme); + try_transmission (session); + GSC_CLIENTS_notify_clients_about_neighbour (peer, + session->tmap, + nmap); GSC_TYPEMAP_destroy (session->tmap); session->tmap = nmap; } @@ -762,14 +969,16 @@ GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer, struct Session *session; struct GSC_TypeMap *nmap; - if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) + if (0 == memcmp (peer, + &GSC_my_identity, + sizeof (struct GNUNET_PeerIdentity))) return; session = find_session (peer); GNUNET_assert (NULL != session); if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1)) return; /* already in it */ nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1); - GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, 0, /* FIXME: ATS */ + GSC_CLIENTS_notify_clients_about_neighbour (peer, session->tmap, nmap); GSC_TYPEMAP_destroy (session->tmap); session->tmap = nmap; @@ -782,24 +991,28 @@ GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer, void GSC_SESSIONS_init () { - sessions = GNUNET_CONTAINER_multihashmap_create (128); + sessions = GNUNET_CONTAINER_multipeermap_create (128, + GNUNET_YES); } /** - * Helper function for GSC_SESSIONS_handle_client_iterate_peers. + * Helper function for #GSC_SESSIONS_done() to free all + * active sessions. * * @param cls NULL * @param key identity of the connected peer - * @param value the 'struct Session' for the peer - * @return GNUNET_OK (continue to iterate) + * @param value the `struct Session` for the peer + * @return #GNUNET_OK (continue to iterate) */ static int -free_session_helper (void *cls, const GNUNET_HashCode * key, void *value) +free_session_helper (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { - struct Session *session = value; + /* struct Session *session = value; */ - GSC_SESSIONS_end (&session->peer); + GSC_SESSIONS_end (key); return GNUNET_OK; } @@ -810,9 +1023,14 @@ free_session_helper (void *cls, const GNUNET_HashCode * key, void *value) void GSC_SESSIONS_done () { - GNUNET_CONTAINER_multihashmap_iterate (sessions, &free_session_helper, NULL); - GNUNET_CONTAINER_multihashmap_destroy (sessions); - sessions = NULL; + if (NULL != sessions) + { + GNUNET_CONTAINER_multipeermap_iterate (sessions, + &free_session_helper, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (sessions); + sessions = NULL; + } } /* end of gnunet-service-core_sessions.c */