X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcore%2Fgnunet-service-core_sessions.c;h=973ef9c2faaa7d5592efa9c6df1e4ce478aed866;hb=4b766fd267ca83a8faa4e22353d5942074d6f2b7;hp=b72a0e0b3e176767d02a31d52620fea0eb72a4ef;hpb=0babf19b445e521fdb78f0e1a67d5548e28405b9;p=oweals%2Fgnunet.git diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index b72a0e0b3..973ef9c2f 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) + Copyright (C) 2009-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,1459 +14,853 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file core/gnunet-service-core_sessions.c - * @brief code for managing of 'encrypted' sessions (key exchange done) + * @brief code for managing of 'encrypted' sessions (key exchange done) * @author Christian Grothoff */ #include "platform.h" -#include "gnunet_service_core.h" -#include "gnunet_service_core_neighbours.h" -#include "gnunet_service_core_kx.h" -#include "gnunet_service_core_sessions.h" +#include "gnunet-service-core.h" +#include "gnunet-service-core_kx.h" +#include "gnunet-service-core_typemap.h" +#include "gnunet-service-core_sessions.h" +#include "gnunet_constants.h" +#include "core.h" + /** - * Record kept for each request for transmission issued by a - * client that is still pending. + * How many encrypted messages do we queue at most? + * Needed to bound memory consumption. */ -struct GSC_ClientActiveRequest; +#define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4 + /** - * Data kept per session. + * Message ready for encryption. This struct is followed by the + * actual content of the message. */ -struct Session +struct SessionMessageEntry { + /** - * Identity of the other peer. + * We keep messages in a doubly linked list. */ - struct GNUNET_PeerIdentity peer; + struct SessionMessageEntry *next; /** - * Head of list of requests from clients for transmission to - * this peer. + * We keep messages in a doubly linked list. */ - struct GSC_ClientActiveRequest *active_client_request_head; + struct SessionMessageEntry *prev; /** - * Tail of list of requests from clients for transmission to - * this peer. + * How important is this message. */ - struct GSC_ClientActiveRequest *active_client_request_tail; + enum GNUNET_CORE_Priority priority; /** - * Performance data for the peer. + * Flag set to #GNUNET_YES if this is a typemap message. */ - struct GNUNET_TRANSPORT_ATS_Information *ats; + int is_typemap; /** - * Information about the key exchange with the other peer. + * Flag set to #GNUNET_YES if this is a typemap confirmation message. */ - struct GSC_KeyExchangeInfo *kxinfo; - + int is_typemap_confirm; /** - * ID of task used for cleaning up dead neighbour entries. + * Deadline for transmission, 1s after we received it (if we + * are not corking), otherwise "now". Note that this message + * does NOT expire past its deadline. */ - GNUNET_SCHEDULER_TaskIdentifier dead_clean_task; + struct GNUNET_TIME_Absolute deadline; /** - * ID of task used for updating bandwidth quota for this neighbour. + * How long is the message? (number of bytes following the `struct + * MessageEntry`, but not including the size of `struct + * MessageEntry` itself!) */ - GNUNET_SCHEDULER_TaskIdentifier quota_update_task; + size_t size; + +}; + +/** + * Data kept per session. + */ +struct Session +{ /** - * At what time did we initially establish (as in, complete session - * key handshake) this connection? Should be zero if status != KEY_CONFIRMED. + * Identity of the other peer. */ - struct GNUNET_TIME_Absolute time_established; + const struct GNUNET_PeerIdentity *peer; /** - * At what time did we last receive an encrypted message from the - * other peer? Should be zero if status != KEY_CONFIRMED. + * Key exchange state for this peer. */ - struct GNUNET_TIME_Absolute last_activity; + struct GSC_KeyExchangeInfo *kx; /** - * How valueable were the messages of this peer recently? + * Head of list of requests from clients for transmission to + * this peer. */ - unsigned long long current_preference; + struct GSC_ClientActiveRequest *active_client_request_head; /** - * Number of entries in 'ats'. + * Tail of list of requests from clients for transmission to + * this peer. */ - unsigned int ats_count; + struct GSC_ClientActiveRequest *active_client_request_tail; /** - * Bit map indicating which of the 32 sequence numbers before the last - * were received (good for accepting out-of-order packets and - * estimating reliability of the connection) + * Head of list of messages ready for encryption. */ - unsigned int last_packets_bitmap; + struct SessionMessageEntry *sme_head; /** - * last sequence number received on this connection (highest) + * Tail of list of messages ready for encryption. */ - uint32_t last_sequence_number_received; + struct SessionMessageEntry *sme_tail; /** - * last sequence number transmitted + * Current type map for this peer. */ - uint32_t last_sequence_number_sent; + struct GSC_TypeMap *tmap; /** - * Available bandwidth in for this peer (current target). + * Task to transmit corked messages with a delay. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_in; + struct GNUNET_SCHEDULER_Task *cork_task; /** - * Available bandwidth out for this peer (current target). + * Task to transmit our type map. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out; + struct GNUNET_SCHEDULER_Task *typemap_task; /** - * Internal bandwidth limit set for this peer (initially typically - * set to "-1"). Actual "bw_out" is MIN of - * "bpm_out_internal_limit" and "bw_out_external_limit". + * Retransmission delay we currently use for the typemap + * transmissions (if not confirmed). */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit; + struct GNUNET_TIME_Relative typemap_delay; /** - * External bandwidth limit set for this peer by the - * peer that we are communicating with. "bw_out" is MIN of - * "bw_out_internal_limit" and "bw_out_external_limit". + * Is this the first time we're sending the typemap? If so, + * we want to send it a bit faster the second time. 0 if + * we are sending for the first time, 1 if not. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit; - + int first_typemap; }; -/** - * Map of peer identities to 'struct Session'. - */ -static struct GNUNET_CONTAINER_MultiHashMap *sessions; - +GNUNET_NETWORK_STRUCT_BEGIN /** - * Session entry for "this" peer. + * Message sent to confirm that a typemap was received. */ -static struct Session self; +struct TypeMapConfirmationMessage +{ -/** - * Sum of all preferences among all neighbours. - */ -static unsigned long long preference_sum; + /** + * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP. + */ + struct GNUNET_MessageHeader header; + /** + * Reserved, always zero. + */ + uint32_t reserved GNUNET_PACKED; -// FIXME......... + /** + * Hash of the (decompressed) type map that was received. + */ + struct GNUNET_HashCode tm_hash; -/** - * At what time should the connection to the given neighbour - * time out (given no further activity?) - * - * @param n neighbour in question - * @return absolute timeout - */ -static struct GNUNET_TIME_Absolute -get_neighbour_timeout (struct Neighbour *n) -{ - return GNUNET_TIME_absolute_add (n->last_activity, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); -} +}; - -/** - * Helper function for update_preference_sum. - */ -static int -update_preference (void *cls, const GNUNET_HashCode * key, void *value) -{ - unsigned long long *ps = cls; - struct Neighbour *n = value; - - n->current_preference /= 2; - *ps += n->current_preference; - return GNUNET_OK; -} +GNUNET_NETWORK_STRUCT_END /** - * A preference value for a neighbour was update. Update - * the preference sum accordingly. - * - * @param inc how much was a preference value increased? + * Map of peer identities to `struct Session`. */ -static void -update_preference_sum (unsigned long long inc) -{ - unsigned long long os; - - os = preference_sum; - preference_sum += inc; - if (preference_sum >= os) - return; /* done! */ - /* overflow! compensate by cutting all values in half! */ - preference_sum = 0; - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &update_preference, - &preference_sum); - GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"), - preference_sum, GNUNET_NO); -} +static struct GNUNET_CONTAINER_MultiPeerMap *sessions; /** - * Find the entry for the given neighbour. + * Find the session for the given peer. * - * @param peer identity of the neighbour + * @param peer identity of the peer * @return NULL if we are not connected, otherwise the - * neighbour's entry. + * session handle */ -static struct Neighbour * -find_neighbour (const struct GNUNET_PeerIdentity *peer) +static struct Session * +find_session (const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey); + if (NULL == sessions) + return NULL; + return GNUNET_CONTAINER_multipeermap_get (sessions, + peer); } /** - * Function called by transport telling us that a peer - * changed status. + * End the session with the given peer (we are no longer + * connected). * - * @param n the peer that changed status + * @param pid identity of peer to kill session with */ -static void -handle_peer_status_change (struct Neighbour *n) +void +GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) { - struct PeerStatusNotifyMessage *psnm; - char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct GNUNET_TRANSPORT_ATS_Information *ats; - size_t size; + struct Session *session; + struct GSC_ClientActiveRequest *car; + struct SessionMessageEntry *sme; - if ((!n->is_connected) || (n->status != PEER_STATE_KEY_CONFIRMED)) + session = find_session (pid); + if (NULL == session) return; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' changed status\n", - GNUNET_i2s (&n->peer)); -#endif - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Destroying session for peer `%s'\n", + GNUNET_i2s (session->peer)); + if (NULL != session->cork_task) { - GNUNET_break (0); - /* recovery strategy: throw away performance data */ - GNUNET_array_grow (n->ats, n->ats_count, 0); - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); + GNUNET_SCHEDULER_cancel (session->cork_task); + session->cork_task = NULL; } - psnm = (struct PeerStatusNotifyMessage *) buf; - psnm->header.size = htons (size); - psnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE); - psnm->timeout = GNUNET_TIME_absolute_hton (get_neighbour_timeout (n)); - psnm->bandwidth_in = n->bw_in; - psnm->bandwidth_out = n->bw_out; - psnm->peer = n->peer; - psnm->ats_count = htonl (n->ats_count); - ats = &psnm->ats; - memcpy (ats, n->ats, - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); - ats[n->ats_count].type = htonl (0); - ats[n->ats_count].value = htonl (0); - send_to_all_clients (&psnm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_STATUS_CHANGE); - GNUNET_STATISTICS_update (stats, gettext_noop ("# peer status changes"), 1, - GNUNET_NO); -} - - - -/** - * Go over our message queue and if it is not too long, go - * over the pending requests from clients for this - * neighbour and send some clients a 'READY' notification. - * - * @param n which peer to process - */ -static void -schedule_peer_messages (struct Neighbour *n) -{ - struct GSC_ClientActiveRequest *car; - struct GSC_ClientActiveRequest *pos; - struct Client *c; - struct MessageEntry *mqe; - unsigned int queue_size; - - /* check if neighbour queue is empty enough! */ - if (n != &self) + while (NULL != (car = session->active_client_request_head)) { - queue_size = 0; - mqe = n->messages; - while (mqe != NULL) - { - queue_size++; - mqe = mqe->next; - } - if (queue_size >= MAX_PEER_QUEUE_SIZE) - { -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not considering client transmission requests: queue full\n"); -#endif - return; /* queue still full */ - } - /* find highest priority request */ - pos = n->active_client_request_head; - car = NULL; - while (pos != NULL) - { - if ((car == NULL) || (pos->priority > car->priority)) - car = pos; - pos = pos->next; - } + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, car); + GSC_CLIENTS_reject_request (car, + GNUNET_NO); } - else + while (NULL != (sme = session->sme_head)) { - car = n->active_client_request_head; + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); + GNUNET_free (sme); } - if (car == NULL) - return; /* no pending requests */ -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Permitting client transmission request to `%s'\n", - GNUNET_i2s (&n->peer)); -#endif - GSC_CLIENTS_solicite_request (car); + if (NULL != session->typemap_task) + { + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_task = NULL; + } + GSC_CLIENTS_notify_clients_about_neighbour (session->peer, + session->tmap, + NULL); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (sessions, + session->peer, + session)); + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), + GNUNET_CONTAINER_multipeermap_size (sessions), + GNUNET_NO); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = NULL; + GNUNET_free (session); } - /** - * Free the given entry for the neighbour (it has - * already been removed from the list at this point). + * Transmit our current typemap message to the other peer. + * (Done periodically until the typemap is confirmed). * - * @param n neighbour to free + * @param cls the `struct Session *` */ static void -free_neighbour (struct Neighbour *n) +transmit_typemap_task (void *cls) { - struct MessageEntry *m; - struct GSC_ClientActiveRequest *car; + struct Session *session = cls; + struct GNUNET_MessageHeader *hdr; + struct GNUNET_TIME_Relative delay; -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying neighbour entry for peer `%4s'\n", - GNUNET_i2s (&n->peer)); -#endif - if (n->skm != NULL) - { - GNUNET_free (n->skm); - n->skm = NULL; - } - while (NULL != (m = n->messages)) - { - n->messages = m->next; - GNUNET_free (m); - } - while (NULL != (m = n->encrypted_head)) - { - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - GNUNET_free (m); - } - while (NULL != (car = n->active_client_request_head)) - { - GNUNET_CONTAINER_DLL_remove (n->active_client_request_head, - n->active_client_request_tail, car); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (car->client->requests, - &n->peer.hashPubKey, - car)); - GNUNET_free (car); - } - if (NULL != n->th) - { - GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th); - n->th = NULL; - } - if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->retry_plaintext_task); - if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->quota_update_task); - if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->keep_alive_task); - if (n->status == PEER_STATE_KEY_CONFIRMED) - GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"), - -1, GNUNET_NO); - GNUNET_array_grow (n->ats, n->ats_count, 0); - GNUNET_free_non_null (n->pending_ping); - GNUNET_free_non_null (n->pending_pong); - GNUNET_free (n); + "Sending TYPEMAP to %s\n", + GNUNET_i2s (session->peer)); + session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay); + delay = session->typemap_delay; + /* randomize a bit to avoid spont. sync */ + delay.rel_value_us += + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + 1000 * 1000); + session->typemap_task = + GNUNET_SCHEDULER_add_delayed (delay, + &transmit_typemap_task, + session); + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop ("# type map refreshes sent"), + 1, + GNUNET_NO); + hdr = GSC_TYPEMAP_compute_type_map_message (); + GSC_KX_encrypt_and_transmit (session->kx, + hdr, + ntohs (hdr->size)); + GNUNET_free (hdr); } - /** - * Consider freeing the given neighbour since we may not need - * to keep it around anymore. + * Restart the typemap task for the given session. * - * @param n neighbour to consider discarding + * @param session session to restart typemap transmission for */ static void -consider_free_neighbour (struct Neighbour *n); - - -/** - * Task triggered when a neighbour entry might have gotten stale. - * - * @param cls the 'struct Neighbour' - * @param tc scheduler context (not used) - */ -static void -consider_free_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +start_typemap_task (struct Session *session) { - struct Neighbour *n = cls; - - n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK; - consider_free_neighbour (n); + if (NULL != session->typemap_task) + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_delay = GNUNET_TIME_UNIT_SECONDS; + session->typemap_task = + GNUNET_SCHEDULER_add_delayed (session->typemap_delay, + &transmit_typemap_task, + session); } /** - * Consider freeing the given neighbour since we may not need - * to keep it around anymore. + * Create a session, a key exchange was just completed. * - * @param n neighbour to consider discarding + * @param peer peer that is now connected + * @param kx key exchange that completed */ -static void -consider_free_neighbour (struct Neighbour *n) +void +GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, + struct GSC_KeyExchangeInfo *kx) { - struct GNUNET_TIME_Relative left; + struct Session *session; - if ((n->th != NULL) || (n->pitr != NULL) || (GNUNET_YES == n->is_connected)) - return; /* no chance */ - - left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n)); - if (left.rel_value > 0) - { - if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->dead_clean_task); - n->dead_clean_task = - GNUNET_SCHEDULER_add_delayed (left, &consider_free_task, n); - return; - } - /* actually free the neighbour... */ - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (neighbours, - &n->peer.hashPubKey, n)); - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), - GNUNET_CONTAINER_multihashmap_size (neighbours), + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating session for peer `%s'\n", + GNUNET_i2s (peer)); + session = GNUNET_new (struct Session); + session->tmap = GSC_TYPEMAP_create (); + session->peer = peer; + session->kx = kx; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (sessions, + session->peer, + session, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), + GNUNET_CONTAINER_multipeermap_size (sessions), GNUNET_NO); - free_neighbour (n); + GSC_CLIENTS_notify_clients_about_neighbour (peer, + NULL, + session->tmap); + start_typemap_task (session); } /** - * Function called when the transport service is ready to - * receive an encrypted message for the respective peer + * The other peer has indicated that he 'lost' the session + * (KX down), reinitialize the session on our end, in particular + * this means to restart the typemap transmission. * - * @param cls neighbour to use message from - * @param size number of bytes we can transmit - * @param buf where to copy the message - * @return number of bytes transmitted + * @param peer peer that is now connected */ -static size_t -notify_encrypted_transmit_ready (void *cls, size_t size, void *buf) +void +GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer) { - struct Neighbour *n = cls; - struct MessageEntry *m; - size_t ret; - char *cbuf; - - n->th = NULL; - m = n->encrypted_head; - if (m == NULL) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypted message queue empty, no messages added to buffer for `%4s'\n", - GNUNET_i2s (&n->peer)); -#endif - return 0; - } - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - ret = 0; - cbuf = buf; - if (buf != NULL) - { - GNUNET_assert (size >= m->size); - memcpy (cbuf, &m[1], m->size); - ret = m->size; - GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Copied message of type %u and size %u into transport buffer for `%4s'\n", - (unsigned int) - ntohs (((struct GNUNET_MessageHeader *) &m[1])->type), - (unsigned int) ret, GNUNET_i2s (&n->peer)); -#endif - process_encrypted_neighbour_queue (n); - } - else + struct Session *session; + + session = find_session (peer); + if (NULL == session) { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of message of type %u and size %u failed\n", - (unsigned int) - ntohs (((struct GNUNET_MessageHeader *) &m[1])->type), - (unsigned int) m->size); -#endif + /* KX/session is new for both sides; thus no need to restart what + has not yet begun */ + return; } - GNUNET_free (m); - consider_free_neighbour (n); - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# encrypted bytes given to transport"), ret, - GNUNET_NO); - return ret; + start_typemap_task (session); } - - - /** - * Select messages for transmission. This heuristic uses a combination - * of earliest deadline first (EDF) scheduling (with bounded horizon) - * and priority-based discard (in case no feasible schedule exist) and - * speculative optimization (defer any kind of transmission until - * we either create a batch of significant size, 25% of max, or until - * we are close to a deadline). Furthermore, when scheduling the - * heuristic also packs as many messages into the batch as possible, - * starting with those with the earliest deadline. Yes, this is fun. + * The other peer has confirmed receiving our type map, + * check if it is current and if so, stop retransmitting it. * - * @param n neighbour to select messages from - * @param size number of bytes to select for transmission - * @param retry_time set to the time when we should try again - * (only valid if this function returns zero) - * @return number of bytes selected, or 0 if we decided to - * defer scheduling overall; in that case, retry_time is set. + * @param peer peer that confirmed the type map + * @param msg confirmation message we received */ -static size_t -select_messages (struct Neighbour *n, size_t size, - struct GNUNET_TIME_Relative *retry_time) +void +GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) { - struct MessageEntry *pos; - struct MessageEntry *min; - struct MessageEntry *last; - unsigned int min_prio; - struct GNUNET_TIME_Absolute t; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative delta; - uint64_t avail; - struct GNUNET_TIME_Relative slack; /* how long could we wait before missing deadlines? */ - size_t off; - uint64_t tsize; - unsigned int queue_size; - int discard_low_prio; - - GNUNET_assert (NULL != n->messages); - now = GNUNET_TIME_absolute_get (); - /* last entry in linked list of messages processed */ - last = NULL; - /* should we remove the entry with the lowest - * priority from consideration for scheduling at the - * end of the loop? */ - queue_size = 0; - tsize = 0; - pos = n->messages; - while (pos != NULL) + const struct TypeMapConfirmationMessage *cmsg; + struct Session *session; + + session = find_session (peer); + if (NULL == session) { - queue_size++; - tsize += pos->size; - pos = pos->next; + GNUNET_break (0); + return; } - discard_low_prio = GNUNET_YES; - while (GNUNET_YES == discard_low_prio) + if (ntohs (msg->size) != sizeof (struct TypeMapConfirmationMessage)) { - min = NULL; - min_prio = UINT_MAX; - discard_low_prio = GNUNET_NO; - /* calculate number of bytes available for transmission at time "t" */ - avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window); - t = now; - /* how many bytes have we (hypothetically) scheduled so far */ - off = 0; - /* maximum time we can wait before transmitting anything - * and still make all of our deadlines */ - slack = GNUNET_TIME_UNIT_FOREVER_REL; - pos = n->messages; - /* note that we use "*2" here because we want to look - * a bit further into the future; much more makes no - * sense since new message might be scheduled in the - * meantime... */ - while ((pos != NULL) && (off < size * 2)) - { - if (pos->do_transmit == GNUNET_YES) - { - /* already removed from consideration */ - pos = pos->next; - continue; - } - if (discard_low_prio == GNUNET_NO) - { - delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline); - if (delta.rel_value > 0) - { - // FIXME: HUH? Check! - t = pos->deadline; - avail += - GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta); - } - if (avail < pos->size) - { - // FIXME: HUH? Check! - discard_low_prio = GNUNET_YES; /* we could not schedule this one! */ - } - else - { - avail -= pos->size; - /* update slack, considering both its absolute deadline - * and relative deadlines caused by other messages - * with their respective load */ - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_BANDWIDTH_value_get_delay_for - (n->bw_out, avail)); - if (pos->deadline.abs_value <= now.abs_value) - { - /* now or never */ - slack = GNUNET_TIME_UNIT_ZERO; - } - else if (GNUNET_YES == pos->got_slack) - { - /* should be soon now! */ - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_TIME_absolute_get_remaining - (pos->slack_deadline)); - } - else - { - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_TIME_absolute_get_difference - (now, pos->deadline)); - pos->got_slack = GNUNET_YES; - pos->slack_deadline = - GNUNET_TIME_absolute_min (pos->deadline, - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_MAX_CORK_DELAY)); - } - } - } - off += pos->size; - t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check! - if (pos->priority <= min_prio) - { - /* update min for discard */ - min_prio = pos->priority; - min = pos; - } - pos = pos->next; - } - if (discard_low_prio) - { - GNUNET_assert (min != NULL); - /* remove lowest-priority entry from consideration */ - min->do_transmit = GNUNET_YES; /* means: discard (for now) */ - } - last = pos; + GNUNET_break_op (0); + return; } - /* guard against sending "tiny" messages with large headers without - * urgent deadlines */ - if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) && - (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2)) + cmsg = (const struct TypeMapConfirmationMessage *) msg; + if (GNUNET_YES != + GSC_TYPEMAP_check_hash (&cmsg->tm_hash)) { - /* less than 25% of message would be filled with deadlines still - * being met if we delay by one second or more; so just wait for - * more data; but do not wait longer than 1s (since we don't want - * to delay messages for a really long time either). */ - *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY; - /* reset do_transmit values for next time */ - while (pos != last) - { - pos->do_transmit = GNUNET_NO; - pos = pos->next; - } - GNUNET_STATISTICS_update (stats, + /* our typemap has changed in the meantime, do not + accept confirmation */ + GNUNET_STATISTICS_update (GSC_stats, gettext_noop - ("# transmissions delayed due to corking"), 1, - GNUNET_NO); -#if DEBUG_CORE + ("# outdated typemap confirmations received"), + 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n", - (unsigned long long) retry_time->rel_value, (unsigned int) off, - (unsigned int) size); -#endif - return 0; - } - /* select marked messages (up to size) for transmission */ - off = 0; - pos = n->messages; - while (pos != last) - { - if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO)) - { - pos->do_transmit = GNUNET_YES; /* mark for transmission */ - off += pos->size; - size -= pos->size; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Selecting message of size %u for transmission\n", - (unsigned int) pos->size); -#endif - } - else - { -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not selecting message of size %u for transmission at this time (maximum is %u)\n", - (unsigned int) pos->size, size); -#endif - pos->do_transmit = GNUNET_NO; /* mark for not transmitting! */ - } - pos = pos->next; + "Got outdated typemap confirmated from peer `%s'\n", + GNUNET_i2s (session->peer)); + return; } -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n", - (unsigned long long) off, (unsigned long long) tsize, queue_size, - (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer)); -#endif - return off; -} - - -/** - * Batch multiple messages into a larger buffer. - * - * @param n neighbour to take messages from - * @param buf target buffer - * @param size size of buf - * @param deadline set to transmission deadline for the result - * @param retry_time set to the time when we should try again - * (only valid if this function returns zero) - * @param priority set to the priority of the batch - * @return number of bytes written to buf (can be zero) - */ -static size_t -batch_message (struct Neighbour *n, char *buf, size_t size, - struct GNUNET_TIME_Absolute *deadline, - struct GNUNET_TIME_Relative *retry_time, unsigned int *priority) -{ - char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb; - struct MessageEntry *pos; - struct MessageEntry *prev; - struct MessageEntry *next; - size_t ret; - - ret = 0; - *priority = 0; - *deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - *retry_time = GNUNET_TIME_UNIT_FOREVER_REL; - if (0 == select_messages (n, size, retry_time)) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No messages selected, will try again in %llu ms\n", - retry_time->rel_value); -#endif - return 0; - } - ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND); - ntm->ats_count = htonl (0); - ntm->ats.type = htonl (0); - ntm->ats.value = htonl (0); - ntm->peer = n->peer; - pos = n->messages; - prev = NULL; - while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader))) + "Got typemap confirmation from peer `%s'\n", + GNUNET_i2s (session->peer)); + if (NULL != session->typemap_task) { - next = pos->next; - if (GNUNET_YES == pos->do_transmit) - { - GNUNET_assert (pos->size <= size); - /* do notifications */ - /* FIXME: track if we have *any* client that wants - * full notifications and only do this if that is - * actually true */ - if (pos->size < - GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage)) - { - memcpy (&ntm[1], &pos[1], pos->size); - ntm->header.size = - htons (sizeof (struct NotifyTrafficMessage) + - sizeof (struct GNUNET_MessageHeader)); - send_to_all_clients (&ntm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND); - } - else - { - /* message too large for 'full' notifications, we do at - * least the 'hdr' type */ - memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader)); - } - ntm->header.size = - htons (sizeof (struct NotifyTrafficMessage) + pos->size); - send_to_all_clients (&ntm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND); -#if DEBUG_HANDSHAKE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypting %u bytes with message of type %u and size %u\n", - pos->size, - (unsigned int) - ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type), - (unsigned int) - ntohs (((const struct GNUNET_MessageHeader *) - &pos[1])->size)); -#endif - /* copy for encrypted transmission */ - memcpy (&buf[ret], &pos[1], pos->size); - ret += pos->size; - size -= pos->size; - *priority += pos->priority; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding plaintext message of size %u with deadline %llu ms to batch\n", - (unsigned int) pos->size, - (unsigned long long) - GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value); -#endif - deadline->abs_value = - GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value); - GNUNET_free (pos); - if (prev == NULL) - n->messages = next; - else - prev->next = next; - } - else - { - prev = pos; - } - pos = next; + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_task = NULL; } -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Deadline for message batch is %llu ms\n", - GNUNET_TIME_absolute_get_remaining (*deadline).rel_value); -#endif - return ret; + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# valid typemap confirmations received"), + 1, GNUNET_NO); } /** - * Remove messages with deadlines that have long expired from - * the queue. + * Notify the given client about the session (client is new). * - * @param n neighbour to inspect + * @param cls the `struct GSC_Client` + * @param key peer identity + * @param value the `struct Session` + * @return #GNUNET_OK (continue to iterate) */ -static void -discard_expired_messages (struct Neighbour *n) +static int +notify_client_about_session (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { - struct MessageEntry *prev; - struct MessageEntry *next; - struct MessageEntry *pos; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative delta; - int disc; - unsigned int queue_length; + struct GSC_Client *client = cls; + struct Session *session = value; - disc = GNUNET_NO; - now = GNUNET_TIME_absolute_get (); - prev = NULL; - queue_length = 0; - pos = n->messages; - while (pos != NULL) - { - queue_length++; - next = pos->next; - delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now); - if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Message is %llu ms past due, discarding.\n", - delta.rel_value); -#endif - if (prev == NULL) - n->messages = next; - else - prev->next = next; - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# messages discarded (expired prior to transmission)"), - 1, GNUNET_NO); - disc = GNUNET_YES; - GNUNET_free (pos); - } - else - prev = pos; - pos = next; - } - if ( (GNUNET_YES == disc) && - (queue_length == MAX_PEER_QUEUE_SIZE) ) - schedule_peer_messages (n); + GSC_CLIENTS_notify_client_about_neighbour (client, + session->peer, + NULL, /* old TMAP: none */ + session->tmap); + return GNUNET_OK; } /** - * Signature of the main function of a task. + * We have a new client, notify it about all current sessions. * - * @param cls closure - * @param tc context information (why was this task triggered now) + * @param client the new client */ -static void -retry_plaintext_processing (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +void +GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) { - struct Neighbour *n = cls; - - n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK; - process_plaintext_neighbour_queue (n); + /* notify new client about existing sessions */ + GNUNET_CONTAINER_multipeermap_iterate (sessions, + ¬ify_client_about_session, + client); } /** - * Check if we have plaintext messages for the specified neighbour - * pending, and if so, consider batching and encrypting them (and - * then trigger processing of the encrypted queue if needed). + * Try to perform a transmission on the given session. Will solicit + * additional messages if the 'sme' queue is not full enough. * - * @param n neighbour to check. + * @param session session to transmit messages from */ static void -process_plaintext_neighbour_queue (struct Neighbour *n) -{ - char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)]; /* plaintext */ - size_t used; - struct EncryptedMessage *em; /* encrypted message */ - struct EncryptedMessage *ph; /* plaintext header */ - struct MessageEntry *me; - unsigned int priority; - struct GNUNET_TIME_Absolute deadline; - struct GNUNET_TIME_Relative retry_time; - struct GNUNET_CRYPTO_AesInitializationVector iv; - struct GNUNET_CRYPTO_AuthKey auth_key; - - if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (n->retry_plaintext_task); - n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK; - } - switch (n->status) - { - case PEER_STATE_DOWN: - send_key (n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif - return; - case PEER_STATE_KEY_SENT: - if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK) - n->retry_set_key_task = - GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency, - &set_key_retry_task, n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif - return; - case PEER_STATE_KEY_RECEIVED: - if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK) - n->retry_set_key_task = - GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency, - &set_key_retry_task, n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif - return; - case PEER_STATE_KEY_CONFIRMED: - /* ready to continue */ - break; - } - discard_expired_messages (n); - if (n->messages == NULL) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Plaintext message queue for `%4s' is empty.\n", - GNUNET_i2s (&n->peer)); -#endif - return; /* no pending messages */ - } - if (n->encrypted_head != NULL) - { -#if DEBUG_CORE > 2 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n", - GNUNET_i2s (&n->peer)); -#endif - return; /* wait for messages already encrypted to be - * processed first! */ - } - ph = (struct EncryptedMessage *) pbuf; - deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - priority = 0; - used = sizeof (struct EncryptedMessage); - used += - batch_message (n, &pbuf[used], - GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline, - &retry_time, &priority); - if (used == sizeof (struct EncryptedMessage)) - { -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No messages selected for transmission to `%4s' at this time, will try again later.\n", - GNUNET_i2s (&n->peer)); -#endif - /* no messages selected for sending, try again later... */ - n->retry_plaintext_task = - GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing, - n); - return; - } - GSC_KX_encrypt_and_transmit (n->kx, - &pbuf[struct EncryptedMessage], - used - sizeof (struct EncryptedMessage)); - schedule_peer_messages (n); -} - - +try_transmission (struct Session *session); /** - * Check if we have encrypted messages for the specified neighbour - * pending, and if so, check with the transport about sending them - * out. + * Queue a request from a client for transmission to a particular peer. * - * @param n neighbour to check. + * @param car request to queue; this handle is then shared between + * the caller (CLIENTS subsystem) and SESSIONS and must not + * be released by either until either #GSC_SESSIONS_dequeue(), + * #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed() + * have been invoked on it */ -static void -process_encrypted_neighbour_queue (struct Neighbour *n) +void +GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) { - struct MessageEntry *m; + struct Session *session; - if (n->th != NULL) - return; /* request already pending */ - if (GNUNET_YES != n->is_connected) - { - GNUNET_break (0); - return; - } - m = n->encrypted_head; - if (m == NULL) + session = find_session (&car->target); + if (NULL == session) { - /* encrypted queue empty, try plaintext instead */ - process_plaintext_neighbour_queue (n); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropped client request for transmission (am disconnected)\n"); + GNUNET_break (0); /* should have been rejected earlier */ + GSC_CLIENTS_reject_request (car, + GNUNET_NO); return; } -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n", - (unsigned int) m->size, GNUNET_i2s (&n->peer), - (unsigned long long) - GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value); -#endif - n->th = - GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size, - m->priority, - GNUNET_TIME_absolute_get_remaining - (m->deadline), - ¬ify_encrypted_transmit_ready, - n); - if (n->th == NULL) + if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { - /* message request too large or duplicate request */ GNUNET_break (0); - /* discard encrypted message */ - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - GNUNET_free (m); - process_encrypted_neighbour_queue (n); + GSC_CLIENTS_reject_request (car, + GNUNET_YES); + return; } -} - - -/** - * Initialize a new 'struct Neighbour'. - * - * @param pid ID of the new neighbour - * @return handle for the new neighbour - */ -static struct Neighbour * -create_neighbour (const struct GNUNET_PeerIdentity *pid) -{ - struct Neighbour *n; - struct GNUNET_TIME_Absolute now; - -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating neighbour entry for peer `%4s'\n", GNUNET_i2s (pid)); -#endif - n = GNUNET_malloc (sizeof (struct Neighbour)); - n->peer = *pid; - GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key); - now = GNUNET_TIME_absolute_get (); - n->encrypt_key_created = now; - n->last_activity = now; - n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY; - n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX); - n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->ping_challenge = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (neighbours, - &n->peer.hashPubKey, n, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), - GNUNET_CONTAINER_multihashmap_size (neighbours), - GNUNET_NO); - neighbour_quota_update (n, NULL); - consider_free_neighbour (n); - return n; + "Received client transmission request. queueing\n"); + GNUNET_CONTAINER_DLL_insert_tail (session->active_client_request_head, + session->active_client_request_tail, + car); + try_transmission (session); } - /** - * We have a new client, notify it about all current sessions. + * Dequeue a request from a client from transmission to a particular peer. * - * @param client the new client + * @param car request to dequeue; this handle will then be 'owned' by + * the caller (CLIENTS sysbsystem) */ void -GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) +GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) { - /* notify new client about existing neighbours */ - GNUNET_CONTAINER_multihashmap_iterate (neighbours, - ¬ify_client_about_neighbour, client); + struct Session *session; + + if (0 == + memcmp (&car->target, + &GSC_my_identity, + sizeof (struct GNUNET_PeerIdentity))) + return; + session = find_session (&car->target); + GNUNET_assert (NULL != session); + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, + car); + /* dequeueing of 'high' priority messages may unblock + transmission for lower-priority messages, so we also + need to try in this case. */ + try_transmission (session); } /** - * Queue a request from a client for transmission to a particular peer. + * Solicit messages for transmission, starting with those of the highest + * priority. * - * @param car request to queue; this handle is then shared between - * the caller (CLIENTS subsystem) and SESSIONS and must not - * be released by either until either 'GNUNET_SESSIONS_dequeue', - * 'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed' - * have been invoked on it + * @param session session to solict messages for + * @param msize how many bytes do we have already */ -void -GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) +static void +solicit_messages (struct Session *session, + size_t msize) { - struct Neighbour *n; // FIXME: session... + struct GSC_ClientActiveRequest *car; + struct GSC_ClientActiveRequest *nxt; + size_t so_size; + enum GNUNET_CORE_Priority pmax; - n = find_neighbour (&car->peer); - if ((n == NULL) || (GNUNET_YES != n->is_connected) || - (n->status != PEER_STATE_KEY_CONFIRMED)) + so_size = msize; + pmax = GNUNET_CORE_PRIO_BACKGROUND; + for (car = session->active_client_request_head; NULL != car; car = car->next) { - /* neighbour must have disconnected since request was issued, - * ignore (client will realize it once it processes the - * disconnect notification) */ -#if DEBUG_CORE_CLIENT + if (GNUNET_YES == car->was_solicited) + continue; + pmax = GNUNET_MAX (pmax, car->priority); + } + nxt = session->active_client_request_head; + while (NULL != (car = nxt)) + { + nxt = car->next; + if (car->priority < pmax) + continue; + if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) + break; + so_size += car->msize; + if (GNUNET_YES == car->was_solicited) + continue; + car->was_solicited = GNUNET_YES; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropped client request for transmission (am disconnected)\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# send requests dropped (disconnected)"), 1, - GNUNET_NO); - GSC_CLIENTS_reject_requests (car); - return; + "Soliciting message with priority %u\n", + car->priority); + GSC_CLIENTS_solicit_request (car); + /* The above call may *dequeue* requests and thereby + clobber 'nxt'. Hence we need to restart from the + head of the list. */ + nxt = session->active_client_request_head; + so_size = msize; } -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received client transmission request. queueing\n"); -#endif - GNUNET_CONTAINER_DLL_insert (n->active_client_request_head, - n->active_client_request_tail, car); - - // schedule_peer_messages (n); } /** - * Dequeue a request from a client from transmission to a particular peer. + * Some messages were delayed (corked), but the timeout has now expired. + * Send them now. * - * @param car request to dequeue; this handle will then be 'owned' by - * the caller (CLIENTS sysbsystem) + * @param cls `struct Session` with the messages to transmit now */ -void -GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) +static void +pop_cork_task (void *cls) { - struct Session *s; + struct Session *session = cls; - s = find_session (&car->peer); - GNUNET_CONTAINER_DLL_remove (s->active_client_request_head, - s->active_client_request_tail, car); + session->cork_task = NULL; + try_transmission (session); } - /** - * Transmit a message to a particular peer. + * Try to perform a transmission on the given session. Will solicit + * additional messages if the 'sme' queue is not full enough or has + * only low-priority messages. * - * @param car original request that was queued and then solicited; - * this handle will now be 'owned' by the SESSIONS subsystem - * @param msg message to transmit + * @param session session to transmit messages from */ -void -GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg) +static void +try_transmission (struct Session *session) { - struct MessageEntry *prev; - struct MessageEntry *pos; - struct MessageEntry *e; - struct MessageEntry *min_prio_entry; - struct MessageEntry *min_prio_prev; - unsigned int min_prio; - unsigned int queue_size; - - n = find_neighbour (&sm->peer); - if ((n == NULL) || (GNUNET_YES != n->is_connected) || - (n->status != PEER_STATE_KEY_CONFIRMED)) + struct SessionMessageEntry *pos; + size_t msize; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Absolute min_deadline; + enum GNUNET_CORE_Priority maxp; + enum GNUNET_CORE_Priority maxpc; + struct GSC_ClientActiveRequest *car; + int excess; + + msize = 0; + min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; + /* if the peer has excess bandwidth, background traffic is allowed, + otherwise not */ + if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <= + GSC_NEIGHBOURS_get_queue_length (session->kx)) { - /* attempt to send message to peer that is not connected anymore - * (can happen due to asynchrony) */ - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# messages discarded (disconnected)"), 1, - GNUNET_NO); - if (client != NULL) - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission queue already very long, waiting...\n"); + return; /* queue already too long */ } -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n", - "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer)); -#endif - discard_expired_messages (n); - /* bound queue size */ - /* NOTE: this entire block to bound the queue size should be - * obsolete with the new client-request code and the - * 'schedule_peer_messages' mechanism; we still have this code in - * here for now as a sanity check for the new mechanmism; - * ultimately, we should probably simply reject SEND messages that - * are not 'approved' (or provide a new core API for very unreliable - * delivery that always sends with priority 0). Food for thought. */ - min_prio = UINT32_MAX; - min_prio_entry = NULL; - min_prio_prev = NULL; - queue_size = 0; - prev = NULL; - pos = n->messages; - while (pos != NULL) + excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx); + if (GNUNET_YES == excess) + maxp = GNUNET_CORE_PRIO_BACKGROUND; + else + maxp = GNUNET_CORE_PRIO_BEST_EFFORT; + /* determine highest priority of 'ready' messages we already solicited from clients */ + pos = session->sme_head; + while ((NULL != pos) && + (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)) { - if (pos->priority <= min_prio) - { - min_prio_entry = pos; - min_prio_prev = prev; - min_prio = pos->priority; - } - queue_size++; - prev = pos; + GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); + msize += pos->size; + maxp = GNUNET_MAX (maxp, pos->priority); + min_deadline = GNUNET_TIME_absolute_min (min_deadline, + pos->deadline); pos = pos->next; } - if (queue_size >= MAX_PEER_QUEUE_SIZE) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calculating transmission set with %u priority (%s) and %s earliest deadline\n", + maxp, + (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + + if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL) { - /* queue full */ - if (ntohl (sm->priority) <= min_prio) + /* if highest already solicited priority from clients is not critical, + check if there are higher-priority messages to be solicited from clients */ + if (GNUNET_YES == excess) + maxpc = GNUNET_CORE_PRIO_BACKGROUND; + else + maxpc = GNUNET_CORE_PRIO_BEST_EFFORT; + for (car = session->active_client_request_head; NULL != car; car = car->next) { - /* discard new entry; this should no longer happen! */ - GNUNET_break (0); -#if DEBUG_CORE + if (GNUNET_YES == car->was_solicited) + continue; + maxpc = GNUNET_MAX (maxpc, + car->priority); + } + if (maxpc > maxp) + { + /* we have messages waiting for solicitation that have a higher + priority than those that we already accepted; solicit the + high-priority messages first */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n", - queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE, - (unsigned int) msize, (unsigned int) ntohs (message->type)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# discarded CORE_SEND requests"), - 1, GNUNET_NO); - - if (client != NULL) - GNUNET_SERVER_receive_done (client, GNUNET_OK); + "Soliciting messages based on priority (%u > %u)\n", + maxpc, + maxp); + solicit_messages (session, 0); return; } - GNUNET_assert (min_prio_entry != NULL); - /* discard "min_prio_entry" */ -#if DEBUG_CORE + } + else + { + /* never solicit more, we have critical messages to process */ + excess = GNUNET_NO; + maxpc = GNUNET_CORE_PRIO_BACKGROUND; + } + now = GNUNET_TIME_absolute_get (); + if ( ( (GNUNET_YES == excess) || + (maxpc >= GNUNET_CORE_PRIO_BEST_EFFORT) ) && + ( (0 == msize) || + ( (msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && + (min_deadline.abs_value_us > now.abs_value_us))) ) + { + /* not enough ready yet (tiny message & cork possible), or no messages at all, + and either excess bandwidth or best-effort or higher message waiting at + client; in this case, we try to solicit more */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queue full, discarding existing older request\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# discarded lower priority CORE_SEND requests"), - 1, GNUNET_NO); - if (min_prio_prev == NULL) - n->messages = min_prio_entry->next; + "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n", + excess, + maxpc, + (unsigned int) msize, + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + solicit_messages (session, + msize); + if (msize > 0) + { + /* if there is data to send, just not yet, make sure we do transmit + * it once the deadline is reached */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Corking until %s\n", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + if (NULL != session->cork_task) + GNUNET_SCHEDULER_cancel (session->cork_task); + session->cork_task + = GNUNET_SCHEDULER_add_at (min_deadline, + &pop_cork_task, + session); + } else - min_prio_prev->next = min_prio_entry->next; - GNUNET_free (min_prio_entry); + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queue empty, waiting for solicitations\n"); + } + return; } - -#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding transmission request for `%4s' of size %u to queue\n", - GNUNET_i2s (&sm->peer), (unsigned int) msize); -#endif - e = GNUNET_malloc (sizeof (struct MessageEntry) + msize); - e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline); - e->priority = ntohl (sm->priority); - e->size = msize; - if (GNUNET_YES != (int) ntohl (sm->cork)) - e->got_slack = GNUNET_YES; - memcpy (&e[1], &sm[1], msize); - - /* insert, keep list sorted by deadline */ - prev = NULL; - pos = n->messages; - while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value)) + "Building combined plaintext buffer to transmit message!\n"); + /* create plaintext buffer of all messages (that fit), encrypt and + transmit */ { - prev = pos; - pos = pos->next; + static unsigned long long total_bytes; + static unsigned int total_msgs; + char pbuf[msize]; /* plaintext */ + size_t used; + + used = 0; + while ( (NULL != (pos = session->sme_head)) && + (used + pos->size <= msize) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding message of type %d (%d/%d) to payload for %s\n", + ntohs (((const struct GNUNET_MessageHeader *)&pos[1])->type), + pos->is_typemap, + pos->is_typemap_confirm, + GNUNET_i2s (session->peer)); + GNUNET_memcpy (&pbuf[used], + &pos[1], + pos->size); + used += pos->size; + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + pos); + GNUNET_free (pos); + } + /* compute average payload size */ + total_bytes += used; + total_msgs++; + if (0 == total_msgs) + { + /* 2^32 messages, wrap around... */ + total_msgs = 1; + total_bytes = used; + } + GNUNET_STATISTICS_set (GSC_stats, + "# avg payload per encrypted message", + total_bytes / total_msgs, + GNUNET_NO); + /* now actually transmit... */ + session->ready_to_transmit = GNUNET_NO; + GSC_KX_encrypt_and_transmit (session->kx, + pbuf, + used); } - if (prev == NULL) - n->messages = e; - else - prev->next = e; - e->next = pos; - - /* consider scheduling now */ - process_plaintext_neighbour_queue (n); - } - /** - * Send a message to the neighbour. + * Send an updated typemap message to the neighbour now, + * and restart typemap transmissions. * * @param cls the message * @param key neighbour's identity - * @param value 'struct Neighbour' of the target - * @return always GNUNET_OK + * @param value `struct Neighbour` of the target + * @return always #GNUNET_OK */ static int -do_send_message (void *cls, const GNUNET_HashCode * key, void *value) +do_restart_typemap_message (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { - struct GNUNET_MessageHeader *hdr = cls; - struct Neighbour *n = value; - struct MessageEntry *m; + const struct GNUNET_MessageHeader *hdr = cls; + struct Session *session = value; + struct SessionMessageEntry *sme; uint16_t size; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Restarting sending TYPEMAP to %s\n", + GNUNET_i2s (session->peer)); size = ntohs (hdr->size); - m = GNUNET_malloc (sizeof (struct MessageEntry) + size); - memcpy (&m[1], hdr, size); - m->deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - m->slack_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - m->priority = UINT_MAX; - m->sender_status = n->status; - m->size = size; - GNUNET_CONTAINER_DLL_insert (n->message_head, - n->message_tail, - m); + for (sme = session->sme_head; NULL != sme; sme = sme->next) + { + if (GNUNET_YES == sme->is_typemap) + { + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); + GNUNET_free (sme); + break; + } + } + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); + sme->is_typemap = GNUNET_YES; + GNUNET_memcpy (&sme[1], + hdr, + size); + sme->size = size; + sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + GNUNET_CONTAINER_DLL_insert (session->sme_head, + session->sme_tail, + sme); + try_transmission (session); + start_typemap_task (session); return GNUNET_OK; } /** - * Broadcast a message to all neighbours. + * Broadcast an updated typemap message to all neighbours. + * Restarts the retransmissions until the typemaps are confirmed. * * @param msg message to transmit */ void -GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg) +GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg) { if (NULL == sessions) return; - GNUNET_CONTAINER_multihashmap_iterate (sessions, - &do_send_message, msg); -} - - -/** - * Helper function for GSC_SESSIONS_handle_client_iterate_peers. - * - * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies - * @param key identity of the connected peer - * @param value the 'struct Neighbour' for the peer - * @return GNUNET_OK (continue to iterate) - */ -static int -queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value) -{ - struct GNUNET_SERVER_TransmitContext *tc = cls; - struct Neighbour *n = value; - char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct GNUNET_TRANSPORT_ATS_Information *ats; - size_t size; - struct ConnectNotifyMessage *cnm; - - cnm = (struct ConnectNotifyMessage *) buf; - if (n->status != PEER_STATE_KEY_CONFIRMED) - return GNUNET_OK; - size = - sizeof (struct ConnectNotifyMessage) + - (n->ats_count) * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - /* recovery strategy: throw away performance data */ - GNUNET_array_grow (n->ats, n->ats_count, 0); - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - } - cnm = (struct ConnectNotifyMessage *) buf; - cnm->header.size = htons (size); - cnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); - cnm->ats_count = htonl (n->ats_count); - ats = &cnm->ats; - memcpy (ats, n->ats, - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); - ats[n->ats_count].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR); - ats[n->ats_count].value = htonl (0); -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n", - "NOTIFY_CONNECT"); -#endif - cnm->peer = n->peer; - GNUNET_SERVER_transmit_context_append_message (tc, &cnm->header); - return GNUNET_OK; -} - - -/** - * End the session with the given peer (we are no longer - * connected). - * - * @param pid identity of peer to kill session with - */ -void -GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) -{ + GNUNET_CONTAINER_multipeermap_iterate (sessions, + &do_restart_typemap_message, + (void *) msg); } @@ -1480,214 +874,202 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) void GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) { + struct Session *session; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transport solicits for %s\n", + GNUNET_i2s (pid)); + session = find_session (pid); + if (NULL == session) + return; + try_transmission (session); } /** * Transmit a message to a particular peer. * - * @param car original request that was queued and then solicited, - * ownership does not change (dequeue will be called soon). + * @param car original request that was queued and then solicited; + * this handle will now be 'owned' by the SESSIONS subsystem * @param msg message to transmit + * @param cork is corking allowed? + * @param priority how important is this message */ void GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg) + const struct GNUNET_MessageHeader *msg, + int cork, + enum GNUNET_CORE_Priority priority) { -} + struct Session *session; + struct SessionMessageEntry *sme; + struct SessionMessageEntry *pos; + size_t msize; - -/** - * We have a new client, notify it about all current sessions. - * - * @param client the new client - */ -void -GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) -{ + session = find_session (&car->target); + if (NULL == session) + return; + msize = ntohs (msg->size); + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); + GNUNET_memcpy (&sme[1], + msg, + msize); + sme->size = msize; + sme->priority = priority; + if (GNUNET_YES == cork) + { + sme->deadline = + GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Mesage corked, delaying transmission\n"); + } + pos = session->sme_head; + while ( (NULL != pos) && + (pos->priority >= sme->priority) ) + pos = pos->next; + if (NULL == pos) + GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, + session->sme_tail, + sme); + else + GNUNET_CONTAINER_DLL_insert_after (session->sme_head, + session->sme_tail, + pos->prev, + sme); + try_transmission (session); } /** - * Handle CORE_ITERATE_PEERS request. For this request type, the client - * does not have to have transmitted an INIT request. All current peers - * are returned, regardless of which message types they accept. + * We have received a typemap message from a peer, update ours. + * Notifies clients about the session. * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message + * @param peer peer this is about + * @param msg typemap update message */ void -GSC_SESSIONS_handle_client_iterate_peers (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &queue_connect_message, - tc); - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); + struct Session *session; + struct GSC_TypeMap *nmap; + struct SessionMessageEntry *sme; + struct TypeMapConfirmationMessage *tmc; + + nmap = GSC_TYPEMAP_get_from_message (msg); + if (NULL == nmap) + { + GNUNET_break_op (0); + return; /* malformed */ + } + session = find_session (peer); + if (NULL == session) + { + GSC_TYPEMAP_destroy (nmap); + GNUNET_break (0); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received TYPEMAP from %s\n", + GNUNET_i2s (session->peer)); + for (sme = session->sme_head; NULL != sme; sme = sme->next) + { + if (GNUNET_YES == sme->is_typemap_confirm) + { + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); + GNUNET_free (sme); + break; + } + } + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + + sizeof (struct TypeMapConfirmationMessage)); + sme->deadline = GNUNET_TIME_absolute_get (); + sme->size = sizeof (struct TypeMapConfirmationMessage); + sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + sme->is_typemap_confirm = GNUNET_YES; + tmc = (struct TypeMapConfirmationMessage *) &sme[1]; + tmc->header.size = htons (sizeof (struct TypeMapConfirmationMessage)); + tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP); + tmc->reserved = htonl (0); + GSC_TYPEMAP_hash (nmap, + &tmc->tm_hash); + GNUNET_CONTAINER_DLL_insert (session->sme_head, + session->sme_tail, + sme); + try_transmission (session); + GSC_CLIENTS_notify_clients_about_neighbour (peer, + session->tmap, + nmap); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = nmap; } /** - * Handle CORE_PEER_CONNECTED request. Notify client about connection - * to the given neighbour. For this request type, the client does not - * have to have transmitted an INIT request. All current peers are - * returned, regardless of which message types they accept. + * The given peer send a message of the specified type. Make sure the + * respective bit is set in its type-map and that clients are notified + * about the session. * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message + * @param peer peer this is about + * @param type type of the message */ void -GSC_SESSIONS_handle_client_have_peer (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer, + uint16_t type) { - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - const struct GNUNET_PeerIdentity *peer; + struct Session *session; + struct GSC_TypeMap *nmap; - peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK! - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multihashmap_get_multiple (neighbours, &peer->hashPubKey, - &queue_connect_message, tc); - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); + if (0 == memcmp (peer, + &GSC_my_identity, + sizeof (struct GNUNET_PeerIdentity))) + return; + session = find_session (peer); + GNUNET_assert (NULL != session); + if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, + &type, 1)) + return; /* already in it */ + nmap = GSC_TYPEMAP_extend (session->tmap, + &type, + 1); + GSC_CLIENTS_notify_clients_about_neighbour (peer, + session->tmap, + nmap); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = nmap; } - /** - * Handle REQUEST_INFO request. For this request type, the client must - * have transmitted an INIT first. - * - * @param cls unused - * @param client client sending the request - * @param message iteration request message + * Initialize sessions subsystem. */ void -GSC_SESSIONS_handle_client_request_info (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +GSC_SESSIONS_init () { - const struct RequestInfoMessage *rcm; - struct GSC_Client *pos; - struct Neighbour *n; - struct ConfigurationInfoMessage cim; - int32_t want_reserv; - int32_t got_reserv; - unsigned long long old_preference; - struct GNUNET_TIME_Relative rdelay; - - rdelay = GNUNET_TIME_relative_get_zero (); -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Core service receives `%s' request.\n", - "REQUEST_INFO"); -#endif - rcm = (const struct RequestInfoMessage *) message; - n = find_neighbour (&rcm->peer); - memset (&cim, 0, sizeof (cim)); - if ((n != NULL) && (GNUNET_YES == n->is_connected)) - { - want_reserv = ntohl (rcm->reserve_inbound); - if (n->bw_out_internal_limit.value__ != rcm->limit_outbound.value__) - { - n->bw_out_internal_limit = rcm->limit_outbound; - if (n->bw_out.value__ != - GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit, - n->bw_out_external_limit).value__) - { - n->bw_out = - GNUNET_BANDWIDTH_value_min (n->bw_out_internal_limit, - n->bw_out_external_limit); - GNUNET_BANDWIDTH_tracker_update_quota (&n->available_recv_window, - n->bw_out); - GNUNET_TRANSPORT_set_quota (transport, &n->peer, n->bw_in, n->bw_out); - handle_peer_status_change (n); - } - } - if (want_reserv < 0) - { - got_reserv = want_reserv; - } - else if (want_reserv > 0) - { - rdelay = - GNUNET_BANDWIDTH_tracker_get_delay (&n->available_recv_window, - want_reserv); - if (rdelay.rel_value == 0) - got_reserv = want_reserv; - else - got_reserv = 0; /* all or nothing */ - } - else - got_reserv = 0; - GNUNET_BANDWIDTH_tracker_consume (&n->available_recv_window, got_reserv); - old_preference = n->current_preference; - n->current_preference += GNUNET_ntohll (rcm->preference_change); - if (old_preference > n->current_preference) - { - /* overflow; cap at maximum value */ - n->current_preference = ULLONG_MAX; - } - update_preference_sum (n->current_preference - old_preference); -#if DEBUG_CORE_QUOTA - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received reservation request for %d bytes for peer `%4s', reserved %d bytes, suggesting delay of %llu ms\n", - (int) want_reserv, GNUNET_i2s (&rcm->peer), (int) got_reserv, - (unsigned long long) rdelay.rel_value); -#endif - cim.reserved_amount = htonl (got_reserv); - cim.reserve_delay = GNUNET_TIME_relative_hton (rdelay); - cim.bw_out = n->bw_out; - cim.preference = n->current_preference; - } - else - { - /* Technically, this COULD happen (due to asynchronous behavior), - * but it should be rare, so we should generate an info event - * to help diagnosis of serious errors that might be masked by this */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Client asked for preference change with peer `%s', which is not connected!\n"), - GNUNET_i2s (&rcm->peer)); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - cim.header.size = htons (sizeof (struct ConfigurationInfoMessage)); - cim.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIGURATION_INFO); - cim.peer = rcm->peer; - cim.rim_id = rcm->rim_id; -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending `%s' message to client.\n", - "CONFIGURATION_INFO"); -#endif - GSC_CLIENTS_send_to_client (client, &cim.header, GNUNET_NO); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + sessions = GNUNET_CONTAINER_multipeermap_create (128, + GNUNET_YES); } - - - /** - * Initialize sessions subsystem. + * Helper function for #GSC_SESSIONS_done() to free all + * active sessions. + * + * @param cls NULL + * @param key identity of the connected peer + * @param value the `struct Session` for the peer + * @return #GNUNET_OK (continue to iterate) */ -int -GSC_SESSIONS_init () +static int +free_session_helper (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { - neighbours = GNUNET_CONTAINER_multihashmap_create (128); - self.public_key = &my_public_key; - self.peer = my_identity; - self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS; - self.status = PEER_STATE_KEY_CONFIRMED; - self.is_connected = GNUNET_YES; + /* struct Session *session = value; */ + + GSC_SESSIONS_end (key); return GNUNET_OK; } @@ -1698,10 +1080,14 @@ GSC_SESSIONS_init () void GSC_SESSIONS_done () { - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (neighbours); - neighbours = NULL; - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), - 0, GNUNET_NO); + if (NULL != sessions) + { + GNUNET_CONTAINER_multipeermap_iterate (sessions, + &free_session_helper, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (sessions); + sessions = NULL; + } } + +/* end of gnunet-service-core_sessions.c */