X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcore%2Fgnunet-service-core_sessions.c;h=1f697cf160d2d04e2cc2a1d5839808db0c21c67f;hb=b0c7119fa2f43fe1b5978651152974359de5a5d2;hp=b0f4a5f42346065f6cf4c700e5b332d398015afa;hpb=bef8aa69e425a0d6b30c892c8d1a2be9a49eecb5;p=oweals%2Fgnunet.git diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index b0f4a5f42..1f697cf16 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -19,127 +19,123 @@ */ /** - * @file core/gnunet-service-core_neighbours.c - * @brief code for managing of 'encrypted' sessions (key exchange done) + * @file core/gnunet-service-core_sessions.c + * @brief code for managing of 'encrypted' sessions (key exchange done) * @author Christian Grothoff */ #include "platform.h" -#include "gnunet_service_core.h" -#include "gnunet_service_core_neighbours.h" -#include "gnunet_service_core_kx.h" -#include "gnunet_service_core_sessions.h" +#include "gnunet-service-core.h" +#include "gnunet-service-core_neighbours.h" +#include "gnunet-service-core_kx.h" +#include "gnunet-service-core_typemap.h" +#include "gnunet-service-core_sessions.h" +#include "gnunet-service-core_clients.h" +#include "gnunet_constants.h" /** - * Record kept for each request for transmission issued by a - * client that is still pending. + * How often do we transmit our typemap? */ -struct ClientActiveRequest; +#define TYPEMAP_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) + /** - * Data kept per session. + * Message ready for encryption. This struct is followed by the + * actual content of the message. */ -struct Session +struct SessionMessageEntry { - /** - * Identity of the other peer. - */ - struct GNUNET_PeerIdentity peer; /** - * Head of list of requests from clients for transmission to - * this peer. + * We keep messages in a doubly linked list. */ - struct ClientActiveRequest *active_client_request_head; + struct SessionMessageEntry *next; /** - * Tail of list of requests from clients for transmission to - * this peer. + * We keep messages in a doubly linked list. */ - struct ClientActiveRequest *active_client_request_tail; + struct SessionMessageEntry *prev; /** - * Performance data for the peer. + * Deadline for transmission, 1s after we received it (if we + * are not corking), otherwise "now". Note that this message + * does NOT expire past its deadline. */ - struct GNUNET_TRANSPORT_ATS_Information *ats; + struct GNUNET_TIME_Absolute deadline; /** - * Information about the key exchange with the other peer. + * How long is the message? (number of bytes following the "struct + * MessageEntry", but not including the size of "struct + * MessageEntry" itself!) */ - struct GSC_KeyExchangeInfo *kxinfo; - + size_t size; - /** - * ID of task used for cleaning up dead neighbour entries. - */ - GNUNET_SCHEDULER_TaskIdentifier dead_clean_task; +}; - /** - * ID of task used for updating bandwidth quota for this neighbour. - */ - GNUNET_SCHEDULER_TaskIdentifier quota_update_task; +/** + * Data kept per session. + */ +struct Session +{ /** - * At what time did we initially establish (as in, complete session - * key handshake) this connection? Should be zero if status != KEY_CONFIRMED. + * Identity of the other peer. */ - struct GNUNET_TIME_Absolute time_established; + struct GNUNET_PeerIdentity peer; /** - * At what time did we last receive an encrypted message from the - * other peer? Should be zero if status != KEY_CONFIRMED. + * Head of list of requests from clients for transmission to + * this peer. */ - struct GNUNET_TIME_Absolute last_activity; + struct GSC_ClientActiveRequest *active_client_request_head; /** - * How valueable were the messages of this peer recently? + * Tail of list of requests from clients for transmission to + * this peer. */ - unsigned long long current_preference; + struct GSC_ClientActiveRequest *active_client_request_tail; /** - * Number of entries in 'ats'. + * Head of list of messages ready for encryption. */ - unsigned int ats_count; + struct SessionMessageEntry *sme_head; /** - * Bit map indicating which of the 32 sequence numbers before the last - * were received (good for accepting out-of-order packets and - * estimating reliability of the connection) + * Tail of list of messages ready for encryption. */ - unsigned int last_packets_bitmap; + struct SessionMessageEntry *sme_tail; /** - * last sequence number received on this connection (highest) + * Information about the key exchange with the other peer. */ - uint32_t last_sequence_number_received; + struct GSC_KeyExchangeInfo *kxinfo; /** - * last sequence number transmitted + * Current type map for this peer. */ - uint32_t last_sequence_number_sent; + struct GSC_TypeMap *tmap; /** - * Available bandwidth in for this peer (current target). + * At what time did we initially establish this session? + * (currently unused, should be integrated with ATS in the + * future...). */ - struct GNUNET_BANDWIDTH_Value32NBO bw_in; + struct GNUNET_TIME_Absolute time_established; /** - * Available bandwidth out for this peer (current target). + * Task to transmit corked messages with a delay. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out; + GNUNET_SCHEDULER_TaskIdentifier cork_task; /** - * Internal bandwidth limit set for this peer (initially typically - * set to "-1"). Actual "bw_out" is MIN of - * "bpm_out_internal_limit" and "bw_out_external_limit". + * Task to transmit our type map. */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out_internal_limit; + GNUNET_SCHEDULER_TaskIdentifier typemap_task; /** - * External bandwidth limit set for this peer by the - * peer that we are communicating with. "bw_out" is MIN of - * "bw_out_internal_limit" and "bw_out_external_limit". + * Is the neighbour queue empty and thus ready for us + * to transmit an encrypted message? */ - struct GNUNET_BANDWIDTH_Value32NBO bw_out_external_limit; + int ready_to_transmit; }; @@ -151,1236 +147,680 @@ static struct GNUNET_CONTAINER_MultiHashMap *sessions; /** - * Session entry for "this" peer. + * Find the session for the given peer. + * + * @param peer identity of the peer + * @return NULL if we are not connected, otherwise the + * session handle */ -static struct Session self; +static struct Session * +find_session (const struct GNUNET_PeerIdentity *peer) +{ + return GNUNET_CONTAINER_multihashmap_get (sessions, &peer->hashPubKey); +} + /** - * Sum of all preferences among all neighbours. + * End the session with the given peer (we are no longer + * connected). + * + * @param pid identity of peer to kill session with */ -static unsigned long long preference_sum; +void +GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) +{ + struct Session *session; + struct GSC_ClientActiveRequest *car; + struct SessionMessageEntry *sme; + session = find_session (pid); + if (NULL == session) + return; +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying session for peer `%4s'\n", + GNUNET_i2s (&session->peer)); +#endif + if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) + { + GNUNET_SCHEDULER_cancel (session->cork_task); + session->cork_task = GNUNET_SCHEDULER_NO_TASK; + } + while (NULL != (car = session->active_client_request_head)) + { + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, car); + GSC_CLIENTS_reject_request (car); + } + while (NULL != (sme = session->sme_head)) + { + GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme); + GNUNET_free (sme); + } + GNUNET_SCHEDULER_cancel (session->typemap_task); + GSC_CLIENTS_notify_clients_about_neighbour (&session->peer, NULL, + 0 /* FIXME: ATSI */ , + session->tmap, NULL); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (sessions, + &session-> + peer.hashPubKey, + session)); + GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# entries in session map"), + GNUNET_CONTAINER_multihashmap_size (sessions), + GNUNET_NO); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = NULL; + GNUNET_free (session); +} -// FIXME......... /** - * At what time should the connection to the given neighbour - * time out (given no further activity?) + * Transmit our current typemap message to the other peer. + * (Done periodically in case an update got lost). * - * @param n neighbour in question - * @return absolute timeout + * @param cls the 'struct Session*' + * @param tc unused */ -static struct GNUNET_TIME_Absolute -get_neighbour_timeout (struct Neighbour *n) +static void +transmit_typemap_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - return GNUNET_TIME_absolute_add (n->last_activity, - GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + struct Session *session = cls; + struct GNUNET_MessageHeader *hdr; + struct GNUNET_TIME_Relative delay; + + delay = TYPEMAP_FREQUENCY; + /* randomize a bit to avoid spont. sync */ + delay.rel_value += + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000); + session->typemap_task = + GNUNET_SCHEDULER_add_delayed (delay, &transmit_typemap_task, session); + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop ("# type map refreshes sent"), 1, + GNUNET_NO); + hdr = GSC_TYPEMAP_compute_type_map_message (); + GSC_KX_encrypt_and_transmit (session->kxinfo, hdr, ntohs (hdr->size)); + GNUNET_free (hdr); } /** - * Helper function for update_preference_sum. + * Create a session, a key exchange was just completed. + * + * @param peer peer that is now connected + * @param kx key exchange that completed */ -static int -update_preference (void *cls, const GNUNET_HashCode * key, void *value) +void +GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, + struct GSC_KeyExchangeInfo *kx) { - unsigned long long *ps = cls; - struct Neighbour *n = value; + struct Session *session; - n->current_preference /= 2; - *ps += n->current_preference; - return GNUNET_OK; +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating session for peer `%4s'\n", + GNUNET_i2s (peer)); +#endif + session = GNUNET_malloc (sizeof (struct Session)); + session->tmap = GSC_TYPEMAP_create (); + session->peer = *peer; + session->kxinfo = kx; + session->time_established = GNUNET_TIME_absolute_get (); + session->typemap_task = + GNUNET_SCHEDULER_add_now (&transmit_typemap_task, session); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (sessions, &peer->hashPubKey, + session, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# entries in session map"), + GNUNET_CONTAINER_multihashmap_size (sessions), + GNUNET_NO); + GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, 0 /* FIXME: ATSI */ , + NULL, session->tmap); } /** - * A preference value for a neighbour was update. Update - * the preference sum accordingly. + * Notify the given client about the session (client is new). * - * @param inc how much was a preference value increased? + * @param cls the 'struct GSC_Client' + * @param key peer identity + * @param value the 'struct Session' + * @return GNUNET_OK (continue to iterate) */ -static void -update_preference_sum (unsigned long long inc) +static int +notify_client_about_session (void *cls, const GNUNET_HashCode * key, + void *value) { - unsigned long long os; - - os = preference_sum; - preference_sum += inc; - if (preference_sum >= os) - return; /* done! */ - /* overflow! compensate by cutting all values in half! */ - preference_sum = 0; - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &update_preference, - &preference_sum); - GNUNET_STATISTICS_set (stats, gettext_noop ("# total peer preference"), - preference_sum, GNUNET_NO); + struct GSC_Client *client = cls; + struct Session *session = value; + + GSC_CLIENTS_notify_client_about_neighbour (client, &session->peer, NULL, 0, /* FIXME: ATS!? */ + NULL, /* old TMAP: none */ + session->tmap); + return GNUNET_OK; } /** - * Find the entry for the given neighbour. + * We have a new client, notify it about all current sessions. * - * @param peer identity of the neighbour - * @return NULL if we are not connected, otherwise the - * neighbour's entry. + * @param client the new client */ -static struct Neighbour * -find_neighbour (const struct GNUNET_PeerIdentity *peer) +void +GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) { - return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey); + /* notify new client about existing sessions */ + GNUNET_CONTAINER_multihashmap_iterate (sessions, ¬ify_client_about_session, + client); } /** - * Function called by transport telling us that a peer - * changed status. + * Try to perform a transmission on the given session. Will solicit + * additional messages if the 'sme' queue is not full enough. * - * @param n the peer that changed status + * @param session session to transmit messages from */ static void -handle_peer_status_change (struct Neighbour *n) -{ - struct PeerStatusNotifyMessage *psnm; - char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct GNUNET_TRANSPORT_ATS_Information *ats; - size_t size; - - if ((!n->is_connected) || (n->status != PEER_STATE_KEY_CONFIRMED)) - return; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' changed status\n", - GNUNET_i2s (&n->peer)); -#endif - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - /* recovery strategy: throw away performance data */ - GNUNET_array_grow (n->ats, n->ats_count, 0); - size = - sizeof (struct PeerStatusNotifyMessage) + - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information); - } - psnm = (struct PeerStatusNotifyMessage *) buf; - psnm->header.size = htons (size); - psnm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_STATUS_CHANGE); - psnm->timeout = GNUNET_TIME_absolute_hton (get_neighbour_timeout (n)); - psnm->bandwidth_in = n->bw_in; - psnm->bandwidth_out = n->bw_out; - psnm->peer = n->peer; - psnm->ats_count = htonl (n->ats_count); - ats = &psnm->ats; - memcpy (ats, n->ats, - n->ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information)); - ats[n->ats_count].type = htonl (0); - ats[n->ats_count].value = htonl (0); - send_to_all_clients (&psnm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_STATUS_CHANGE); - GNUNET_STATISTICS_update (stats, gettext_noop ("# peer status changes"), 1, - GNUNET_NO); -} - +try_transmission (struct Session *session); /** - * Go over our message queue and if it is not too long, go - * over the pending requests from clients for this - * neighbour and send some clients a 'READY' notification. + * Queue a request from a client for transmission to a particular peer. * - * @param n which peer to process + * @param car request to queue; this handle is then shared between + * the caller (CLIENTS subsystem) and SESSIONS and must not + * be released by either until either 'GNUNET_SESSIONS_dequeue', + * 'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed' + * have been invoked on it */ -static void -schedule_peer_messages (struct Neighbour *n) +void +GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) { - struct SendMessageReady smr; - struct ClientActiveRequest *car; - struct ClientActiveRequest *pos; - struct Client *c; - struct MessageEntry *mqe; - unsigned int queue_size; - - /* check if neighbour queue is empty enough! */ - if (n != &self) + struct Session *session; + + session = find_session (&car->target); + if (session == NULL) { - queue_size = 0; - mqe = n->messages; - while (mqe != NULL) - { - queue_size++; - mqe = mqe->next; - } - if (queue_size >= MAX_PEER_QUEUE_SIZE) - { -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not considering client transmission requests: queue full\n"); +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dropped client request for transmission (am disconnected)\n"); #endif - return; /* queue still full */ - } - /* find highest priority request */ - pos = n->active_client_request_head; - car = NULL; - while (pos != NULL) - { - if ((car == NULL) || (pos->priority > car->priority)) - car = pos; - pos = pos->next; - } + GNUNET_break (0); /* should have been rejected earlier */ + GSC_CLIENTS_reject_request (car); + return; } - else + if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { - car = n->active_client_request_head; + GNUNET_break (0); + GSC_CLIENTS_reject_request (car); + return; } - if (car == NULL) - return; /* no pending requests */ -#if DEBUG_CORE_CLIENT +#if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Permitting client transmission request to `%s'\n", - GNUNET_i2s (&n->peer)); + "Received client transmission request. queueing\n"); #endif - GSC_CLIENTS_solicite_request (car); + GNUNET_CONTAINER_DLL_insert (session->active_client_request_head, + session->active_client_request_tail, car); + try_transmission (session); } - /** - * Free the given entry for the neighbour (it has - * already been removed from the list at this point). + * Dequeue a request from a client from transmission to a particular peer. * - * @param n neighbour to free + * @param car request to dequeue; this handle will then be 'owned' by + * the caller (CLIENTS sysbsystem) */ -static void -free_neighbour (struct Neighbour *n) +void +GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) { - struct MessageEntry *m; - struct ClientActiveRequest *car; + struct Session *s; -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying neighbour entry for peer `%4s'\n", - GNUNET_i2s (&n->peer)); -#endif - if (n->skm != NULL) - { - GNUNET_free (n->skm); - n->skm = NULL; - } - while (NULL != (m = n->messages)) - { - n->messages = m->next; - GNUNET_free (m); - } - while (NULL != (m = n->encrypted_head)) - { - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - GNUNET_free (m); - } - while (NULL != (car = n->active_client_request_head)) - { - GNUNET_CONTAINER_DLL_remove (n->active_client_request_head, - n->active_client_request_tail, car); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (car->client->requests, - &n->peer.hashPubKey, - car)); - GNUNET_free (car); - } - if (NULL != n->th) - { - GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th); - n->th = NULL; - } - if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->retry_plaintext_task); - if (n->quota_update_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->quota_update_task); - if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->keep_alive_task); - if (n->status == PEER_STATE_KEY_CONFIRMED) - GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"), - -1, GNUNET_NO); - GNUNET_array_grow (n->ats, n->ats_count, 0); - GNUNET_free_non_null (n->pending_ping); - GNUNET_free_non_null (n->pending_pong); - GNUNET_free (n); + if (0 == + memcmp (&car->target, &GSC_my_identity, + sizeof (struct GNUNET_PeerIdentity))) + return; + s = find_session (&car->target); + GNUNET_assert (NULL != s); + GNUNET_CONTAINER_DLL_remove (s->active_client_request_head, + s->active_client_request_tail, car); } - -/** - * Consider freeing the given neighbour since we may not need - * to keep it around anymore. - * - * @param n neighbour to consider discarding - */ -static void -consider_free_neighbour (struct Neighbour *n); - - /** - * Task triggered when a neighbour entry might have gotten stale. + * Discard all expired active transmission requests from clients. * - * @param cls the 'struct Neighbour' - * @param tc scheduler context (not used) + * @param session session to clean up */ static void -consider_free_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +discard_expired_requests (struct Session *session) { - struct Neighbour *n = cls; + struct GSC_ClientActiveRequest *pos; + struct GSC_ClientActiveRequest *nxt; + struct GNUNET_TIME_Absolute now; - n->dead_clean_task = GNUNET_SCHEDULER_NO_TASK; - consider_free_neighbour (n); + now = GNUNET_TIME_absolute_get (); + pos = NULL; + nxt = session->active_client_request_head; + while (NULL != nxt) + { + pos = nxt; + nxt = pos->next; + if ((pos->deadline.abs_value < now.abs_value) && + (GNUNET_YES != pos->was_solicited)) + { + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# messages discarded (expired prior to transmission)"), + 1, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, pos); + GSC_CLIENTS_reject_request (pos); + } + } } /** - * Consider freeing the given neighbour since we may not need - * to keep it around anymore. + * Solicit messages for transmission. * - * @param n neighbour to consider discarding + * @param session session to solict messages for */ static void -consider_free_neighbour (struct Neighbour *n) +solicit_messages (struct Session *session) { - struct GNUNET_TIME_Relative left; - - if ((n->th != NULL) || (n->pitr != NULL) || (GNUNET_YES == n->is_connected)) - return; /* no chance */ - - left = GNUNET_TIME_absolute_get_remaining (get_neighbour_timeout (n)); - if (left.rel_value > 0) - { - if (n->dead_clean_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->dead_clean_task); - n->dead_clean_task = - GNUNET_SCHEDULER_add_delayed (left, &consider_free_task, n); - return; + struct GSC_ClientActiveRequest *car; + struct GSC_ClientActiveRequest *nxt; + size_t so_size; + + discard_expired_requests (session); + so_size = 0; + nxt = session->active_client_request_head; + while (NULL != (car = nxt)) + { + nxt = car->next; + if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) + break; + so_size += car->msize; + if (car->was_solicited == GNUNET_YES) + continue; + car->was_solicited = GNUNET_YES; + GSC_CLIENTS_solicit_request (car); } - /* actually free the neighbour... */ - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (neighbours, - &n->peer.hashPubKey, n)); - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), - GNUNET_CONTAINER_multihashmap_size (neighbours), - GNUNET_NO); - free_neighbour (n); } /** - * Function called when the transport service is ready to - * receive an encrypted message for the respective peer + * Some messages were delayed (corked), but the timeout has now expired. + * Send them now. * - * @param cls neighbour to use message from - * @param size number of bytes we can transmit - * @param buf where to copy the message - * @return number of bytes transmitted + * @param cls 'struct Session' with the messages to transmit now + * @param tc scheduler context (unused) */ -static size_t -notify_encrypted_transmit_ready (void *cls, size_t size, void *buf) +static void +pop_cork_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct Neighbour *n = cls; - struct MessageEntry *m; - size_t ret; - char *cbuf; - - n->th = NULL; - m = n->encrypted_head; - if (m == NULL) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypted message queue empty, no messages added to buffer for `%4s'\n", - GNUNET_i2s (&n->peer)); -#endif - return 0; - } - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - ret = 0; - cbuf = buf; - if (buf != NULL) - { - GNUNET_assert (size >= m->size); - memcpy (cbuf, &m[1], m->size); - ret = m->size; - GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Copied message of type %u and size %u into transport buffer for `%4s'\n", - (unsigned int) - ntohs (((struct GNUNET_MessageHeader *) &m[1])->type), - (unsigned int) ret, GNUNET_i2s (&n->peer)); -#endif - process_encrypted_neighbour_queue (n); - } - else - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of message of type %u and size %u failed\n", - (unsigned int) - ntohs (((struct GNUNET_MessageHeader *) &m[1])->type), - (unsigned int) m->size); -#endif - } - GNUNET_free (m); - consider_free_neighbour (n); - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# encrypted bytes given to transport"), ret, - GNUNET_NO); - return ret; -} - - + struct Session *session = cls; + session->cork_task = GNUNET_SCHEDULER_NO_TASK; + try_transmission (session); +} /** - * Select messages for transmission. This heuristic uses a combination - * of earliest deadline first (EDF) scheduling (with bounded horizon) - * and priority-based discard (in case no feasible schedule exist) and - * speculative optimization (defer any kind of transmission until - * we either create a batch of significant size, 25% of max, or until - * we are close to a deadline). Furthermore, when scheduling the - * heuristic also packs as many messages into the batch as possible, - * starting with those with the earliest deadline. Yes, this is fun. + * Try to perform a transmission on the given session. Will solicit + * additional messages if the 'sme' queue is not full enough. * - * @param n neighbour to select messages from - * @param size number of bytes to select for transmission - * @param retry_time set to the time when we should try again - * (only valid if this function returns zero) - * @return number of bytes selected, or 0 if we decided to - * defer scheduling overall; in that case, retry_time is set. + * @param session session to transmit messages from */ -static size_t -select_messages (struct Neighbour *n, size_t size, - struct GNUNET_TIME_Relative *retry_time) +static void +try_transmission (struct Session *session) { - struct MessageEntry *pos; - struct MessageEntry *min; - struct MessageEntry *last; - unsigned int min_prio; - struct GNUNET_TIME_Absolute t; + struct SessionMessageEntry *pos; + size_t msize; struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative delta; - uint64_t avail; - struct GNUNET_TIME_Relative slack; /* how long could we wait before missing deadlines? */ - size_t off; - uint64_t tsize; - unsigned int queue_size; - int discard_low_prio; - - GNUNET_assert (NULL != n->messages); - now = GNUNET_TIME_absolute_get (); - /* last entry in linked list of messages processed */ - last = NULL; - /* should we remove the entry with the lowest - * priority from consideration for scheduling at the - * end of the loop? */ - queue_size = 0; - tsize = 0; - pos = n->messages; - while (pos != NULL) - { - queue_size++; - tsize += pos->size; + struct GNUNET_TIME_Absolute min_deadline; + + if (GNUNET_YES != session->ready_to_transmit) + return; + msize = 0; + min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; + /* check 'ready' messages */ + pos = session->sme_head; + while ((NULL != pos) && + (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)) + { + GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); + msize += pos->size; + min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline); pos = pos->next; } - discard_low_prio = GNUNET_YES; - while (GNUNET_YES == discard_low_prio) - { - min = NULL; - min_prio = UINT_MAX; - discard_low_prio = GNUNET_NO; - /* calculate number of bytes available for transmission at time "t" */ - avail = GNUNET_BANDWIDTH_tracker_get_available (&n->available_send_window); - t = now; - /* how many bytes have we (hypothetically) scheduled so far */ - off = 0; - /* maximum time we can wait before transmitting anything - * and still make all of our deadlines */ - slack = GNUNET_TIME_UNIT_FOREVER_REL; - pos = n->messages; - /* note that we use "*2" here because we want to look - * a bit further into the future; much more makes no - * sense since new message might be scheduled in the - * meantime... */ - while ((pos != NULL) && (off < size * 2)) - { - if (pos->do_transmit == GNUNET_YES) - { - /* already removed from consideration */ - pos = pos->next; - continue; - } - if (discard_low_prio == GNUNET_NO) - { - delta = GNUNET_TIME_absolute_get_difference (t, pos->deadline); - if (delta.rel_value > 0) - { - // FIXME: HUH? Check! - t = pos->deadline; - avail += - GNUNET_BANDWIDTH_value_get_available_until (n->bw_out, delta); - } - if (avail < pos->size) - { - // FIXME: HUH? Check! - discard_low_prio = GNUNET_YES; /* we could not schedule this one! */ - } - else - { - avail -= pos->size; - /* update slack, considering both its absolute deadline - * and relative deadlines caused by other messages - * with their respective load */ - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_BANDWIDTH_value_get_delay_for - (n->bw_out, avail)); - if (pos->deadline.abs_value <= now.abs_value) - { - /* now or never */ - slack = GNUNET_TIME_UNIT_ZERO; - } - else if (GNUNET_YES == pos->got_slack) - { - /* should be soon now! */ - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_TIME_absolute_get_remaining - (pos->slack_deadline)); - } - else - { - slack = - GNUNET_TIME_relative_min (slack, - GNUNET_TIME_absolute_get_difference - (now, pos->deadline)); - pos->got_slack = GNUNET_YES; - pos->slack_deadline = - GNUNET_TIME_absolute_min (pos->deadline, - GNUNET_TIME_relative_to_absolute - (GNUNET_CONSTANTS_MAX_CORK_DELAY)); - } - } - } - off += pos->size; - t = GNUNET_TIME_absolute_max (pos->deadline, t); // HUH? Check! - if (pos->priority <= min_prio) - { - /* update min for discard */ - min_prio = pos->priority; - min = pos; - } - pos = pos->next; - } - if (discard_low_prio) - { - GNUNET_assert (min != NULL); - /* remove lowest-priority entry from consideration */ - min->do_transmit = GNUNET_YES; /* means: discard (for now) */ - } - last = pos; - } - /* guard against sending "tiny" messages with large headers without - * urgent deadlines */ - if ((slack.rel_value > GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value) && - (size > 4 * off) && (queue_size <= MAX_PEER_QUEUE_SIZE - 2)) + now = GNUNET_TIME_absolute_get (); + if ((msize == 0) || + ((msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) && + (min_deadline.abs_value > now.abs_value))) { - /* less than 25% of message would be filled with deadlines still - * being met if we delay by one second or more; so just wait for - * more data; but do not wait longer than 1s (since we don't want - * to delay messages for a really long time either). */ - *retry_time = GNUNET_CONSTANTS_MAX_CORK_DELAY; - /* reset do_transmit values for next time */ - while (pos != last) + /* not enough ready yet, try to solicit more */ + solicit_messages (session); + if (msize > 0) { - pos->do_transmit = GNUNET_NO; - pos = pos->next; + /* if there is data to send, just not yet, make sure we do transmit + * it once the deadline is reached */ + if (session->cork_task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (session->cork_task); + session->cork_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining + (min_deadline), &pop_cork_task, + session); } - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# transmissions delayed due to corking"), 1, - GNUNET_NO); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Deferring transmission for %llums due to underfull message buffer size (%u/%u)\n", - (unsigned long long) retry_time->rel_value, (unsigned int) off, - (unsigned int) size); -#endif - return 0; + return; } - /* select marked messages (up to size) for transmission */ - off = 0; - pos = n->messages; - while (pos != last) + /* create plaintext buffer of all messages, encrypt and transmit */ { - if ((pos->size <= size) && (pos->do_transmit == GNUNET_NO)) + static unsigned long long total_bytes; + static unsigned int total_msgs; + char pbuf[msize]; /* plaintext */ + size_t used; + + used = 0; + while ((NULL != (pos = session->sme_head)) && (used + pos->size <= msize)) { - pos->do_transmit = GNUNET_YES; /* mark for transmission */ - off += pos->size; - size -= pos->size; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Selecting message of size %u for transmission\n", - (unsigned int) pos->size); -#endif + memcpy (&pbuf[used], &pos[1], pos->size); + used += pos->size; + GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, pos); + GNUNET_free (pos); } - else + /* compute average payload size */ + total_bytes += used; + total_msgs++; + if (0 == total_msgs) { -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not selecting message of size %u for transmission at this time (maximum is %u)\n", - (unsigned int) pos->size, size); -#endif - pos->do_transmit = GNUNET_NO; /* mark for not transmitting! */ + /* 2^32 messages, wrap around... */ + total_msgs = 1; + total_bytes = used; } - pos = pos->next; + GNUNET_STATISTICS_set (GSC_stats, "# avg payload per encrypted message", + total_bytes / total_msgs, GNUNET_NO); + /* now actually transmit... */ + session->ready_to_transmit = GNUNET_NO; + GSC_KX_encrypt_and_transmit (session->kxinfo, pbuf, used); } -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Selected %llu/%llu bytes of %u/%u plaintext messages for transmission to `%4s'.\n", - (unsigned long long) off, (unsigned long long) tsize, queue_size, - (unsigned int) MAX_PEER_QUEUE_SIZE, GNUNET_i2s (&n->peer)); -#endif - return off; } /** - * Batch multiple messages into a larger buffer. + * Send a message to the neighbour now. * - * @param n neighbour to take messages from - * @param buf target buffer - * @param size size of buf - * @param deadline set to transmission deadline for the result - * @param retry_time set to the time when we should try again - * (only valid if this function returns zero) - * @param priority set to the priority of the batch - * @return number of bytes written to buf (can be zero) + * @param cls the message + * @param key neighbour's identity + * @param value 'struct Neighbour' of the target + * @return always GNUNET_OK */ -static size_t -batch_message (struct Neighbour *n, char *buf, size_t size, - struct GNUNET_TIME_Absolute *deadline, - struct GNUNET_TIME_Relative *retry_time, unsigned int *priority) +static int +do_send_message (void *cls, const GNUNET_HashCode * key, void *value) { - char ntmb[GNUNET_SERVER_MAX_MESSAGE_SIZE - 1]; - struct NotifyTrafficMessage *ntm = (struct NotifyTrafficMessage *) ntmb; - struct MessageEntry *pos; - struct MessageEntry *prev; - struct MessageEntry *next; - size_t ret; - - ret = 0; - *priority = 0; - *deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - *retry_time = GNUNET_TIME_UNIT_FOREVER_REL; - if (0 == select_messages (n, size, retry_time)) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No messages selected, will try again in %llu ms\n", - retry_time->rel_value); -#endif - return 0; - } - ntm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND); - ntm->ats_count = htonl (0); - ntm->ats.type = htonl (0); - ntm->ats.value = htonl (0); - ntm->peer = n->peer; - pos = n->messages; - prev = NULL; - while ((pos != NULL) && (size >= sizeof (struct GNUNET_MessageHeader))) - { - next = pos->next; - if (GNUNET_YES == pos->do_transmit) - { - GNUNET_assert (pos->size <= size); - /* do notifications */ - /* FIXME: track if we have *any* client that wants - * full notifications and only do this if that is - * actually true */ - if (pos->size < - GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct NotifyTrafficMessage)) - { - memcpy (&ntm[1], &pos[1], pos->size); - ntm->header.size = - htons (sizeof (struct NotifyTrafficMessage) + - sizeof (struct GNUNET_MessageHeader)); - send_to_all_clients (&ntm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_HDR_OUTBOUND); - } - else - { - /* message too large for 'full' notifications, we do at - * least the 'hdr' type */ - memcpy (&ntm[1], &pos[1], sizeof (struct GNUNET_MessageHeader)); - } - ntm->header.size = - htons (sizeof (struct NotifyTrafficMessage) + pos->size); - send_to_all_clients (&ntm->header, GNUNET_YES, - GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND); -#if DEBUG_HANDSHAKE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypting %u bytes with message of type %u and size %u\n", - pos->size, - (unsigned int) - ntohs (((const struct GNUNET_MessageHeader *) &pos[1])->type), - (unsigned int) - ntohs (((const struct GNUNET_MessageHeader *) - &pos[1])->size)); -#endif - /* copy for encrypted transmission */ - memcpy (&buf[ret], &pos[1], pos->size); - ret += pos->size; - size -= pos->size; - *priority += pos->priority; -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding plaintext message of size %u with deadline %llu ms to batch\n", - (unsigned int) pos->size, - (unsigned long long) - GNUNET_TIME_absolute_get_remaining (pos->deadline).rel_value); -#endif - deadline->abs_value = - GNUNET_MIN (deadline->abs_value, pos->deadline.abs_value); - GNUNET_free (pos); - if (prev == NULL) - n->messages = next; - else - prev->next = next; - } - else - { - prev = pos; - } - pos = next; - } -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Deadline for message batch is %llu ms\n", - GNUNET_TIME_absolute_get_remaining (*deadline).rel_value); -#endif - return ret; + const struct GNUNET_MessageHeader *hdr = cls; + struct Session *session = value; + struct SessionMessageEntry *m; + uint16_t size; + + size = ntohs (hdr->size); + m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); + memcpy (&m[1], hdr, size); + m->size = size; + GNUNET_CONTAINER_DLL_insert (session->sme_head, session->sme_tail, m); + try_transmission (session); + return GNUNET_OK; } /** - * Remove messages with deadlines that have long expired from - * the queue. + * Broadcast a message to all neighbours. * - * @param n neighbour to inspect + * @param msg message to transmit */ -static void -discard_expired_messages (struct Neighbour *n) +void +GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg) { - struct MessageEntry *prev; - struct MessageEntry *next; - struct MessageEntry *pos; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative delta; - int disc; - unsigned int queue_length; - - disc = GNUNET_NO; - now = GNUNET_TIME_absolute_get (); - prev = NULL; - queue_length = 0; - pos = n->messages; - while (pos != NULL) - { - queue_length++; - next = pos->next; - delta = GNUNET_TIME_absolute_get_difference (pos->deadline, now); - if (delta.rel_value > PAST_EXPIRATION_DISCARD_TIME.rel_value) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Message is %llu ms past due, discarding.\n", - delta.rel_value); -#endif - if (prev == NULL) - n->messages = next; - else - prev->next = next; - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# messages discarded (expired prior to transmission)"), - 1, GNUNET_NO); - disc = GNUNET_YES; - GNUNET_free (pos); - } - else - prev = pos; - pos = next; - } - if ( (GNUNET_YES == disc) && - (queue_length == MAX_PEER_QUEUE_SIZE) ) - schedule_peer_messages (n); + if (NULL == sessions) + return; + GNUNET_CONTAINER_multihashmap_iterate (sessions, &do_send_message, + (void *) msg); } /** - * Signature of the main function of a task. + * Traffic is being solicited for the given peer. This means that the + * message queue on the transport-level (NEIGHBOURS subsystem) is now + * empty and it is now OK to transmit another (non-control) message. * - * @param cls closure - * @param tc context information (why was this task triggered now) + * @param pid identity of peer ready to receive data */ -static void -retry_plaintext_processing (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +void +GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) { - struct Neighbour *n = cls; + struct Session *session; - n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK; - process_plaintext_neighbour_queue (n); + session = find_session (pid); + if (NULL == session) + return; + session->ready_to_transmit = GNUNET_YES; + try_transmission (session); } /** - * Check if we have plaintext messages for the specified neighbour - * pending, and if so, consider batching and encrypting them (and - * then trigger processing of the encrypted queue if needed). + * Transmit a message to a particular peer. * - * @param n neighbour to check. + * @param car original request that was queued and then solicited; + * this handle will now be 'owned' by the SESSIONS subsystem + * @param msg message to transmit + * @param cork is corking allowed? */ -static void -process_plaintext_neighbour_queue (struct Neighbour *n) +void +GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, + const struct GNUNET_MessageHeader *msg, int cork) { - char pbuf[GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE + sizeof (struct EncryptedMessage)]; /* plaintext */ - size_t used; - struct EncryptedMessage *em; /* encrypted message */ - struct EncryptedMessage *ph; /* plaintext header */ - struct MessageEntry *me; - unsigned int priority; - struct GNUNET_TIME_Absolute deadline; - struct GNUNET_TIME_Relative retry_time; - struct GNUNET_CRYPTO_AesInitializationVector iv; - struct GNUNET_CRYPTO_AuthKey auth_key; + struct Session *session; + struct SessionMessageEntry *sme; + size_t msize; - if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (n->retry_plaintext_task); - n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK; - } - switch (n->status) - { - case PEER_STATE_DOWN: - send_key (n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif - return; - case PEER_STATE_KEY_SENT: - if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK) - n->retry_set_key_task = - GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency, - &set_key_retry_task, n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif - return; - case PEER_STATE_KEY_RECEIVED: - if (n->retry_set_key_task == GNUNET_SCHEDULER_NO_TASK) - n->retry_set_key_task = - GNUNET_SCHEDULER_add_delayed (n->set_key_retry_frequency, - &set_key_retry_task, n); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not yet connected to `%4s', deferring processing of plaintext messages.\n", - GNUNET_i2s (&n->peer)); -#endif + session = find_session (&car->target); + if (NULL == session) return; - case PEER_STATE_KEY_CONFIRMED: - /* ready to continue */ - break; - } - discard_expired_messages (n); - if (n->messages == NULL) - { -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Plaintext message queue for `%4s' is empty.\n", - GNUNET_i2s (&n->peer)); -#endif - return; /* no pending messages */ - } - if (n->encrypted_head != NULL) - { -#if DEBUG_CORE > 2 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Encrypted message queue for `%4s' is still full, delaying plaintext processing.\n", - GNUNET_i2s (&n->peer)); -#endif - return; /* wait for messages already encrypted to be - * processed first! */ - } - ph = (struct EncryptedMessage *) pbuf; - deadline = GNUNET_TIME_UNIT_FOREVER_ABS; - priority = 0; - used = sizeof (struct EncryptedMessage); - used += - batch_message (n, &pbuf[used], - GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE, &deadline, - &retry_time, &priority); - if (used == sizeof (struct EncryptedMessage)) - { -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "No messages selected for transmission to `%4s' at this time, will try again later.\n", - GNUNET_i2s (&n->peer)); -#endif - /* no messages selected for sending, try again later... */ - n->retry_plaintext_task = - GNUNET_SCHEDULER_add_delayed (retry_time, &retry_plaintext_processing, - n); - return; - } - GSC_KX_encrypt_and_transmit (n->kx, - &pbuf[struct EncryptedMessage], - used - sizeof (struct EncryptedMessage)); - schedule_peer_messages (n); + msize = ntohs (msg->size); + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); + memcpy (&sme[1], msg, msize); + sme->size = msize; + if (GNUNET_YES == cork) + sme->deadline = + GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); + GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, sme); + try_transmission (session); } - - /** - * Check if we have encrypted messages for the specified neighbour - * pending, and if so, check with the transport about sending them - * out. + * Helper function for GSC_SESSIONS_handle_client_iterate_peers. * - * @param n neighbour to check. + * @param cls the 'struct GNUNET_SERVER_TransmitContext' to queue replies + * @param key identity of the connected peer + * @param value the 'struct Neighbour' for the peer + * @return GNUNET_OK (continue to iterate) */ -static void -process_encrypted_neighbour_queue (struct Neighbour *n) +#include "core.h" +static int +queue_connect_message (void *cls, const GNUNET_HashCode * key, void *value) { - struct MessageEntry *m; - - if (n->th != NULL) - return; /* request already pending */ - if (GNUNET_YES != n->is_connected) - { - GNUNET_break (0); - return; - } - m = n->encrypted_head; - if (m == NULL) - { - /* encrypted queue empty, try plaintext instead */ - process_plaintext_neighbour_queue (n); - return; - } -#if DEBUG_CORE > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n", - (unsigned int) m->size, GNUNET_i2s (&n->peer), - (unsigned long long) - GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value); -#endif - n->th = - GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size, - m->priority, - GNUNET_TIME_absolute_get_remaining - (m->deadline), - ¬ify_encrypted_transmit_ready, - n); - if (n->th == NULL) - { - /* message request too large or duplicate request */ - GNUNET_break (0); - /* discard encrypted message */ - GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m); - GNUNET_free (m); - process_encrypted_neighbour_queue (n); - } + struct GNUNET_SERVER_TransmitContext *tc = cls; + struct Session *session = value; + struct ConnectNotifyMessage cnm; + + /* FIXME: code duplication with clients... */ + cnm.header.size = htons (sizeof (struct ConnectNotifyMessage)); + cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); + // FIXME: full ats... + cnm.ats_count = htonl (0); + cnm.peer = session->peer; + GNUNET_SERVER_transmit_context_append_message (tc, &cnm.header); + return GNUNET_OK; } /** - * Initialize a new 'struct Neighbour'. + * Handle CORE_ITERATE_PEERS request. For this request type, the client + * does not have to have transmitted an INIT request. All current peers + * are returned, regardless of which message types they accept. * - * @param pid ID of the new neighbour - * @return handle for the new neighbour + * @param cls unused + * @param client client sending the iteration request + * @param message iteration request message */ -static struct Neighbour * -create_neighbour (const struct GNUNET_PeerIdentity *pid) +void +GSC_SESSIONS_handle_client_iterate_peers (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader + *message) { - struct Neighbour *n; - struct GNUNET_TIME_Absolute now; - -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating neighbour entry for peer `%4s'\n", GNUNET_i2s (pid)); -#endif - n = GNUNET_malloc (sizeof (struct Neighbour)); - n->peer = *pid; - GNUNET_CRYPTO_aes_create_session_key (&n->encrypt_key); - now = GNUNET_TIME_absolute_get (); - n->encrypt_key_created = now; - n->last_activity = now; - n->set_key_retry_frequency = INITIAL_SET_KEY_RETRY_FREQUENCY; - n->bw_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->bw_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->bw_out_internal_limit = GNUNET_BANDWIDTH_value_init (UINT32_MAX); - n->bw_out_external_limit = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; - n->ping_challenge = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (neighbours, - &n->peer.hashPubKey, n, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), - GNUNET_CONTAINER_multihashmap_size (neighbours), - GNUNET_NO); - neighbour_quota_update (n, NULL); - consider_free_neighbour (n); - return n; + struct GNUNET_MessageHeader done_msg; + struct GNUNET_SERVER_TransmitContext *tc; + + tc = GNUNET_SERVER_transmit_context_create (client); + GNUNET_CONTAINER_multihashmap_iterate (sessions, &queue_connect_message, tc); + done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); + done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); + GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); + GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); } - /** - * We have a new client, notify it about all current sessions. + * Handle CORE_PEER_CONNECTED request. Notify client about connection + * to the given neighbour. For this request type, the client does not + * have to have transmitted an INIT request. All current peers are + * returned, regardless of which message types they accept. * - * @param client the new client + * @param cls unused + * @param client client sending the iteration request + * @param message iteration request message */ void -GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) +GSC_SESSIONS_handle_client_have_peer (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader + *message) { - /* notify new client about existing neighbours */ - GNUNET_CONTAINER_multihashmap_iterate (neighbours, - ¬ify_client_about_neighbour, client); + struct GNUNET_MessageHeader done_msg; + struct GNUNET_SERVER_TransmitContext *tc; + const struct GNUNET_PeerIdentity *peer; + + peer = (const struct GNUNET_PeerIdentity *) &message[1]; // YUCK! + tc = GNUNET_SERVER_transmit_context_create (client); + GNUNET_CONTAINER_multihashmap_get_multiple (sessions, &peer->hashPubKey, + &queue_connect_message, tc); + done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); + done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); + GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); + GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); } /** - * Queue a request from a client for transmission to a particular peer. + * We've received a typemap message from a peer, update ours. + * Notifies clients about the session. * - * @param car request to queue; this handle is then shared between - * the caller (CLIENTS subsystem) and SESSIONS and must not - * be released by either until either 'GNUNET_SESSIONS_dequeue', - * 'GNUNET_SESSIONS_transmit' or 'GNUNET_CLIENTS_failed' - * have been invoked on it + * @param peer peer this is about + * @param msg typemap update message */ void -GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) +GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) { - struct Neighbour *n; // FIXME: session... + struct Session *session; + struct GSC_TypeMap *nmap; - n = find_neighbour (&car->peer); - if ((n == NULL) || (GNUNET_YES != n->is_connected) || - (n->status != PEER_STATE_KEY_CONFIRMED)) + nmap = GSC_TYPEMAP_get_from_message (msg); + if (NULL == nmap) + return; /* malformed */ + session = find_session (peer); + if (NULL == session) { - /* neighbour must have disconnected since request was issued, - * ignore (client will realize it once it processes the - * disconnect notification) */ -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dropped client request for transmission (am disconnected)\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# send requests dropped (disconnected)"), 1, - GNUNET_NO); - GSC_CLIENTS_reject_requests (car); + GNUNET_break (0); return; } -#if DEBUG_CORE_CLIENT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received client transmission request. queueing\n"); -#endif - GNUNET_CONTAINER_DLL_insert (n->active_client_request_head, - n->active_client_request_tail, car); - - // schedule_peer_messages (n); + GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, 0, /* FIXME: ATS */ + session->tmap, nmap); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = nmap; } /** - * Dequeue a request from a client from transmission to a particular peer. + * The given peer send a message of the specified type. Make sure the + * respective bit is set in its type-map and that clients are notified + * about the session. * - * @param car request to dequeue; this handle will then be 'owned' by - * the caller (CLIENTS sysbsystem) + * @param peer peer this is about + * @param type type of the message */ void -GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) +GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer, + uint16_t type) { - struct Session *s; + struct Session *session; + struct GSC_TypeMap *nmap; - s = find_session (&car->peer); - GNUNET_CONTAINER_DLL_remove (s->active_client_request_head, - s->active_client_request_tail, car); + if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) + return; + session = find_session (peer); + GNUNET_assert (NULL != session); + if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1)) + return; /* already in it */ + nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1); + GSC_CLIENTS_notify_clients_about_neighbour (peer, NULL, 0, /* FIXME: ATS */ + session->tmap, nmap); + GSC_TYPEMAP_destroy (session->tmap); + session->tmap = nmap; } - /** - * Transmit a message to a particular peer. - * - * @param car original request that was queued and then solicited; - * this handle will now be 'owned' by the SESSIONS subsystem - * @param msg message to transmit + * Initialize sessions subsystem. */ void -GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, - const struct GNUNET_MessageHeader *msg) +GSC_SESSIONS_init () { - struct MessageEntry *prev; - struct MessageEntry *pos; - struct MessageEntry *e; - struct MessageEntry *min_prio_entry; - struct MessageEntry *min_prio_prev; - unsigned int min_prio; - unsigned int queue_size; - - n = find_neighbour (&sm->peer); - if ((n == NULL) || (GNUNET_YES != n->is_connected) || - (n->status != PEER_STATE_KEY_CONFIRMED)) - { - /* attempt to send message to peer that is not connected anymore - * (can happen due to asynchrony) */ - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# messages discarded (disconnected)"), 1, - GNUNET_NO); - if (client != NULL) - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Core received `%s' request, queueing %u bytes of plaintext data for transmission to `%4s'.\n", - "SEND", (unsigned int) msize, GNUNET_i2s (&sm->peer)); -#endif - discard_expired_messages (n); - /* bound queue size */ - /* NOTE: this entire block to bound the queue size should be - * obsolete with the new client-request code and the - * 'schedule_peer_messages' mechanism; we still have this code in - * here for now as a sanity check for the new mechanmism; - * ultimately, we should probably simply reject SEND messages that - * are not 'approved' (or provide a new core API for very unreliable - * delivery that always sends with priority 0). Food for thought. */ - min_prio = UINT32_MAX; - min_prio_entry = NULL; - min_prio_prev = NULL; - queue_size = 0; - prev = NULL; - pos = n->messages; - while (pos != NULL) - { - if (pos->priority <= min_prio) - { - min_prio_entry = pos; - min_prio_prev = prev; - min_prio = pos->priority; - } - queue_size++; - prev = pos; - pos = pos->next; - } - if (queue_size >= MAX_PEER_QUEUE_SIZE) - { - /* queue full */ - if (ntohl (sm->priority) <= min_prio) - { - /* discard new entry; this should no longer happen! */ - GNUNET_break (0); -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queue full (%u/%u), discarding new request (%u bytes of type %u)\n", - queue_size, (unsigned int) MAX_PEER_QUEUE_SIZE, - (unsigned int) msize, (unsigned int) ntohs (message->type)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# discarded CORE_SEND requests"), - 1, GNUNET_NO); - - if (client != NULL) - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } - GNUNET_assert (min_prio_entry != NULL); - /* discard "min_prio_entry" */ -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queue full, discarding existing older request\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop - ("# discarded lower priority CORE_SEND requests"), - 1, GNUNET_NO); - if (min_prio_prev == NULL) - n->messages = min_prio_entry->next; - else - min_prio_prev->next = min_prio_entry->next; - GNUNET_free (min_prio_entry); - } - -#if DEBUG_CORE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding transmission request for `%4s' of size %u to queue\n", - GNUNET_i2s (&sm->peer), (unsigned int) msize); -#endif - e = GNUNET_malloc (sizeof (struct MessageEntry) + msize); - e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline); - e->priority = ntohl (sm->priority); - e->size = msize; - if (GNUNET_YES != (int) ntohl (sm->cork)) - e->got_slack = GNUNET_YES; - memcpy (&e[1], &sm[1], msize); - - /* insert, keep list sorted by deadline */ - prev = NULL; - pos = n->messages; - while ((pos != NULL) && (pos->deadline.abs_value < e->deadline.abs_value)) - { - prev = pos; - pos = pos->next; - } - if (prev == NULL) - n->messages = e; - else - prev->next = e; - e->next = pos; - - /* consider scheduling now */ - process_plaintext_neighbour_queue (n); - + sessions = GNUNET_CONTAINER_multihashmap_create (128); } - -int -GSC_NEIGHBOURS_init () +/** + * Helper function for GSC_SESSIONS_handle_client_iterate_peers. + * + * @param cls NULL + * @param key identity of the connected peer + * @param value the 'struct Session' for the peer + * @return GNUNET_OK (continue to iterate) + */ +static int +free_session_helper (void *cls, const GNUNET_HashCode * key, void *value) { - neighbours = GNUNET_CONTAINER_multihashmap_create (128); - self.public_key = &my_public_key; - self.peer = my_identity; - self.last_activity = GNUNET_TIME_UNIT_FOREVER_ABS; - self.status = PEER_STATE_KEY_CONFIRMED; - self.is_connected = GNUNET_YES; + struct Session *session = value; + + GSC_SESSIONS_end (&session->peer); return GNUNET_OK; } +/** + * Shutdown sessions subsystem. + */ void -GSC_NEIGHBOURS_done () +GSC_SESSIONS_done () { - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (neighbours); - neighbours = NULL; - GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"), - 0, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_iterate (sessions, &free_session_helper, NULL); + GNUNET_CONTAINER_multihashmap_destroy (sessions); + sessions = NULL; } + +/* end of gnunet-service-core_sessions.c */