X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcore%2Fgnunet-service-core_sessions.c;h=973ef9c2faaa7d5592efa9c6df1e4ce478aed866;hb=4b766fd267ca83a8faa4e22353d5942074d6f2b7;hp=5e8bd537f81005247623995f4003ad702519844b;hpb=a740974b42420e5619052d6a13bc3146ddb5a376;p=oweals%2Fgnunet.git diff --git a/src/core/gnunet-service-core_sessions.c b/src/core/gnunet-service-core_sessions.c index 5e8bd537f..973ef9c2f 100644 --- a/src/core/gnunet-service-core_sessions.c +++ b/src/core/gnunet-service-core_sessions.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009-2014 Christian Grothoff (and other contributing authors) + Copyright (C) 2009-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -25,24 +25,18 @@ */ #include "platform.h" #include "gnunet-service-core.h" -#include "gnunet-service-core_neighbours.h" #include "gnunet-service-core_kx.h" #include "gnunet-service-core_typemap.h" #include "gnunet-service-core_sessions.h" -#include "gnunet-service-core_clients.h" #include "gnunet_constants.h" #include "core.h" /** - * How often do we transmit our typemap? + * How many encrypted messages do we queue at most? + * Needed to bound memory consumption. */ -#define TYPEMAP_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) - -/** - * How often do we transmit our typemap on first attempt? - */ -#define TYPEMAP_FREQUENCY_FIRST GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +#define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4 /** @@ -62,6 +56,21 @@ struct SessionMessageEntry */ struct SessionMessageEntry *prev; + /** + * How important is this message. + */ + enum GNUNET_CORE_Priority priority; + + /** + * Flag set to #GNUNET_YES if this is a typemap message. + */ + int is_typemap; + + /** + * Flag set to #GNUNET_YES if this is a typemap confirmation message. + */ + int is_typemap_confirm; + /** * Deadline for transmission, 1s after we received it (if we * are not corking), otherwise "now". Note that this message @@ -70,17 +79,12 @@ struct SessionMessageEntry struct GNUNET_TIME_Absolute deadline; /** - * How long is the message? (number of bytes following the "struct - * MessageEntry", but not including the size of "struct - * MessageEntry" itself!) + * How long is the message? (number of bytes following the `struct + * MessageEntry`, but not including the size of `struct + * MessageEntry` itself!) */ size_t size; - /** - * How important is this message. - */ - enum GNUNET_CORE_Priority priority; - }; @@ -92,7 +96,12 @@ struct Session /** * Identity of the other peer. */ - struct GNUNET_PeerIdentity peer; + const struct GNUNET_PeerIdentity *peer; + + /** + * Key exchange state for this peer. + */ + struct GSC_KeyExchangeInfo *kx; /** * Head of list of requests from clients for transmission to @@ -116,11 +125,6 @@ struct Session */ struct SessionMessageEntry *sme_tail; - /** - * Information about the key exchange with the other peer. - */ - struct GSC_KeyExchangeInfo *kxinfo; - /** * Current type map for this peer. */ @@ -129,18 +133,18 @@ struct Session /** * Task to transmit corked messages with a delay. */ - GNUNET_SCHEDULER_TaskIdentifier cork_task; + struct GNUNET_SCHEDULER_Task *cork_task; /** * Task to transmit our type map. */ - GNUNET_SCHEDULER_TaskIdentifier typemap_task; + struct GNUNET_SCHEDULER_Task *typemap_task; /** - * Is the neighbour queue empty and thus ready for us - * to transmit an encrypted message? + * Retransmission delay we currently use for the typemap + * transmissions (if not confirmed). */ - int ready_to_transmit; + struct GNUNET_TIME_Relative typemap_delay; /** * Is this the first time we're sending the typemap? If so, @@ -151,6 +155,34 @@ struct Session }; +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Message sent to confirm that a typemap was received. + */ +struct TypeMapConfirmationMessage +{ + + /** + * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP. + */ + struct GNUNET_MessageHeader header; + + /** + * Reserved, always zero. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * Hash of the (decompressed) type map that was received. + */ + struct GNUNET_HashCode tm_hash; + +}; + +GNUNET_NETWORK_STRUCT_END + + /** * Map of peer identities to `struct Session`. */ @@ -167,7 +199,10 @@ static struct GNUNET_CONTAINER_MultiPeerMap *sessions; static struct Session * find_session (const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multipeermap_get (sessions, peer); + if (NULL == sessions) + return NULL; + return GNUNET_CONTAINER_multipeermap_get (sessions, + peer); } @@ -188,32 +223,41 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) if (NULL == session) return; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Destroying session for peer `%4s'\n", - GNUNET_i2s (&session->peer)); - if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) + "Destroying session for peer `%s'\n", + GNUNET_i2s (session->peer)); + if (NULL != session->cork_task) { GNUNET_SCHEDULER_cancel (session->cork_task); - session->cork_task = GNUNET_SCHEDULER_NO_TASK; + session->cork_task = NULL; } while (NULL != (car = session->active_client_request_head)) { GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, session->active_client_request_tail, car); - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_NO); } while (NULL != (sme = session->sme_head)) { - GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, sme); + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); GNUNET_free (sme); } - GNUNET_SCHEDULER_cancel (session->typemap_task); - GSC_CLIENTS_notify_clients_about_neighbour (&session->peer, - session->tmap, NULL); + if (NULL != session->typemap_task) + { + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_task = NULL; + } + GSC_CLIENTS_notify_clients_about_neighbour (session->peer, + session->tmap, + NULL); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (sessions, - &session->peer, + session->peer, session)); - GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"), + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), GNUNET_CONTAINER_multipeermap_size (sessions), GNUNET_NO); GSC_TYPEMAP_destroy (session->tmap); @@ -224,42 +268,60 @@ GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid) /** * Transmit our current typemap message to the other peer. - * (Done periodically in case an update got lost). + * (Done periodically until the typemap is confirmed). * * @param cls the `struct Session *` - * @param tc unused */ static void -transmit_typemap_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +transmit_typemap_task (void *cls) { struct Session *session = cls; struct GNUNET_MessageHeader *hdr; struct GNUNET_TIME_Relative delay; - if (0 == session->first_typemap) - { - delay = TYPEMAP_FREQUENCY_FIRST; - session->first_typemap = 1; - } - else - { - delay = TYPEMAP_FREQUENCY; - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending TYPEMAP to %s\n", + GNUNET_i2s (session->peer)); + session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay); + delay = session->typemap_delay; /* randomize a bit to avoid spont. sync */ delay.rel_value_us += - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000); + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + 1000 * 1000); session->typemap_task = - GNUNET_SCHEDULER_add_delayed (delay, &transmit_typemap_task, session); + GNUNET_SCHEDULER_add_delayed (delay, + &transmit_typemap_task, + session); GNUNET_STATISTICS_update (GSC_stats, - gettext_noop ("# type map refreshes sent"), 1, + gettext_noop ("# type map refreshes sent"), + 1, GNUNET_NO); hdr = GSC_TYPEMAP_compute_type_map_message (); - GSC_KX_encrypt_and_transmit (session->kxinfo, hdr, ntohs (hdr->size)); + GSC_KX_encrypt_and_transmit (session->kx, + hdr, + ntohs (hdr->size)); GNUNET_free (hdr); } +/** + * Restart the typemap task for the given session. + * + * @param session session to restart typemap transmission for + */ +static void +start_typemap_task (struct Session *session) +{ + if (NULL != session->typemap_task) + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_delay = GNUNET_TIME_UNIT_SECONDS; + session->typemap_task = + GNUNET_SCHEDULER_add_delayed (session->typemap_delay, + &transmit_typemap_task, + session); +} + + /** * Create a session, a key exchange was just completed. * @@ -273,23 +335,103 @@ GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer, struct Session *session; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating session for peer `%4s'\n", + "Creating session for peer `%s'\n", GNUNET_i2s (peer)); session = GNUNET_new (struct Session); session->tmap = GSC_TYPEMAP_create (); - session->peer = *peer; - session->kxinfo = kx; - session->typemap_task = - GNUNET_SCHEDULER_add_now (&transmit_typemap_task, session); + session->peer = peer; + session->kx = kx; GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (sessions, peer, + GNUNET_CONTAINER_multipeermap_put (sessions, + session->peer, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"), + GNUNET_STATISTICS_set (GSC_stats, + gettext_noop ("# peers connected"), GNUNET_CONTAINER_multipeermap_size (sessions), GNUNET_NO); GSC_CLIENTS_notify_clients_about_neighbour (peer, - NULL, session->tmap); + NULL, + session->tmap); + start_typemap_task (session); +} + + +/** + * The other peer has indicated that he 'lost' the session + * (KX down), reinitialize the session on our end, in particular + * this means to restart the typemap transmission. + * + * @param peer peer that is now connected + */ +void +GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer) +{ + struct Session *session; + + session = find_session (peer); + if (NULL == session) + { + /* KX/session is new for both sides; thus no need to restart what + has not yet begun */ + return; + } + start_typemap_task (session); +} + + +/** + * The other peer has confirmed receiving our type map, + * check if it is current and if so, stop retransmitting it. + * + * @param peer peer that confirmed the type map + * @param msg confirmation message we received + */ +void +GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *msg) +{ + const struct TypeMapConfirmationMessage *cmsg; + struct Session *session; + + session = find_session (peer); + if (NULL == session) + { + GNUNET_break (0); + return; + } + if (ntohs (msg->size) != sizeof (struct TypeMapConfirmationMessage)) + { + GNUNET_break_op (0); + return; + } + cmsg = (const struct TypeMapConfirmationMessage *) msg; + if (GNUNET_YES != + GSC_TYPEMAP_check_hash (&cmsg->tm_hash)) + { + /* our typemap has changed in the meantime, do not + accept confirmation */ + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# outdated typemap confirmations received"), + 1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got outdated typemap confirmated from peer `%s'\n", + GNUNET_i2s (session->peer)); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got typemap confirmation from peer `%s'\n", + GNUNET_i2s (session->peer)); + if (NULL != session->typemap_task) + { + GNUNET_SCHEDULER_cancel (session->typemap_task); + session->typemap_task = NULL; + } + GNUNET_STATISTICS_update (GSC_stats, + gettext_noop + ("# valid typemap confirmations received"), + 1, GNUNET_NO); } @@ -309,7 +451,8 @@ notify_client_about_session (void *cls, struct GSC_Client *client = cls; struct Session *session = value; - GSC_CLIENTS_notify_client_about_neighbour (client, &session->peer, + GSC_CLIENTS_notify_client_about_neighbour (client, + session->peer, NULL, /* old TMAP: none */ session->tmap); return GNUNET_OK; @@ -325,7 +468,8 @@ void GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client) { /* notify new client about existing sessions */ - GNUNET_CONTAINER_multipeermap_iterate (sessions, ¬ify_client_about_session, + GNUNET_CONTAINER_multipeermap_iterate (sessions, + ¬ify_client_about_session, client); } @@ -360,19 +504,22 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Dropped client request for transmission (am disconnected)\n"); GNUNET_break (0); /* should have been rejected earlier */ - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_NO); return; } if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { GNUNET_break (0); - GSC_CLIENTS_reject_request (car); + GSC_CLIENTS_reject_request (car, + GNUNET_YES); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received client transmission request. queueing\n"); - GNUNET_CONTAINER_DLL_insert (session->active_client_request_head, - session->active_client_request_tail, car); + GNUNET_CONTAINER_DLL_insert_tail (session->active_client_request_head, + session->active_client_request_tail, + car); try_transmission (session); } @@ -386,50 +533,22 @@ GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car) void GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car) { - struct Session *s; + struct Session *session; if (0 == - memcmp (&car->target, &GSC_my_identity, + memcmp (&car->target, + &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) return; - s = find_session (&car->target); - GNUNET_assert (NULL != s); - GNUNET_CONTAINER_DLL_remove (s->active_client_request_head, - s->active_client_request_tail, car); -} - - -/** - * Discard all expired active transmission requests from clients. - * - * @param session session to clean up - */ -static void -discard_expired_requests (struct Session *session) -{ - struct GSC_ClientActiveRequest *pos; - struct GSC_ClientActiveRequest *nxt; - struct GNUNET_TIME_Absolute now; - - now = GNUNET_TIME_absolute_get (); - pos = NULL; - nxt = session->active_client_request_head; - while (NULL != nxt) - { - pos = nxt; - nxt = pos->next; - if ((pos->deadline.abs_value_us < now.abs_value_us) && - (GNUNET_YES != pos->was_solicited)) - { - GNUNET_STATISTICS_update (GSC_stats, - gettext_noop - ("# messages discarded (expired prior to transmission)"), - 1, GNUNET_NO); - GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, - session->active_client_request_tail, pos); - GSC_CLIENTS_reject_request (pos); - } - } + session = find_session (&car->target); + GNUNET_assert (NULL != session); + GNUNET_CONTAINER_DLL_remove (session->active_client_request_head, + session->active_client_request_tail, + car); + /* dequeueing of 'high' priority messages may unblock + transmission for lower-priority messages, so we also + need to try in this case. */ + try_transmission (session); } @@ -449,7 +568,6 @@ solicit_messages (struct Session *session, size_t so_size; enum GNUNET_CORE_Priority pmax; - discard_expired_requests (session); so_size = msize; pmax = GNUNET_CORE_PRIO_BACKGROUND; for (car = session->active_client_request_head; NULL != car; car = car->next) @@ -470,7 +588,15 @@ solicit_messages (struct Session *session, if (GNUNET_YES == car->was_solicited) continue; car->was_solicited = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting message with priority %u\n", + car->priority); GSC_CLIENTS_solicit_request (car); + /* The above call may *dequeue* requests and thereby + clobber 'nxt'. Hence we need to restart from the + head of the list. */ + nxt = session->active_client_request_head; + so_size = msize; } } @@ -480,15 +606,13 @@ solicit_messages (struct Session *session, * Send them now. * * @param cls `struct Session` with the messages to transmit now - * @param tc scheduler context (unused) */ static void -pop_cork_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +pop_cork_task (void *cls) { struct Session *session = cls; - session->cork_task = GNUNET_SCHEDULER_NO_TASK; + session->cork_task = NULL; try_transmission (session); } @@ -511,14 +635,19 @@ try_transmission (struct Session *session) enum GNUNET_CORE_Priority maxpc; struct GSC_ClientActiveRequest *car; int excess; - - if (GNUNET_YES != session->ready_to_transmit) - return; + msize = 0; min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS; /* if the peer has excess bandwidth, background traffic is allowed, otherwise not */ - excess = GSC_NEIGHBOURS_check_excess_bandwidth (&session->peer); + if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <= + GSC_NEIGHBOURS_get_queue_length (session->kx)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission queue already very long, waiting...\n"); + return; /* queue already too long */ + } + excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx); if (GNUNET_YES == excess) maxp = GNUNET_CORE_PRIO_BACKGROUND; else @@ -531,9 +660,17 @@ try_transmission (struct Session *session) GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE); msize += pos->size; maxp = GNUNET_MAX (maxp, pos->priority); - min_deadline = GNUNET_TIME_absolute_min (min_deadline, pos->deadline); + min_deadline = GNUNET_TIME_absolute_min (min_deadline, + pos->deadline); pos = pos->next; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calculating transmission set with %u priority (%s) and %s earliest deadline\n", + maxp, + (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL) { /* if highest already solicited priority from clients is not critical, @@ -546,18 +683,28 @@ try_transmission (struct Session *session) { if (GNUNET_YES == car->was_solicited) continue; - maxpc = GNUNET_MAX (maxpc, car->priority); + maxpc = GNUNET_MAX (maxpc, + car->priority); } if (maxpc > maxp) { /* we have messages waiting for solicitation that have a higher priority than those that we already accepted; solicit the high-priority messages first */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting messages based on priority (%u > %u)\n", + maxpc, + maxp); solicit_messages (session, 0); return; } } - + else + { + /* never solicit more, we have critical messages to process */ + excess = GNUNET_NO; + maxpc = GNUNET_CORE_PRIO_BACKGROUND; + } now = GNUNET_TIME_absolute_get (); if ( ( (GNUNET_YES == excess) || (maxpc >= GNUNET_CORE_PRIO_BEST_EFFORT) ) && @@ -568,21 +715,39 @@ try_transmission (struct Session *session) /* not enough ready yet (tiny message & cork possible), or no messages at all, and either excess bandwidth or best-effort or higher message waiting at client; in this case, we try to solicit more */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n", + excess, + maxpc, + (unsigned int) msize, + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); solicit_messages (session, msize); if (msize > 0) { /* if there is data to send, just not yet, make sure we do transmit * it once the deadline is reached */ - if (GNUNET_SCHEDULER_NO_TASK != session->cork_task) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Corking until %s\n", + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline), + GNUNET_YES)); + if (NULL != session->cork_task) GNUNET_SCHEDULER_cancel (session->cork_task); - session->cork_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining - (min_deadline), &pop_cork_task, - session); + session->cork_task + = GNUNET_SCHEDULER_add_at (min_deadline, + &pop_cork_task, + session); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queue empty, waiting for solicitations\n"); } return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Building combined plaintext buffer to transmit message!\n"); /* create plaintext buffer of all messages (that fit), encrypt and transmit */ { @@ -592,11 +757,22 @@ try_transmission (struct Session *session) size_t used; used = 0; - while ((NULL != (pos = session->sme_head)) && (used + pos->size <= msize)) + while ( (NULL != (pos = session->sme_head)) && + (used + pos->size <= msize) ) { - memcpy (&pbuf[used], &pos[1], pos->size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding message of type %d (%d/%d) to payload for %s\n", + ntohs (((const struct GNUNET_MessageHeader *)&pos[1])->type), + pos->is_typemap, + pos->is_typemap_confirm, + GNUNET_i2s (session->peer)); + GNUNET_memcpy (&pbuf[used], + &pos[1], + pos->size); used += pos->size; - GNUNET_CONTAINER_DLL_remove (session->sme_head, session->sme_tail, pos); + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + pos); GNUNET_free (pos); } /* compute average payload size */ @@ -610,16 +786,20 @@ try_transmission (struct Session *session) } GNUNET_STATISTICS_set (GSC_stats, "# avg payload per encrypted message", - total_bytes / total_msgs, GNUNET_NO); + total_bytes / total_msgs, + GNUNET_NO); /* now actually transmit... */ session->ready_to_transmit = GNUNET_NO; - GSC_KX_encrypt_and_transmit (session->kxinfo, pbuf, used); + GSC_KX_encrypt_and_transmit (session->kx, + pbuf, + used); } } /** - * Send a message to the neighbour now. + * Send an updated typemap message to the neighbour now, + * and restart typemap transmissions. * * @param cls the message * @param key neighbour's identity @@ -627,38 +807,59 @@ try_transmission (struct Session *session) * @return always #GNUNET_OK */ static int -do_send_message (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +do_restart_typemap_message (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { const struct GNUNET_MessageHeader *hdr = cls; struct Session *session = value; - struct SessionMessageEntry *m; + struct SessionMessageEntry *sme; uint16_t size; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Restarting sending TYPEMAP to %s\n", + GNUNET_i2s (session->peer)); size = ntohs (hdr->size); - m = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); - memcpy (&m[1], hdr, size); - m->size = size; - m->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; - GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, session->sme_tail, m); + for (sme = session->sme_head; NULL != sme; sme = sme->next) + { + if (GNUNET_YES == sme->is_typemap) + { + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); + GNUNET_free (sme); + break; + } + } + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size); + sme->is_typemap = GNUNET_YES; + GNUNET_memcpy (&sme[1], + hdr, + size); + sme->size = size; + sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + GNUNET_CONTAINER_DLL_insert (session->sme_head, + session->sme_tail, + sme); try_transmission (session); + start_typemap_task (session); return GNUNET_OK; } /** - * Broadcast a message to all neighbours. + * Broadcast an updated typemap message to all neighbours. + * Restarts the retransmissions until the typemaps are confirmed. * * @param msg message to transmit */ void -GSC_SESSIONS_broadcast (const struct GNUNET_MessageHeader *msg) +GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg) { if (NULL == sessions) return; GNUNET_CONTAINER_multipeermap_iterate (sessions, - &do_send_message, + &do_restart_typemap_message, (void *) msg); } @@ -675,10 +876,12 @@ GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid) { struct Session *session; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transport solicits for %s\n", + GNUNET_i2s (pid)); session = find_session (pid); if (NULL == session) return; - session->ready_to_transmit = GNUNET_YES; try_transmission (session); } @@ -708,15 +911,21 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, return; msize = ntohs (msg->size); sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize); - memcpy (&sme[1], msg, msize); + GNUNET_memcpy (&sme[1], + msg, + msize); sme->size = msize; sme->priority = priority; if (GNUNET_YES == cork) + { sme->deadline = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Mesage corked, delaying transmission\n"); + } pos = session->sme_head; while ( (NULL != pos) && - (pos->priority > sme->priority) ) + (pos->priority >= sme->priority) ) pos = pos->next; if (NULL == pos) GNUNET_CONTAINER_DLL_insert_tail (session->sme_head, @@ -732,60 +941,7 @@ GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car, /** - * Helper function for #GSC_SESSIONS_handle_client_iterate_peers(). - * - * @param cls the `struct GNUNET_SERVER_TransmitContext` to queue replies - * @param key identity of the connected peer - * @param value the `struct Neighbour` for the peer - * @return #GNUNET_OK (continue to iterate) - */ -static int -queue_connect_message (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) -{ - struct GNUNET_SERVER_TransmitContext *tc = cls; - struct Session *session = value; - struct ConnectNotifyMessage cnm; - - /* FIXME: code duplication with clients... */ - cnm.header.size = htons (sizeof (struct ConnectNotifyMessage)); - cnm.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT); - cnm.reserved = htonl (0); - cnm.peer = session->peer; - GNUNET_SERVER_transmit_context_append_message (tc, &cnm.header); - return GNUNET_OK; -} - - -/** - * Handle CORE_ITERATE_PEERS request. For this request type, the client - * does not have to have transmitted an INIT request. All current peers - * are returned, regardless of which message types they accept. - * - * @param cls unused - * @param client client sending the iteration request - * @param message iteration request message - */ -void -GSC_SESSIONS_handle_client_iterate_peers (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - struct GNUNET_MessageHeader done_msg; - struct GNUNET_SERVER_TransmitContext *tc; - - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_CONTAINER_multipeermap_iterate (sessions, &queue_connect_message, tc); - done_msg.size = htons (sizeof (struct GNUNET_MessageHeader)); - done_msg.type = htons (GNUNET_MESSAGE_TYPE_CORE_ITERATE_PEERS_END); - GNUNET_SERVER_transmit_context_append_message (tc, &done_msg); - GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * We've received a typemap message from a peer, update ours. + * We have received a typemap message from a peer, update ours. * Notifies clients about the session. * * @param peer peer this is about @@ -797,18 +953,55 @@ GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer, { struct Session *session; struct GSC_TypeMap *nmap; + struct SessionMessageEntry *sme; + struct TypeMapConfirmationMessage *tmc; nmap = GSC_TYPEMAP_get_from_message (msg); if (NULL == nmap) + { + GNUNET_break_op (0); return; /* malformed */ + } session = find_session (peer); if (NULL == session) { + GSC_TYPEMAP_destroy (nmap); GNUNET_break (0); return; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received TYPEMAP from %s\n", + GNUNET_i2s (session->peer)); + for (sme = session->sme_head; NULL != sme; sme = sme->next) + { + if (GNUNET_YES == sme->is_typemap_confirm) + { + GNUNET_CONTAINER_DLL_remove (session->sme_head, + session->sme_tail, + sme); + GNUNET_free (sme); + break; + } + } + sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + + sizeof (struct TypeMapConfirmationMessage)); + sme->deadline = GNUNET_TIME_absolute_get (); + sme->size = sizeof (struct TypeMapConfirmationMessage); + sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL; + sme->is_typemap_confirm = GNUNET_YES; + tmc = (struct TypeMapConfirmationMessage *) &sme[1]; + tmc->header.size = htons (sizeof (struct TypeMapConfirmationMessage)); + tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP); + tmc->reserved = htonl (0); + GSC_TYPEMAP_hash (nmap, + &tmc->tm_hash); + GNUNET_CONTAINER_DLL_insert (session->sme_head, + session->sme_tail, + sme); + try_transmission (session); GSC_CLIENTS_notify_clients_about_neighbour (peer, - session->tmap, nmap); + session->tmap, + nmap); GSC_TYPEMAP_destroy (session->tmap); session->tmap = nmap; } @@ -829,15 +1022,21 @@ GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer, struct Session *session; struct GSC_TypeMap *nmap; - if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity))) + if (0 == memcmp (peer, + &GSC_my_identity, + sizeof (struct GNUNET_PeerIdentity))) return; session = find_session (peer); GNUNET_assert (NULL != session); - if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1)) + if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, + &type, 1)) return; /* already in it */ - nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1); + nmap = GSC_TYPEMAP_extend (session->tmap, + &type, + 1); GSC_CLIENTS_notify_clients_about_neighbour (peer, - session->tmap, nmap); + session->tmap, + nmap); GSC_TYPEMAP_destroy (session->tmap); session->tmap = nmap; } @@ -849,12 +1048,14 @@ GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer, void GSC_SESSIONS_init () { - sessions = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); + sessions = GNUNET_CONTAINER_multipeermap_create (128, + GNUNET_YES); } /** - * Helper function for GSC_SESSIONS_handle_client_iterate_peers. + * Helper function for #GSC_SESSIONS_done() to free all + * active sessions. * * @param cls NULL * @param key identity of the connected peer @@ -866,9 +1067,9 @@ free_session_helper (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { - struct Session *session = value; + /* struct Session *session = value; */ - GSC_SESSIONS_end (&session->peer); + GSC_SESSIONS_end (key); return GNUNET_OK; } @@ -881,7 +1082,9 @@ GSC_SESSIONS_done () { if (NULL != sessions) { - GNUNET_CONTAINER_multipeermap_iterate (sessions, &free_session_helper, NULL); + GNUNET_CONTAINER_multipeermap_iterate (sessions, + &free_session_helper, + NULL); GNUNET_CONTAINER_multipeermap_destroy (sessions); sessions = NULL; }