X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcore%2Fgnunet-service-core_sessions.c;h=036fd1425ff0c2bfd6b5fdc78117cb8c87cf9aa1;hb=4a99ca1c3eaa4587c9dfbb01790b306014347bce;hp=93f9b69508b2c02f1a38c35781d02642a86fc6de;hpb=f0f1ce0017db9c6a06d1fdd8650ad01e11e8201b;p=oweals%2Fgnunet.git diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index 93f9b6950..036fd1425 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2009-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,23 +14,29 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file core/gnunet-service-core_sessions.c - * @brief code for managing of 'encrypted' sessions (key exchange done) + * @brief code for managing of 'encrypted' sessions (key exchange done) * @author Christian Grothoff */ #include "platform.h" #include "gnunet-service-core.h" -#include "gnunet-service-core_neighbours.h" #include "gnunet-service-core_kx.h" #include "gnunet-service-core_typemap.h" #include "gnunet-service-core_sessions.h" -#include "gnunet-service-core_clients.h" #include "gnunet_constants.h" +#include "core.h" + + +/** + * How many encrypted messages do we queue at most? + * Needed to bound memory consumption. + */ +#define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4 /** @@ -58,12 +64,17 @@ struct SessionMessageEntry struct GNUNET_TIME_Absolute deadline; /** - * How long is the message? (number of bytes following the "struct - * MessageEntry", but not including the size of "struct - * MessageEntry" itself!) + * How long is the message? (number of bytes following the `struct + * MessageEntry`, but not including the size of `struct + * MessageEntry` itself!) */ size_t size; + /** + * How important is this message. + */ + enum GNUNET_CORE_Priority priority; + }; @@ -75,8 +86,13 @@ struct Session /** * Identity of the other peer. */ - struct GNUNET_PeerIdentity peer; + const struct GNUNET_PeerIdentity *peer; + /** + * Key exchange state for this peer. + */ + struct GSC_KeyExchangeInfo *kx; + /** * Head of list of requests from clients for transmission to * this peer. @@ -99,76 +115,74 @@ struct Session */ struct SessionMessageEntry *sme_tail; - /** - * Information about the key exchange with the other peer. - */ - struct GSC_KeyExchangeInfo *kxinfo; - /** * Current type map for this peer. */ struct GSC_TypeMap *tmap; /** - * At what time did we initially establish this session? - * (currently unused, should be integrated with ATS in the - * future...). + * Task to transmit corked messages with a delay. */ - struct GNUNET_TIME_Absolute time_established; + struct GNUNET_SCHEDULER_Task *cork_task; /** - * Task to transmit corked messages with a delay. + * Task to transmit our type map. */ - GNUNET_SCHEDULER_TaskIdentifier cork_task; + struct GNUNET_SCHEDULER_Task *typemap_task; /** - * Tracking bandwidth for sending to this peer. - * // FIXME: unused! should it be used? + * Retransmission delay we currently use for the typemap + * transmissions (if not confirmed). */ - struct GNUNET_BANDWIDTH_Tracker available_send_window; + struct GNUNET_TIME_Relative typemap_delay; /** - * Tracking bandwidth for receiving from this peer. - * // FIXME: need to set it! + * Is the neighbour queue empty and thus ready for us + * to transmit an encrypted message? */ - struct GNUNET_BANDWIDTH_Tracker available_recv_window; + int ready_to_transmit; /** - * Available bandwidth out for this peer (current target). - * // FIXME: check usage! + * Is this the first time we're sending the typemap? If so, + * we want to send it a bit faster the second time. 0 if + * we are sending for the first time, 1 if not. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out; + int first_typemap; +}; + + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Message sent to confirm that a typemap was received. + */ +struct TypeMapConfirmationMessage +{ /** - * Internal bandwidth limit set for this peer (initially typically - * set to "-1"). Actual "bw_out" is MIN of - * "bpm_out_internal_limit" and "bw_out_external_limit". - * // FIXME: check usage + * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit; + struct GNUNET_MessageHeader header; /** - * External bandwidth limit set for this peer by the - * peer that we are communicating with. "bw_out" is MIN of - * "bw_out_internal_limit" and "bw_out_external_limit". - * // FIXME: check usage + * Reserved, always zero. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit; - + uint32_t reserved GNUNET_PACKED; /** - * Is the neighbour queue empty and thus ready for us - * to transmit an encrypted message? + * Hash of the (decompressed) type map that was received. */ - int ready_to_transmit; + struct GNUNET_HashCode tm_hash; }; +GNUNET_NETWORK_STRUCT_END + /** - * Map of peer identities to 'struct Session'. + * Map of peer identities to `struct Session`. */ -static struct GNUNET_CONTAINER_MultiHashMap *sessions; +static struct GNUNET_CONTAINER_MultiPeerMap *sessions; /** @@ -181,13 +195,16 @@ static struct GNUNET_CONTAINER_MultiHashMap *sessions; static struct Session * find_session (const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multihashmap_get (sessions, &peer->hashPubKey); + if (NULL == sessions) + return NULL; + return GNUNET_CONTAINER_multipeermap_get (sessions, + peer); } /** * End the session with the given peer (we are no longer - * connected). + * connected). * * @param pid identity of peer to kill session with */ @@ -196,38 +213,106 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) { struct Session *session; struct GSC_ClientActiveRequest *car; + struct SessionMessageEntry *sme; session = find_session (pid); if (NULL == session) return; -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying session for peer `%4s'\n", - GNUNET_i2s (&session->peer)); -#endif - if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) + "Destroying session for peer `%s'\n", + GNUNET_i2s (session->peer)); + if (NULL != session->cork_task) { GNUNET_SCHEDULER_cancel (session->cork_task); - session->cork_task = GNUNET_SCHEDULER_NO_TASK; + session->cork_task = NULL; } - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (sessions, - &session->peer.hashPubKey, session)); while (NULL != (car = session->active_client_request_head)) { GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, - session->active_client_request_tail, - car); - GSC_CLIENTS_reject_request (car); + session->active_client_request_tail, car); + GSC_CLIENTS_reject_request (car, + GNUNET_NO); + } + while (NULL != (sme = session->sme_head)) + { + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); + GNUNET_free (sme); + } + if (NULL != session->typemap_task) + { + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_task = NULL; } - GNUNET_STATISTICS_set (GSC_stats, - gettext_noop ("# established sessions"), - GNUNET_CONTAINER_multihashmap_size (sessions), - GNUNET_NO); + GSC_CLIENTS_notify_clients_about_neighbour (session->peer, + session->tmap, + NULL); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (sessions, + session->peer, + session)); + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), + GNUNET_CONTAINER_multipeermap_size (sessions), + GNUNET_NO); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = NULL; GNUNET_free (session); } +/** + * Transmit our current typemap message to the other peer. + * (Done periodically until the typemap is confirmed). + * + * @param cls the `struct Session *` + */ +static void +transmit_typemap_task (void *cls) +{ + struct Session *session = cls; + struct GNUNET_MessageHeader *hdr; + struct GNUNET_TIME_Relative delay; + + session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay); + delay = session->typemap_delay; + /* randomize a bit to avoid spont. sync */ + delay.rel_value_us += + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000); + session->typemap_task = + GNUNET_SCHEDULER_add_delayed (delay, + &transmit_typemap_task, session); + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop ("# type map refreshes sent"), + 1, + GNUNET_NO); + hdr = GSC_TYPEMAP_compute_type_map_message (); + GSC_KX_encrypt_and_transmit (session->kx, + hdr, + ntohs (hdr->size)); + GNUNET_free (hdr); +} + + +/** + * Restart the typemap task for the given session. + * + * @param session session to restart typemap transmission for + */ +static void +start_typemap_task (struct Session *session) +{ + if (NULL != session->typemap_task) + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_delay = GNUNET_TIME_UNIT_SECONDS; + session->typemap_task = + GNUNET_SCHEDULER_add_delayed (session->typemap_delay, + &transmit_typemap_task, + session); +} + + /** * Create a session, a key exchange was just completed. * @@ -236,67 +321,125 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) */ void GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, - struct GSC_KeyExchangeInfo *kx) + struct GSC_KeyExchangeInfo *kx) { - struct GNUNET_MessageHeader *hdr; struct Session *session; -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating session for peer `%4s'\n", GNUNET_i2s (pid)); -#endif - session = GNUNET_malloc (sizeof (struct Session)); - session->peer = *peer; - session->kxinfo = kx; - session->time_established = GNUNET_TIME_absolute_get (); + "Creating session for peer `%4s'\n", + GNUNET_i2s (peer)); + session = GNUNET_new (struct Session); + session->tmap = GSC_TYPEMAP_create (); + session->peer = peer; + session->kx = kx; GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (sessions, - &peer->hashPubKey, session, + GNUNET_CONTAINER_multipeermap_put (sessions, + session->peer, + session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# established sessions"), - GNUNET_CONTAINER_multihashmap_size (sessions), - GNUNET_NO); -#if 0 - /* FIXME: integration with ATS for quota calculations... */ - /* FIXME: who should do this? Neighbours!? */ - GNUNET_TRANSPORT_set_quota (transport, - peer, - GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, - GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT); -#endif - /* FIXME: we should probably do this periodically (in case - type map message is lost...) */ - hdr = GSC_TYPEMAP_compute_type_map_message (); - GSC_KX_encrypt_and_transmit (kx, - GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, - hdr, - ntohs (hdr->size)); - GNUNET_free (hdr); + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), + GNUNET_CONTAINER_multipeermap_size (sessions), + GNUNET_NO); + GSC_CLIENTS_notify_clients_about_neighbour (peer, + NULL, + session->tmap); + start_typemap_task (session); +} + + +/** + * The other peer has indicated that he 'lost' the session + * (KX down), reinitialize the session on our end, in particular + * this means to restart the typemap transmission. + * + * @param peer peer that is now connected + */ +void +GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer) +{ + struct Session *session; + + session = find_session (peer); + if (NULL == session) + { + /* KX/session is new for both sides; thus no need to restart what + has not yet begun */ + return; + } + start_typemap_task (session); +} + + +/** + * The other peer has confirmed receiving our type map, + * check if it is current and if so, stop retransmitting it. + * + * @param peer peer that confirmed the type map + * @param msg confirmation message we received + */ +void +GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) +{ + const struct TypeMapConfirmationMessage *cmsg; + struct Session *session; + + session = find_session (peer); + if (NULL == session) + { + GNUNET_break (0); + return; + } + if (ntohs (msg->size) != sizeof (struct TypeMapConfirmationMessage)) + { + GNUNET_break_op (0); + return; + } + cmsg = (const struct TypeMapConfirmationMessage *) msg; + if (GNUNET_YES != + GSC_TYPEMAP_check_hash (&cmsg->tm_hash)) + { + /* our typemap has changed in the meantime, do not + accept confirmation */ + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# outdated typemap confirmations received"), + 1, GNUNET_NO); + return; + } + if (NULL != session->typemap_task) + { + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_task = NULL; + } + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# valid typemap confirmations received"), + 1, GNUNET_NO); } /** * Notify the given client about the session (client is new). * - * @param cls the 'struct GSC_Client' - * @param key peer identity - * @param value the 'struct Session' - * @return GNUNET_OK (continue to iterate) + * @param cls the `struct GSC_Client` + * @param key peer identity + * @param value the `struct Session` + * @return #GNUNET_OK (continue to iterate) */ static int notify_client_about_session (void *cls, - const GNUNET_HashCode *key, - void *value) + const struct GNUNET_PeerIdentity *key, + void *value) { struct GSC_Client *client = cls; struct Session *session = value; - GDS_CLIENTS_notify_client_about_neighbour (client, - &session->peer, - NULL, 0, /* FIXME: ATS!? */ - NULL, /* old TMAP: none */ - session->tmap); + GSC_CLIENTS_notify_client_about_neighbour (client, + session->peer, + NULL, /* old TMAP: none */ + session->tmap); return GNUNET_OK; } @@ -310,8 +453,9 @@ void GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) { /* notify new client about existing sessions */ - GNUNET_CONTAINER_multihashmap_iterate (sessions, - ¬ify_client_about_session, client); + GNUNET_CONTAINER_multipeermap_iterate (sessions, + ¬ify_client_about_session, + client); } @@ -330,8 +474,8 @@ try_transmission (struct Session *session); * * @param car request to queue; this handle is then shared between * the caller (CLIENTS subsystem) and SESSIONS and must not - * be released by either until either 'GNUNET_SESSIONS_dequeue', - * 'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed' + * be released by either until either #GSC_SESSIONS_dequeue(), + * #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed() * have been invoked on it */ void @@ -340,34 +484,27 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) struct Session *session; session = find_session (&car->target); - if (session == NULL) + if (NULL == session) { - /* neighbour must have disconnected since request was issued, - * ignore (client will realize it once it processes the - * disconnect notification) */ -#if DEBUG_CORE_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropped client request for transmission (am disconnected)\n"); -#endif - GNUNET_STATISTICS_update (GSC_stats, - gettext_noop - ("# send requests dropped (disconnected)"), 1, - GNUNET_NO); - GSC_CLIENTS_reject_request (car); + GNUNET_break (0); /* should have been rejected earlier */ + GSC_CLIENTS_reject_request (car, + GNUNET_NO); return; } if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { GNUNET_break (0); - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_YES); return; } -#if DEBUG_CORE_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received client transmission request. queueing\n"); -#endif GNUNET_CONTAINER_DLL_insert (session->active_client_request_head, - session->active_client_request_tail, car); + session->active_client_request_tail, + car); try_transmission (session); } @@ -381,96 +518,94 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) void GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) { - struct Session *s; - - s = find_session (&car->target); - GNUNET_CONTAINER_DLL_remove (s->active_client_request_head, - s->active_client_request_tail, car); -} - + struct Session *session; -/** - * Discard all expired active transmission requests from clients. - * - * @param session session to clean up - */ -static void -discard_expired_requests (struct Session *session) -{ - struct GSC_ClientActiveRequest *pos; - struct GSC_ClientActiveRequest *nxt; - struct GNUNET_TIME_Absolute now; - - now = GNUNET_TIME_absolute_get (); - pos = NULL; - nxt = session->active_client_request_head; - while (NULL != nxt) - { - pos = nxt; - nxt = pos->next; - if ( (pos->deadline.abs_value < now.abs_value) && - (GNUNET_YES != pos->was_solicited) ) - { - GNUNET_STATISTICS_update (GSC_stats, - gettext_noop - ("# messages discarded (expired prior to transmission)"), - 1, GNUNET_NO); - GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, - session->active_client_request_tail, - pos); - GSC_CLIENTS_reject_request (pos); - } - } + if (0 == + memcmp (&car->target, + &GSC_my_identity, + sizeof (struct GNUNET_PeerIdentity))) + return; + session = find_session (&car->target); + GNUNET_assert (NULL != session); + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, + car); + /* dequeueing of 'high' priority messages may unblock + transmission for lower-priority messages, so we also + need to try in this case. */ + try_transmission (session); } /** - * Solicit messages for transmission. + * Solicit messages for transmission, starting with those of the highest + * priority. * * @param session session to solict messages for + * @param msize how many bytes do we have already */ static void -solicit_messages (struct Session *session) +solicit_messages (struct Session *session, + size_t msize) { struct GSC_ClientActiveRequest *car; + struct GSC_ClientActiveRequest *nxt; size_t so_size; + enum GNUNET_CORE_Priority pmax; - discard_expired_requests (session); - so_size = 0; + so_size = msize; + pmax = GNUNET_CORE_PRIO_BACKGROUND; for (car = session->active_client_request_head; NULL != car; car = car->next) { + if (GNUNET_YES == car->was_solicited) + continue; + pmax = GNUNET_MAX (pmax, car->priority); + } + nxt = session->active_client_request_head; + while (NULL != (car = nxt)) + { + nxt = car->next; + if (car->priority < pmax) + continue; if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) break; so_size += car->msize; - if (car->was_solicited == GNUNET_YES) + if (GNUNET_YES == car->was_solicited) continue; car->was_solicited = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting message with priority %u\n", + car->priority); GSC_CLIENTS_solicit_request (car); + /* The above call may *dequeue* requests and thereby + clobber 'nxt'. Hence we need to restart from the + head of the list. */ + nxt = session->active_client_request_head; + so_size = msize; } } /** - * Some messages were delayed (corked), but the timeout has now expired. + * Some messages were delayed (corked), but the timeout has now expired. * Send them now. * - * @param cls 'struct Session' with the messages to transmit now - * @param tc scheduler context (unused) + * @param cls `struct Session` with the messages to transmit now */ static void -pop_cork_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +pop_cork_task (void *cls) { - struct Session *session = session; + struct Session *session = cls; - session->cork_task = GNUNET_SCHEDULER_NO_TASK; + session->cork_task = NULL; try_transmission (session); } /** * Try to perform a transmission on the given session. Will solicit - * additional messages if the 'sme' queue is not full enough. + * additional messages if the 'sme' queue is not full enough or has + * only low-priority messages. * * @param session session to transmit messages from */ @@ -481,55 +616,147 @@ try_transmission (struct Session *session) size_t msize; struct GNUNET_TIME_Absolute now; struct GNUNET_TIME_Absolute min_deadline; + enum GNUNET_CORE_Priority maxp; + enum GNUNET_CORE_Priority maxpc; + struct GSC_ClientActiveRequest *car; + int excess; if (GNUNET_YES != session->ready_to_transmit) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Not yet ready to transmit, not evaluating queue\n"); return; + } msize = 0; min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - /* check 'ready' messages */ + /* if the peer has excess bandwidth, background traffic is allowed, + otherwise not */ + if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <= + GSC_NEIGHBOURS_get_queue_length (session->kx)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission queue already very long, waiting...\n"); + return; /* queue already too long */ + } + excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx); + if (GNUNET_YES == excess) + maxp = GNUNET_CORE_PRIO_BACKGROUND; + else + maxp = GNUNET_CORE_PRIO_BEST_EFFORT; + /* determine highest priority of 'ready' messages we already solicited from clients */ pos = session->sme_head; - GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); - while ( (NULL != pos) && - (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) ) + while ((NULL != pos) && + (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)) { + GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); msize += pos->size; + maxp = GNUNET_MAX (maxp, pos->priority); min_deadline = GNUNET_TIME_absolute_min (min_deadline, - pos->deadline); + pos->deadline); pos = pos->next; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calculating transmission set with %u priority (%s) and %s earliest deadline\n", + maxp, + (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + + if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL) + { + /* if highest already solicited priority from clients is not critical, + check if there are higher-priority messages to be solicited from clients */ + if (GNUNET_YES == excess) + maxpc = GNUNET_CORE_PRIO_BACKGROUND; + else + maxpc = GNUNET_CORE_PRIO_BEST_EFFORT; + for (car = session->active_client_request_head; NULL != car; car = car->next) + { + if (GNUNET_YES == car->was_solicited) + continue; + maxpc = GNUNET_MAX (maxpc, + car->priority); + } + if (maxpc > maxp) + { + /* we have messages waiting for solicitation that have a higher + priority than those that we already accepted; solicit the + high-priority messages first */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting messages based on priority (%u > %u)\n", + maxpc, + maxp); + solicit_messages (session, 0); + return; + } + } + else + { + /* never solicit more, we have critical messages to process */ + excess = GNUNET_NO; + maxpc = GNUNET_CORE_PRIO_BACKGROUND; + } now = GNUNET_TIME_absolute_get (); - if ( (msize == 0) || - ( (msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && - (min_deadline.abs_value > now.abs_value) ) ) + if ( ( (GNUNET_YES == excess) || + (maxpc >= GNUNET_CORE_PRIO_BEST_EFFORT) ) && + ( (0 == msize) || + ( (msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && + (min_deadline.abs_value_us > now.abs_value_us))) ) { - /* not enough ready yet, try to solicit more */ - solicit_messages (session); + /* not enough ready yet (tiny message & cork possible), or no messages at all, + and either excess bandwidth or best-effort or higher message waiting at + client; in this case, we try to solicit more */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n", + excess, + maxpc, + (unsigned int) msize, + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + solicit_messages (session, + msize); if (msize > 0) { /* if there is data to send, just not yet, make sure we do transmit - it once the deadline is reached */ - if (session->cork_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (session->cork_task); - session->cork_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (min_deadline), - &pop_cork_task, - session); + * it once the deadline is reached */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Corking until %s\n", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + if (NULL != session->cork_task) + GNUNET_SCHEDULER_cancel (session->cork_task); + session->cork_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (min_deadline), + &pop_cork_task, + session); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queue empty, waiting for solicitations\n"); } return; } - /* create plaintext buffer of all messages, encrypt and transmit */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Building combined plaintext buffer to transmit message!\n"); + /* create plaintext buffer of all messages (that fit), encrypt and + transmit */ { static unsigned long long total_bytes; static unsigned int total_msgs; - char pbuf[msize]; /* plaintext */ + char pbuf[msize]; /* plaintext */ size_t used; used = 0; - pos = session->sme_head; - while ( (NULL != pos) && - (used + pos->size <= msize) ) + while ( (NULL != (pos = session->sme_head)) && + (used + pos->size <= msize) ) { - memcpy (&pbuf[used], &pos[1], pos->size); + GNUNET_memcpy (&pbuf[used], &pos[1], pos->size); used += pos->size; + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + pos); + GNUNET_free (pos); } /* compute average payload size */ total_bytes += used; @@ -540,60 +767,68 @@ try_transmission (struct Session *session) total_msgs = 1; total_bytes = used; } - GNUNET_STATISTICS_set (GSC_stats, - "# avg payload per encrypted message", - total_bytes / total_msgs, - GNUNET_NO); + GNUNET_STATISTICS_set (GSC_stats, + "# avg payload per encrypted message", + total_bytes / total_msgs, + GNUNET_NO); /* now actually transmit... */ session->ready_to_transmit = GNUNET_NO; - GSC_KX_encrypt_and_transmit (session->kxinfo, - GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT /* FIXME! */, - pbuf, - used); + GSC_KX_encrypt_and_transmit (session->kx, + pbuf, + used); } } /** - * Send a message to the neighbour now. + * Send an updated typemap message to the neighbour now, + * and restart typemap transmissions. * * @param cls the message * @param key neighbour's identity - * @param value 'struct Neighbour' of the target - * @return always GNUNET_OK + * @param value `struct Neighbour` of the target + * @return always #GNUNET_OK */ static int -do_send_message (void *cls, const GNUNET_HashCode * key, void *value) +do_restart_typemap_message (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { const struct GNUNET_MessageHeader *hdr = cls; struct Session *session = value; - struct SessionMessageEntry *m; + struct SessionMessageEntry *sme; uint16_t size; size = ntohs (hdr->size); - m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); - memcpy (&m[1], hdr, size); - m->size = size; + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); + GNUNET_memcpy (&sme[1], + hdr, + size); + sme->size = size; + sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; GNUNET_CONTAINER_DLL_insert (session->sme_head, - session->sme_tail, - m); + session->sme_tail, + sme); try_transmission (session); + start_typemap_task (session); return GNUNET_OK; } /** - * Broadcast a message to all neighbours. + * Broadcast an updated typemap message to all neighbours. + * Restarts the retransmissions until the typemaps are confirmed. * * @param msg message to transmit */ void -GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg) +GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg) { if (NULL == sessions) return; - GNUNET_CONTAINER_multihashmap_iterate (sessions, - &do_send_message, (void*) msg); + GNUNET_CONTAINER_multipeermap_iterate (sessions, + &do_restart_typemap_message, + (void *) msg); } @@ -610,6 +845,8 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) struct Session *session; session = find_session (pid); + if (NULL == session) + return; session->ready_to_transmit = GNUNET_YES; try_transmission (session); } @@ -622,227 +859,129 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) * this handle will now be 'owned' by the SESSIONS subsystem * @param msg message to transmit * @param cork is corking allowed? + * @param priority how important is this message */ void GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg, - int cork) + const struct GNUNET_MessageHeader *msg, + int cork, + enum GNUNET_CORE_Priority priority) { struct Session *session; struct SessionMessageEntry *sme; + struct SessionMessageEntry *pos; size_t msize; session = find_session (&car->target); + if (NULL == session) + return; msize = ntohs (msg->size); sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); - memcpy (&sme[1], msg, msize); + GNUNET_memcpy (&sme[1], + msg, + msize); sme->size = msize; + sme->priority = priority; if (GNUNET_YES == cork) - sme->deadline = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); - GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, - session->sme_tail, - sme); + { + sme->deadline = + GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Mesage corked, delaying transmission\n"); + } + pos = session->sme_head; + while ( (NULL != pos) && + (pos->priority >= sme->priority) ) + pos = pos->next; + if (NULL == pos) + GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, + session->sme_tail, + sme); + else + GNUNET_CONTAINER_DLL_insert_after (session->sme_head, + session->sme_tail, + pos->prev, + sme); try_transmission (session); } /** - * Helper function for GSC_SESSIONS_handle_client_iterate_peers. - * - * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies - * @param key identity of the connected peer - * @param value the 'struct Neighbour' for the peer - * @return GNUNET_OK (continue to iterate) - */ -#include "core.h" -static int -queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value) -{ - struct GNUNET_SERVER_TransmitContext *tc = cls; - struct Session *session = value; - struct ConnectNotifyMessage cnm; - struct GNUNET_TRANSPORT_ATS_Information *a; - - /* FIXME: code duplication with clients... */ - cnm.header.size = htons (sizeof (struct ConnectNotifyMessage)); - cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); - cnm.ats_count = htonl (0); - cnm.peer = session->peer; - a = &cnm.ats; - // FIXME: full ats... - a[0].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); - a[0].value = htonl (0); - GNUNET_SERVER_transmit_context_append_message (tc, &cnm.header); - return GNUNET_OK; -} - - -/** - * Handle CORE_ITERATE_PEERS request. For this request type, the client - * does not have to have transmitted an INIT request. All current peers - * are returned, regardless of which message types they accept. - * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message - */ -void -GSC_SESSIONS_handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multihashmap_iterate (sessions, - &queue_connect_message, - tc); - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * Handle CORE_PEER_CONNECTED request. Notify client about connection - * to the given neighbour. For this request type, the client does not - * have to have transmitted an INIT request. All current peers are - * returned, regardless of which message types they accept. - * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message - */ -void -GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - const struct GNUNET_PeerIdentity *peer; - - peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK! - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multihashmap_get_multiple (sessions, &peer->hashPubKey, - &queue_connect_message, tc); - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * Handle REQUEST_INFO request. For this request type, the client must - * have transmitted an INIT first. + * We have received a typemap message from a peer, update ours. + * Notifies clients about the session. * - * @param cls unused - * @param client client sending the request - * @param message iteration request message + * @param peer peer this is about + * @param msg typemap update message */ void -GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) { - const struct RequestInfoMessage *rcm; struct Session *session; - struct ConfigurationInfoMessage cim; - int32_t want_reserv; - int32_t got_reserv; - struct GNUNET_TIME_Relative rdelay; + struct GSC_TypeMap *nmap; + struct SessionMessageEntry *sme; + struct TypeMapConfirmationMessage *tmc; - rdelay = GNUNET_TIME_UNIT_ZERO; -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Core service receives `%s' request.\n", - "REQUEST_INFO"); -#endif - rcm = (const struct RequestInfoMessage *) message; - session = find_session (&rcm->peer); + nmap = GSC_TYPEMAP_get_from_message (msg); + if (NULL == nmap) + return; /* malformed */ + session = find_session (peer); if (NULL == session) { - /* Technically, this COULD happen (due to asynchronous behavior), - * but it should be rare, so we should generate an info event - * to help diagnosis of serious errors that might be masked by this */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Client asked for preference change with peer `%s', which is not connected!\n"), - GNUNET_i2s (&rcm->peer)); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_break (0); return; } - - want_reserv = ntohl (rcm->reserve_inbound); - if (session->bw_out_internal_limit.value__ != rcm->limit_outbound.value__) - { - session->bw_out_internal_limit = rcm->limit_outbound; - if (session->bw_out.value__ != - GNUNET_BANDWIDTH_value_min (session->bw_out_internal_limit, - session->bw_out_external_limit).value__) - { - session->bw_out = - GNUNET_BANDWIDTH_value_min (session->bw_out_internal_limit, - session->bw_out_external_limit); - GNUNET_BANDWIDTH_tracker_update_quota (&session->available_recv_window, - session->bw_out); -#if 0 - // FIXME: who does this? - GNUNET_TRANSPORT_set_quota (transport, &session->peer, - session->bw_in, - session->bw_out); -#endif - } - } - if (want_reserv < 0) - { - got_reserv = want_reserv; - } - else if (want_reserv > 0) - { - rdelay = - GNUNET_BANDWIDTH_tracker_get_delay (&session->available_recv_window, - want_reserv); - if (rdelay.rel_value == 0) - got_reserv = want_reserv; - else - got_reserv = 0; /* all or nothing */ - } - else - got_reserv = 0; - GNUNET_BANDWIDTH_tracker_consume (&session->available_recv_window, got_reserv); -#if DEBUG_CORE_QUOTA - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n", - (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv, - (unsigned long long) rdelay.rel_value); -#endif - cim.header.size = htons (sizeof (struct ConfigurationInfoMessage)); - cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO); - cim.reserved_amount = htonl (got_reserv); - cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay); - cim.rim_id = rcm->rim_id; - cim.bw_out = session->bw_out; - cim.preference = 0; /* FIXME: remove */ - cim.peer = rcm->peer; - GSC_CLIENTS_send_to_client (client, &cim.header, GNUNET_NO); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + + sizeof (struct TypeMapConfirmationMessage)); + sme->deadline = GNUNET_TIME_absolute_get (); + sme->size = sizeof (struct TypeMapConfirmationMessage); + sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + tmc = (struct TypeMapConfirmationMessage *) &sme[1]; + tmc->header.size = htons (sizeof (struct TypeMapConfirmationMessage)); + tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP); + tmc->reserved = htonl (0); + GSC_TYPEMAP_hash (nmap, + &tmc->tm_hash); + GNUNET_CONTAINER_DLL_insert (session->sme_head, + session->sme_tail, + sme); + try_transmission (session); + GSC_CLIENTS_notify_clients_about_neighbour (peer, + session->tmap, + nmap); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = nmap; } /** - * Update information about a session. + * The given peer send a message of the specified type. Make sure the + * respective bit is set in its type-map and that clients are notified + * about the session. * - * @param peer peer who's session should be updated - * @param bw_out new outbound bandwidth limit for the peer - * @param atsi performance information - * @param atsi_count number of performance records supplied + * @param peer peer this is about + * @param type type of the message */ void -GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_BANDWIDTH_Value32NBO bw_out) +GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer, + uint16_t type) { - // FIXME - /* not implemented */ + struct Session *session; + struct GSC_TypeMap *nmap; + + if (0 == memcmp (peer, + &GSC_my_identity, + sizeof (struct GNUNET_PeerIdentity))) + return; + session = find_session (peer); + GNUNET_assert (NULL != session); + if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1)) + return; /* already in it */ + nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1); + GSC_CLIENTS_notify_clients_about_neighbour (peer, + session->tmap, nmap); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = nmap; } @@ -852,24 +991,28 @@ GSC_SESSIONS_update (const struct GNUNET_PeerIdentity *peer, void GSC_SESSIONS_init () { - sessions = GNUNET_CONTAINER_multihashmap_create (128); + sessions = GNUNET_CONTAINER_multipeermap_create (128, + GNUNET_YES); } /** - * Helper function for GSC_SESSIONS_handle_client_iterate_peers. + * Helper function for #GSC_SESSIONS_done() to free all + * active sessions. * * @param cls NULL * @param key identity of the connected peer - * @param value the 'struct Session' for the peer - * @return GNUNET_OK (continue to iterate) + * @param value the `struct Session` for the peer + * @return #GNUNET_OK (continue to iterate) */ static int -free_session_helper (void *cls, const GNUNET_HashCode * key, void *value) +free_session_helper (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { - struct Session *session = value; + /* struct Session *session = value; */ - GSC_SESSIONS_end (&session->peer); + GSC_SESSIONS_end (key); return GNUNET_OK; } @@ -880,15 +1023,14 @@ free_session_helper (void *cls, const GNUNET_HashCode * key, void *value) void GSC_SESSIONS_done () { - GNUNET_CONTAINER_multihashmap_iterate (sessions, - &free_session_helper, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (sessions); - sessions = NULL; - GNUNET_STATISTICS_set (GSC_stats, - gettext_noop ("# established sessions"), - 0, GNUNET_NO); + if (NULL != sessions) + { + GNUNET_CONTAINER_multipeermap_iterate (sessions, + &free_session_helper, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (sessions); + sessions = NULL; + } } /* end of gnunet-service-core_sessions.c */ -