fix memory leak
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2014, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_sessions.c
23  * @brief code for managing of 'encrypted' sessions (key exchange done)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-core.h"
28 #include "gnunet-service-core_kx.h"
29 #include "gnunet-service-core_typemap.h"
30 #include "gnunet-service-core_sessions.h"
31 #include "gnunet_constants.h"
32 #include "core.h"
33
34
35 /**
36  * How many encrypted messages do we queue at most?
37  * Needed to bound memory consumption.
38  */
39 #define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4
40
41
42 /**
43  * Message ready for encryption.  This struct is followed by the
44  * actual content of the message.
45  */
46 struct SessionMessageEntry
47 {
48
49   /**
50    * We keep messages in a doubly linked list.
51    */
52   struct SessionMessageEntry *next;
53
54   /**
55    * We keep messages in a doubly linked list.
56    */
57   struct SessionMessageEntry *prev;
58
59   /**
60    * How important is this message.
61    */
62   enum GNUNET_CORE_Priority priority;
63
64   /**
65    * Flag set to #GNUNET_YES if this is a typemap message.
66    */
67   int is_typemap;
68
69   /**
70    * Flag set to #GNUNET_YES if this is a typemap confirmation message.
71    */
72   int is_typemap_confirm;
73
74   /**
75    * Deadline for transmission, 1s after we received it (if we
76    * are not corking), otherwise "now".  Note that this message
77    * does NOT expire past its deadline.
78    */
79   struct GNUNET_TIME_Absolute deadline;
80
81   /**
82    * How long is the message? (number of bytes following the `struct
83    * MessageEntry`, but not including the size of `struct
84    * MessageEntry` itself!)
85    */
86   size_t size;
87
88 };
89
90
91 /**
92  * Data kept per session.
93  */
94 struct Session
95 {
96   /**
97    * Identity of the other peer.
98    */
99   const struct GNUNET_PeerIdentity *peer;
100
101   /**
102    * Key exchange state for this peer.
103    */
104   struct GSC_KeyExchangeInfo *kx;
105
106   /**
107    * Head of list of requests from clients for transmission to
108    * this peer.
109    */
110   struct GSC_ClientActiveRequest *active_client_request_head;
111
112   /**
113    * Tail of list of requests from clients for transmission to
114    * this peer.
115    */
116   struct GSC_ClientActiveRequest *active_client_request_tail;
117
118   /**
119    * Head of list of messages ready for encryption.
120    */
121   struct SessionMessageEntry *sme_head;
122
123   /**
124    * Tail of list of messages ready for encryption.
125    */
126   struct SessionMessageEntry *sme_tail;
127
128   /**
129    * Current type map for this peer.
130    */
131   struct GSC_TypeMap *tmap;
132
133   /**
134    * Task to transmit corked messages with a delay.
135    */
136   struct GNUNET_SCHEDULER_Task *cork_task;
137
138   /**
139    * Task to transmit our type map.
140    */
141   struct GNUNET_SCHEDULER_Task *typemap_task;
142
143   /**
144    * Retransmission delay we currently use for the typemap
145    * transmissions (if not confirmed).
146    */
147   struct GNUNET_TIME_Relative typemap_delay;
148
149   /**
150    * Is the neighbour queue empty and thus ready for us
151    * to transmit an encrypted message?
152    */
153   int ready_to_transmit;
154
155   /**
156    * Is this the first time we're sending the typemap? If so,
157    * we want to send it a bit faster the second time.  0 if
158    * we are sending for the first time, 1 if not.
159    */
160   int first_typemap;
161 };
162
163
164 GNUNET_NETWORK_STRUCT_BEGIN
165
166 /**
167  * Message sent to confirm that a typemap was received.
168  */
169 struct TypeMapConfirmationMessage
170 {
171
172   /**
173    * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP.
174    */
175   struct GNUNET_MessageHeader header;
176
177   /**
178    * Reserved, always zero.
179    */
180   uint32_t reserved GNUNET_PACKED;
181
182   /**
183    * Hash of the (decompressed) type map that was received.
184    */
185   struct GNUNET_HashCode tm_hash;
186
187 };
188
189 GNUNET_NETWORK_STRUCT_END
190
191
192 /**
193  * Map of peer identities to `struct Session`.
194  */
195 static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
196
197
198 /**
199  * Find the session for the given peer.
200  *
201  * @param peer identity of the peer
202  * @return NULL if we are not connected, otherwise the
203  *         session handle
204  */
205 static struct Session *
206 find_session (const struct GNUNET_PeerIdentity *peer)
207 {
208   if (NULL == sessions)
209     return NULL;
210   return GNUNET_CONTAINER_multipeermap_get (sessions,
211                                             peer);
212 }
213
214
215 /**
216  * End the session with the given peer (we are no longer
217  * connected).
218  *
219  * @param pid identity of peer to kill session with
220  */
221 void
222 GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
223 {
224   struct Session *session;
225   struct GSC_ClientActiveRequest *car;
226   struct SessionMessageEntry *sme;
227
228   session = find_session (pid);
229   if (NULL == session)
230     return;
231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
232               "Destroying session for peer `%s'\n",
233               GNUNET_i2s (session->peer));
234   if (NULL != session->cork_task)
235   {
236     GNUNET_SCHEDULER_cancel (session->cork_task);
237     session->cork_task = NULL;
238   }
239   while (NULL != (car = session->active_client_request_head))
240   {
241     GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
242                                  session->active_client_request_tail, car);
243     GSC_CLIENTS_reject_request (car,
244                                 GNUNET_NO);
245   }
246   while (NULL != (sme = session->sme_head))
247   {
248     GNUNET_CONTAINER_DLL_remove (session->sme_head,
249                                  session->sme_tail,
250                                  sme);
251     GNUNET_free (sme);
252   }
253   if (NULL != session->typemap_task)
254   {
255     GNUNET_SCHEDULER_cancel (session->typemap_task);
256     session->typemap_task = NULL;
257   }
258   GSC_CLIENTS_notify_clients_about_neighbour (session->peer,
259                                               session->tmap,
260                                               NULL);
261   GNUNET_assert (GNUNET_YES ==
262                  GNUNET_CONTAINER_multipeermap_remove (sessions,
263                                                        session->peer,
264                                                        session));
265   GNUNET_STATISTICS_set (GSC_stats,
266                          gettext_noop ("# peers connected"),
267                          GNUNET_CONTAINER_multipeermap_size (sessions),
268                          GNUNET_NO);
269   GSC_TYPEMAP_destroy (session->tmap);
270   session->tmap = NULL;
271   GNUNET_free (session);
272 }
273
274
275 /**
276  * Transmit our current typemap message to the other peer.
277  * (Done periodically until the typemap is confirmed).
278  *
279  * @param cls the `struct Session *`
280  */
281 static void
282 transmit_typemap_task (void *cls)
283 {
284   struct Session *session = cls;
285   struct GNUNET_MessageHeader *hdr;
286   struct GNUNET_TIME_Relative delay;
287
288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
289               "Sending TYPEMAP to %s\n",
290               GNUNET_i2s (session->peer));
291   session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay);
292   delay = session->typemap_delay;
293   /* randomize a bit to avoid spont. sync */
294   delay.rel_value_us +=
295       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
296                                 1000 * 1000);
297   session->typemap_task =
298       GNUNET_SCHEDULER_add_delayed (delay,
299                                     &transmit_typemap_task,
300                                     session);
301   GNUNET_STATISTICS_update (GSC_stats,
302                             gettext_noop ("# type map refreshes sent"),
303                             1,
304                             GNUNET_NO);
305   hdr = GSC_TYPEMAP_compute_type_map_message ();
306   GSC_KX_encrypt_and_transmit (session->kx,
307                                hdr,
308                                ntohs (hdr->size));
309   GNUNET_free (hdr);
310 }
311
312
313 /**
314  * Restart the typemap task for the given session.
315  *
316  * @param session session to restart typemap transmission for
317  */
318 static void
319 start_typemap_task (struct Session *session)
320 {
321   if (NULL != session->typemap_task)
322     GNUNET_SCHEDULER_cancel (session->typemap_task);
323   session->typemap_delay = GNUNET_TIME_UNIT_SECONDS;
324   session->typemap_task =
325     GNUNET_SCHEDULER_add_delayed (session->typemap_delay,
326                                   &transmit_typemap_task,
327                                   session);
328 }
329
330
331 /**
332  * Create a session, a key exchange was just completed.
333  *
334  * @param peer peer that is now connected
335  * @param kx key exchange that completed
336  */
337 void
338 GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
339                      struct GSC_KeyExchangeInfo *kx)
340 {
341   struct Session *session;
342
343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
344               "Creating session for peer `%s'\n",
345               GNUNET_i2s (peer));
346   session = GNUNET_new (struct Session);
347   session->tmap = GSC_TYPEMAP_create ();
348   session->peer = peer;
349   session->kx = kx;
350   GNUNET_assert (GNUNET_OK ==
351                  GNUNET_CONTAINER_multipeermap_put (sessions,
352                                                     session->peer,
353                                                     session,
354                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
355   GNUNET_STATISTICS_set (GSC_stats,
356                          gettext_noop ("# peers connected"),
357                          GNUNET_CONTAINER_multipeermap_size (sessions),
358                          GNUNET_NO);
359   GSC_CLIENTS_notify_clients_about_neighbour (peer,
360                                               NULL,
361                                               session->tmap);
362   start_typemap_task (session);
363 }
364
365
366 /**
367  * The other peer has indicated that he 'lost' the session
368  * (KX down), reinitialize the session on our end, in particular
369  * this means to restart the typemap transmission.
370  *
371  * @param peer peer that is now connected
372  */
373 void
374 GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer)
375 {
376   struct Session *session;
377
378   session = find_session (peer);
379   if (NULL == session)
380   {
381     /* KX/session is new for both sides; thus no need to restart what
382        has not yet begun */
383     return;
384   }
385   start_typemap_task (session);
386 }
387
388
389 /**
390  * The other peer has confirmed receiving our type map,
391  * check if it is current and if so, stop retransmitting it.
392  *
393  * @param peer peer that confirmed the type map
394  * @param msg confirmation message we received
395  */
396 void
397 GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer,
398                               const struct GNUNET_MessageHeader *msg)
399 {
400   const struct TypeMapConfirmationMessage *cmsg;
401   struct Session *session;
402
403   session = find_session (peer);
404   if (NULL == session)
405   {
406     GNUNET_break (0);
407     return;
408   }
409   if (ntohs (msg->size) != sizeof (struct TypeMapConfirmationMessage))
410   {
411     GNUNET_break_op (0);
412     return;
413   }
414   cmsg = (const struct TypeMapConfirmationMessage *) msg;
415   if (GNUNET_YES !=
416       GSC_TYPEMAP_check_hash (&cmsg->tm_hash))
417   {
418     /* our typemap has changed in the meantime, do not
419        accept confirmation */
420     GNUNET_STATISTICS_update (GSC_stats,
421                               gettext_noop
422                               ("# outdated typemap confirmations received"),
423                               1, GNUNET_NO);
424     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
425                 "Got outdated typemap confirmated from peer `%s'\n",
426                 GNUNET_i2s (session->peer));
427     return;
428   }
429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
430               "Got typemap confirmation from peer `%s'\n",
431               GNUNET_i2s (session->peer));
432   if (NULL != session->typemap_task)
433   {
434     GNUNET_SCHEDULER_cancel (session->typemap_task);
435     session->typemap_task = NULL;
436   }
437   GNUNET_STATISTICS_update (GSC_stats,
438                             gettext_noop
439                             ("# valid typemap confirmations received"),
440                             1, GNUNET_NO);
441 }
442
443
444 /**
445  * Notify the given client about the session (client is new).
446  *
447  * @param cls the `struct GSC_Client`
448  * @param key peer identity
449  * @param value the `struct Session`
450  * @return #GNUNET_OK (continue to iterate)
451  */
452 static int
453 notify_client_about_session (void *cls,
454                              const struct GNUNET_PeerIdentity *key,
455                              void *value)
456 {
457   struct GSC_Client *client = cls;
458   struct Session *session = value;
459
460   GSC_CLIENTS_notify_client_about_neighbour (client,
461                                              session->peer,
462                                              NULL,      /* old TMAP: none */
463                                              session->tmap);
464   return GNUNET_OK;
465 }
466
467
468 /**
469  * We have a new client, notify it about all current sessions.
470  *
471  * @param client the new client
472  */
473 void
474 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
475 {
476   /* notify new client about existing sessions */
477   GNUNET_CONTAINER_multipeermap_iterate (sessions,
478                                          &notify_client_about_session,
479                                          client);
480 }
481
482
483 /**
484  * Try to perform a transmission on the given session.  Will solicit
485  * additional messages if the 'sme' queue is not full enough.
486  *
487  * @param session session to transmit messages from
488  */
489 static void
490 try_transmission (struct Session *session);
491
492
493 /**
494  * Queue a request from a client for transmission to a particular peer.
495  *
496  * @param car request to queue; this handle is then shared between
497  *         the caller (CLIENTS subsystem) and SESSIONS and must not
498  *         be released by either until either #GSC_SESSIONS_dequeue(),
499  *         #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed()
500  *         have been invoked on it
501  */
502 void
503 GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
504 {
505   struct Session *session;
506
507   session = find_session (&car->target);
508   if (NULL == session)
509   {
510     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511                 "Dropped client request for transmission (am disconnected)\n");
512     GNUNET_break (0);           /* should have been rejected earlier */
513     GSC_CLIENTS_reject_request (car,
514                                 GNUNET_NO);
515     return;
516   }
517   if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
518   {
519     GNUNET_break (0);
520     GSC_CLIENTS_reject_request (car,
521                                 GNUNET_YES);
522     return;
523   }
524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525               "Received client transmission request. queueing\n");
526   GNUNET_CONTAINER_DLL_insert_tail (session->active_client_request_head,
527                                     session->active_client_request_tail,
528                                     car);
529   try_transmission (session);
530 }
531
532
533 /**
534  * Dequeue a request from a client from transmission to a particular peer.
535  *
536  * @param car request to dequeue; this handle will then be 'owned' by
537  *        the caller (CLIENTS sysbsystem)
538  */
539 void
540 GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
541 {
542   struct Session *session;
543
544   if (0 ==
545       memcmp (&car->target,
546               &GSC_my_identity,
547               sizeof (struct GNUNET_PeerIdentity)))
548     return;
549   session = find_session (&car->target);
550   GNUNET_assert (NULL != session);
551   GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
552                                session->active_client_request_tail,
553                                car);
554   /* dequeueing of 'high' priority messages may unblock
555      transmission for lower-priority messages, so we also
556      need to try in this case. */
557   try_transmission (session);
558 }
559
560
561 /**
562  * Solicit messages for transmission, starting with those of the highest
563  * priority.
564  *
565  * @param session session to solict messages for
566  * @param msize how many bytes do we have already
567  */
568 static void
569 solicit_messages (struct Session *session,
570                   size_t msize)
571 {
572   struct GSC_ClientActiveRequest *car;
573   struct GSC_ClientActiveRequest *nxt;
574   size_t so_size;
575   enum GNUNET_CORE_Priority pmax;
576
577   so_size = msize;
578   pmax = GNUNET_CORE_PRIO_BACKGROUND;
579   for (car = session->active_client_request_head; NULL != car; car = car->next)
580   {
581     if (GNUNET_YES == car->was_solicited)
582       continue;
583     pmax = GNUNET_MAX (pmax, car->priority);
584   }
585   nxt = session->active_client_request_head;
586   while (NULL != (car = nxt))
587   {
588     nxt = car->next;
589     if (car->priority < pmax)
590       continue;
591     if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
592       break;
593     so_size += car->msize;
594     if (GNUNET_YES == car->was_solicited)
595       continue;
596     car->was_solicited = GNUNET_YES;
597     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
598                 "Soliciting message with priority %u\n",
599                 car->priority);
600     GSC_CLIENTS_solicit_request (car);
601     /* The above call may *dequeue* requests and thereby
602        clobber 'nxt'. Hence we need to restart from the
603        head of the list. */
604     nxt = session->active_client_request_head;
605     so_size = msize;
606   }
607 }
608
609
610 /**
611  * Some messages were delayed (corked), but the timeout has now expired.
612  * Send them now.
613  *
614  * @param cls `struct Session` with the messages to transmit now
615  */
616 static void
617 pop_cork_task (void *cls)
618 {
619   struct Session *session = cls;
620
621   session->cork_task = NULL;
622   try_transmission (session);
623 }
624
625
626 /**
627  * Try to perform a transmission on the given session. Will solicit
628  * additional messages if the 'sme' queue is not full enough or has
629  * only low-priority messages.
630  *
631  * @param session session to transmit messages from
632  */
633 static void
634 try_transmission (struct Session *session)
635 {
636   struct SessionMessageEntry *pos;
637   size_t msize;
638   struct GNUNET_TIME_Absolute now;
639   struct GNUNET_TIME_Absolute min_deadline;
640   enum GNUNET_CORE_Priority maxp;
641   enum GNUNET_CORE_Priority maxpc;
642   struct GSC_ClientActiveRequest *car;
643   int excess;
644
645   if (GNUNET_YES != session->ready_to_transmit)
646   {
647     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
648                 "Not yet ready to transmit, not evaluating queue\n");
649     return;
650   }
651   msize = 0;
652   min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
653   /* if the peer has excess bandwidth, background traffic is allowed,
654      otherwise not */
655   if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <=
656       GSC_NEIGHBOURS_get_queue_length (session->kx))
657   {
658     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659                 "Transmission queue already very long, waiting...\n");
660     return; /* queue already too long */
661   }
662   excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx);
663   if (GNUNET_YES == excess)
664     maxp = GNUNET_CORE_PRIO_BACKGROUND;
665   else
666     maxp = GNUNET_CORE_PRIO_BEST_EFFORT;
667   /* determine highest priority of 'ready' messages we already solicited from clients */
668   pos = session->sme_head;
669   while ((NULL != pos) &&
670          (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE))
671   {
672     GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
673     msize += pos->size;
674     maxp = GNUNET_MAX (maxp, pos->priority);
675     min_deadline = GNUNET_TIME_absolute_min (min_deadline,
676                                              pos->deadline);
677     pos = pos->next;
678   }
679   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
680               "Calculating transmission set with %u priority (%s) and %s earliest deadline\n",
681               maxp,
682               (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth",
683               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
684                                                       GNUNET_YES));
685
686   if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL)
687   {
688     /* if highest already solicited priority from clients is not critical,
689        check if there are higher-priority messages to be solicited from clients */
690     if (GNUNET_YES == excess)
691       maxpc = GNUNET_CORE_PRIO_BACKGROUND;
692     else
693       maxpc = GNUNET_CORE_PRIO_BEST_EFFORT;
694     for (car = session->active_client_request_head; NULL != car; car = car->next)
695     {
696       if (GNUNET_YES == car->was_solicited)
697         continue;
698       maxpc = GNUNET_MAX (maxpc,
699                           car->priority);
700     }
701     if (maxpc > maxp)
702     {
703       /* we have messages waiting for solicitation that have a higher
704          priority than those that we already accepted; solicit the
705          high-priority messages first */
706       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707                   "Soliciting messages based on priority (%u > %u)\n",
708                   maxpc,
709                   maxp);
710       solicit_messages (session, 0);
711       return;
712     }
713   }
714   else
715   {
716     /* never solicit more, we have critical messages to process */
717     excess = GNUNET_NO;
718     maxpc = GNUNET_CORE_PRIO_BACKGROUND;
719   }
720   now = GNUNET_TIME_absolute_get ();
721   if ( ( (GNUNET_YES == excess) ||
722          (maxpc >= GNUNET_CORE_PRIO_BEST_EFFORT) ) &&
723        ( (0 == msize) ||
724          ( (msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
725            (min_deadline.abs_value_us > now.abs_value_us))) )
726   {
727     /* not enough ready yet (tiny message & cork possible), or no messages at all,
728        and either excess bandwidth or best-effort or higher message waiting at
729        client; in this case, we try to solicit more */
730     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
731                 "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n",
732                 excess,
733                 maxpc,
734                 (unsigned int) msize,
735                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
736                                                         GNUNET_YES));
737     solicit_messages (session,
738                       msize);
739     if (msize > 0)
740     {
741       /* if there is data to send, just not yet, make sure we do transmit
742        * it once the deadline is reached */
743       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744                   "Corking until %s\n",
745                   GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
746                                                           GNUNET_YES));
747       if (NULL != session->cork_task)
748         GNUNET_SCHEDULER_cancel (session->cork_task);
749       session->cork_task
750         = GNUNET_SCHEDULER_add_at (min_deadline,
751                                    &pop_cork_task,
752                                    session);
753     }
754     else
755     {
756       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
757                   "Queue empty, waiting for solicitations\n");
758     }
759     return;
760   }
761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762               "Building combined plaintext buffer to transmit message!\n");
763   /* create plaintext buffer of all messages (that fit), encrypt and
764      transmit */
765   {
766     static unsigned long long total_bytes;
767     static unsigned int total_msgs;
768     char pbuf[msize];           /* plaintext */
769     size_t used;
770
771     used = 0;
772     while ( (NULL != (pos = session->sme_head)) &&
773             (used + pos->size <= msize) )
774     {
775       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776                   "Adding message of type %d (%d/%d) to payload for %s\n",
777                   ntohs (((const struct GNUNET_MessageHeader *)&pos[1])->type),
778                   pos->is_typemap,
779                   pos->is_typemap_confirm,
780                   GNUNET_i2s (session->peer));
781       GNUNET_memcpy (&pbuf[used],
782                      &pos[1],
783                      pos->size);
784       used += pos->size;
785       GNUNET_CONTAINER_DLL_remove (session->sme_head,
786                                    session->sme_tail,
787                                    pos);
788       GNUNET_free (pos);
789     }
790     /* compute average payload size */
791     total_bytes += used;
792     total_msgs++;
793     if (0 == total_msgs)
794     {
795       /* 2^32 messages, wrap around... */
796       total_msgs = 1;
797       total_bytes = used;
798     }
799     GNUNET_STATISTICS_set (GSC_stats,
800                            "# avg payload per encrypted message",
801                            total_bytes / total_msgs,
802                            GNUNET_NO);
803     /* now actually transmit... */
804     session->ready_to_transmit = GNUNET_NO;
805     GSC_KX_encrypt_and_transmit (session->kx,
806                                  pbuf,
807                                  used);
808   }
809 }
810
811
812 /**
813  * Send an updated typemap message to the neighbour now,
814  * and restart typemap transmissions.
815  *
816  * @param cls the message
817  * @param key neighbour's identity
818  * @param value `struct Neighbour` of the target
819  * @return always #GNUNET_OK
820  */
821 static int
822 do_restart_typemap_message (void *cls,
823                             const struct GNUNET_PeerIdentity *key,
824                             void *value)
825 {
826   const struct GNUNET_MessageHeader *hdr = cls;
827   struct Session *session = value;
828   struct SessionMessageEntry *sme;
829   uint16_t size;
830
831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832               "Restarting sending TYPEMAP to %s\n",
833               GNUNET_i2s (session->peer));
834   size = ntohs (hdr->size);
835   for (sme = session->sme_head; NULL != sme; sme = sme->next)
836   {
837     if (GNUNET_YES == sme->is_typemap)
838     {
839       GNUNET_CONTAINER_DLL_remove (session->sme_head,
840                                    session->sme_tail,
841                                    sme);
842       GNUNET_free (sme);
843       break;
844     }
845   }
846   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size);
847   sme->is_typemap = GNUNET_YES;
848   GNUNET_memcpy (&sme[1],
849                  hdr,
850                  size);
851   sme->size = size;
852   sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
853   GNUNET_CONTAINER_DLL_insert (session->sme_head,
854                                session->sme_tail,
855                                sme);
856   try_transmission (session);
857   start_typemap_task (session);
858   return GNUNET_OK;
859 }
860
861
862 /**
863  * Broadcast an updated typemap message to all neighbours.
864  * Restarts the retransmissions until the typemaps are confirmed.
865  *
866  * @param msg message to transmit
867  */
868 void
869 GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg)
870 {
871   if (NULL == sessions)
872     return;
873   GNUNET_CONTAINER_multipeermap_iterate (sessions,
874                                          &do_restart_typemap_message,
875                                          (void *) msg);
876 }
877
878
879 /**
880  * Traffic is being solicited for the given peer.  This means that the
881  * message queue on the transport-level (NEIGHBOURS subsystem) is now
882  * empty and it is now OK to transmit another (non-control) message.
883  *
884  * @param pid identity of peer ready to receive data
885  */
886 void
887 GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
888 {
889   struct Session *session;
890
891   session = find_session (pid);
892   if (NULL == session)
893     return;
894   session->ready_to_transmit = GNUNET_YES;
895   try_transmission (session);
896 }
897
898
899 /**
900  * Transmit a message to a particular peer.
901  *
902  * @param car original request that was queued and then solicited;
903  *            this handle will now be 'owned' by the SESSIONS subsystem
904  * @param msg message to transmit
905  * @param cork is corking allowed?
906  * @param priority how important is this message
907  */
908 void
909 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
910                        const struct GNUNET_MessageHeader *msg,
911                        int cork,
912                        enum GNUNET_CORE_Priority priority)
913 {
914   struct Session *session;
915   struct SessionMessageEntry *sme;
916   struct SessionMessageEntry *pos;
917   size_t msize;
918
919   session = find_session (&car->target);
920   if (NULL == session)
921     return;
922   msize = ntohs (msg->size);
923   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize);
924   GNUNET_memcpy (&sme[1],
925                  msg,
926                  msize);
927   sme->size = msize;
928   sme->priority = priority;
929   if (GNUNET_YES == cork)
930   {
931     sme->deadline =
932         GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
933     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934                 "Mesage corked, delaying transmission\n");
935   }
936   pos = session->sme_head;
937   while ( (NULL != pos) &&
938           (pos->priority >= sme->priority) )
939     pos = pos->next;
940   if (NULL == pos)
941     GNUNET_CONTAINER_DLL_insert_tail (session->sme_head,
942                                       session->sme_tail,
943                                       sme);
944   else
945     GNUNET_CONTAINER_DLL_insert_after (session->sme_head,
946                                        session->sme_tail,
947                                        pos->prev,
948                                        sme);
949   try_transmission (session);
950 }
951
952
953 /**
954  * We have received a typemap message from a peer, update ours.
955  * Notifies clients about the session.
956  *
957  * @param peer peer this is about
958  * @param msg typemap update message
959  */
960 void
961 GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer,
962                           const struct GNUNET_MessageHeader *msg)
963 {
964   struct Session *session;
965   struct GSC_TypeMap *nmap;
966   struct SessionMessageEntry *sme;
967   struct TypeMapConfirmationMessage *tmc;
968
969   nmap = GSC_TYPEMAP_get_from_message (msg);
970   if (NULL == nmap)
971   {
972     GNUNET_break_op (0);
973     return;                     /* malformed */
974   }
975   session = find_session (peer);
976   if (NULL == session)
977   {
978     GNUNET_break (0);
979     return;
980   }
981   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
982               "Received TYPEMAP from %s\n",
983               GNUNET_i2s (session->peer));
984   for (sme = session->sme_head; NULL != sme; sme = sme->next)
985   {
986     if (GNUNET_YES == sme->is_typemap_confirm)
987     {
988       GNUNET_CONTAINER_DLL_remove (session->sme_head,
989                                    session->sme_tail,
990                                    sme);
991       GNUNET_free (sme);
992       break;
993     }
994   }
995   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) +
996                        sizeof (struct TypeMapConfirmationMessage));
997   sme->deadline = GNUNET_TIME_absolute_get ();
998   sme->size = sizeof (struct TypeMapConfirmationMessage);
999   sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
1000   sme->is_typemap_confirm = GNUNET_YES;
1001   tmc = (struct TypeMapConfirmationMessage *) &sme[1];
1002   tmc->header.size = htons (sizeof (struct TypeMapConfirmationMessage));
1003   tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP);
1004   tmc->reserved = htonl (0);
1005   GSC_TYPEMAP_hash (nmap,
1006                     &tmc->tm_hash);
1007   GNUNET_CONTAINER_DLL_insert (session->sme_head,
1008                                session->sme_tail,
1009                                sme);
1010   try_transmission (session);
1011   GSC_CLIENTS_notify_clients_about_neighbour (peer,
1012                                               session->tmap,
1013                                               nmap);
1014   GSC_TYPEMAP_destroy (session->tmap);
1015   session->tmap = nmap;
1016 }
1017
1018
1019 /**
1020  * The given peer send a message of the specified type.  Make sure the
1021  * respective bit is set in its type-map and that clients are notified
1022  * about the session.
1023  *
1024  * @param peer peer this is about
1025  * @param type type of the message
1026  */
1027 void
1028 GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer,
1029                              uint16_t type)
1030 {
1031   struct Session *session;
1032   struct GSC_TypeMap *nmap;
1033
1034   if (0 == memcmp (peer,
1035                    &GSC_my_identity,
1036                    sizeof (struct GNUNET_PeerIdentity)))
1037     return;
1038   session = find_session (peer);
1039   GNUNET_assert (NULL != session);
1040   if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap,
1041                                             &type, 1))
1042     return;                     /* already in it */
1043   nmap = GSC_TYPEMAP_extend (session->tmap,
1044                              &type,
1045                              1);
1046   GSC_CLIENTS_notify_clients_about_neighbour (peer,
1047                                               session->tmap,
1048                                               nmap);
1049   GSC_TYPEMAP_destroy (session->tmap);
1050   session->tmap = nmap;
1051 }
1052
1053
1054 /**
1055  * Initialize sessions subsystem.
1056  */
1057 void
1058 GSC_SESSIONS_init ()
1059 {
1060   sessions = GNUNET_CONTAINER_multipeermap_create (128,
1061                                                    GNUNET_YES);
1062 }
1063
1064
1065 /**
1066  * Helper function for #GSC_SESSIONS_done() to free all
1067  * active sessions.
1068  *
1069  * @param cls NULL
1070  * @param key identity of the connected peer
1071  * @param value the `struct Session` for the peer
1072  * @return #GNUNET_OK (continue to iterate)
1073  */
1074 static int
1075 free_session_helper (void *cls,
1076                      const struct GNUNET_PeerIdentity *key,
1077                      void *value)
1078 {
1079   /* struct Session *session = value; */
1080
1081   GSC_SESSIONS_end (key);
1082   return GNUNET_OK;
1083 }
1084
1085
1086 /**
1087  * Shutdown sessions subsystem.
1088  */
1089 void
1090 GSC_SESSIONS_done ()
1091 {
1092   if (NULL != sessions)
1093   {
1094     GNUNET_CONTAINER_multipeermap_iterate (sessions,
1095                                            &free_session_helper,
1096                                            NULL);
1097     GNUNET_CONTAINER_multipeermap_destroy (sessions);
1098     sessions = NULL;
1099   }
1100 }
1101
1102 /* end of gnunet-service-core_sessions.c */