refactor DHT for new service API
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2014, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_sessions.c
23  * @brief code for managing of 'encrypted' sessions (key exchange done)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-core.h"
28 #include "gnunet-service-core_kx.h"
29 #include "gnunet-service-core_typemap.h"
30 #include "gnunet-service-core_sessions.h"
31 #include "gnunet_constants.h"
32 #include "core.h"
33
34
35 /**
36  * How many encrypted messages do we queue at most?
37  * Needed to bound memory consumption.
38  */
39 #define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4
40
41
42 /**
43  * Message ready for encryption.  This struct is followed by the
44  * actual content of the message.
45  */
46 struct SessionMessageEntry
47 {
48
49   /**
50    * We keep messages in a doubly linked list.
51    */
52   struct SessionMessageEntry *next;
53
54   /**
55    * We keep messages in a doubly linked list.
56    */
57   struct SessionMessageEntry *prev;
58
59   /**
60    * Deadline for transmission, 1s after we received it (if we
61    * are not corking), otherwise "now".  Note that this message
62    * does NOT expire past its deadline.
63    */
64   struct GNUNET_TIME_Absolute deadline;
65
66   /**
67    * How long is the message? (number of bytes following the `struct
68    * MessageEntry`, but not including the size of `struct
69    * MessageEntry` itself!)
70    */
71   size_t size;
72
73   /**
74    * How important is this message.
75    */
76   enum GNUNET_CORE_Priority priority;
77
78 };
79
80
81 /**
82  * Data kept per session.
83  */
84 struct Session
85 {
86   /**
87    * Identity of the other peer.
88    */
89   const struct GNUNET_PeerIdentity *peer;
90
91   /**
92    * Key exchange state for this peer.
93    */ 
94   struct GSC_KeyExchangeInfo *kx;
95   
96   /**
97    * Head of list of requests from clients for transmission to
98    * this peer.
99    */
100   struct GSC_ClientActiveRequest *active_client_request_head;
101
102   /**
103    * Tail of list of requests from clients for transmission to
104    * this peer.
105    */
106   struct GSC_ClientActiveRequest *active_client_request_tail;
107
108   /**
109    * Head of list of messages ready for encryption.
110    */
111   struct SessionMessageEntry *sme_head;
112
113   /**
114    * Tail of list of messages ready for encryption.
115    */
116   struct SessionMessageEntry *sme_tail;
117
118   /**
119    * Current type map for this peer.
120    */
121   struct GSC_TypeMap *tmap;
122
123   /**
124    * Task to transmit corked messages with a delay.
125    */
126   struct GNUNET_SCHEDULER_Task *cork_task;
127
128   /**
129    * Task to transmit our type map.
130    */
131   struct GNUNET_SCHEDULER_Task *typemap_task;
132
133   /**
134    * Retransmission delay we currently use for the typemap
135    * transmissions (if not confirmed).
136    */
137   struct GNUNET_TIME_Relative typemap_delay;
138
139   /**
140    * Is the neighbour queue empty and thus ready for us
141    * to transmit an encrypted message?
142    */
143   int ready_to_transmit;
144
145   /**
146    * Is this the first time we're sending the typemap? If so,
147    * we want to send it a bit faster the second time.  0 if
148    * we are sending for the first time, 1 if not.
149    */
150   int first_typemap;
151 };
152
153
154 GNUNET_NETWORK_STRUCT_BEGIN
155
156 /**
157  * Message sent to confirm that a typemap was received.
158  */
159 struct TypeMapConfirmationMessage
160 {
161
162   /**
163    * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP.
164    */
165   struct GNUNET_MessageHeader header;
166
167   /**
168    * Reserved, always zero.
169    */
170   uint32_t reserved GNUNET_PACKED;
171
172   /**
173    * Hash of the (decompressed) type map that was received.
174    */
175   struct GNUNET_HashCode tm_hash;
176
177 };
178
179 GNUNET_NETWORK_STRUCT_END
180
181
182 /**
183  * Map of peer identities to `struct Session`.
184  */
185 static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
186
187
188 /**
189  * Find the session for the given peer.
190  *
191  * @param peer identity of the peer
192  * @return NULL if we are not connected, otherwise the
193  *         session handle
194  */
195 static struct Session *
196 find_session (const struct GNUNET_PeerIdentity *peer)
197 {
198   if (NULL == sessions)
199     return NULL;
200   return GNUNET_CONTAINER_multipeermap_get (sessions,
201                                             peer);
202 }
203
204
205 /**
206  * End the session with the given peer (we are no longer
207  * connected).
208  *
209  * @param pid identity of peer to kill session with
210  */
211 void
212 GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
213 {
214   struct Session *session;
215   struct GSC_ClientActiveRequest *car;
216   struct SessionMessageEntry *sme;
217
218   session = find_session (pid);
219   if (NULL == session)
220     return;
221   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
222               "Destroying session for peer `%s'\n",
223               GNUNET_i2s (session->peer));
224   if (NULL != session->cork_task)
225   {
226     GNUNET_SCHEDULER_cancel (session->cork_task);
227     session->cork_task = NULL;
228   }
229   while (NULL != (car = session->active_client_request_head))
230   {
231     GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
232                                  session->active_client_request_tail, car);
233     GSC_CLIENTS_reject_request (car,
234                                 GNUNET_NO);
235   }
236   while (NULL != (sme = session->sme_head))
237   {
238     GNUNET_CONTAINER_DLL_remove (session->sme_head,
239                                  session->sme_tail,
240                                  sme);
241     GNUNET_free (sme);
242   }
243   if (NULL != session->typemap_task)
244   {
245     GNUNET_SCHEDULER_cancel (session->typemap_task);
246     session->typemap_task = NULL;
247   }
248   GSC_CLIENTS_notify_clients_about_neighbour (session->peer,
249                                               session->tmap,
250                                               NULL);
251   GNUNET_assert (GNUNET_YES ==
252                  GNUNET_CONTAINER_multipeermap_remove (sessions,
253                                                        session->peer,
254                                                        session));
255   GNUNET_STATISTICS_set (GSC_stats,
256                          gettext_noop ("# peers connected"),
257                          GNUNET_CONTAINER_multipeermap_size (sessions),
258                          GNUNET_NO);
259   GSC_TYPEMAP_destroy (session->tmap);
260   session->tmap = NULL;
261   GNUNET_free (session);
262 }
263
264
265 /**
266  * Transmit our current typemap message to the other peer.
267  * (Done periodically until the typemap is confirmed).
268  *
269  * @param cls the `struct Session *`
270  */
271 static void
272 transmit_typemap_task (void *cls)
273 {
274   struct Session *session = cls;
275   struct GNUNET_MessageHeader *hdr;
276   struct GNUNET_TIME_Relative delay;
277
278   session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay);
279   delay = session->typemap_delay;
280   /* randomize a bit to avoid spont. sync */
281   delay.rel_value_us +=
282       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000);
283   session->typemap_task =
284       GNUNET_SCHEDULER_add_delayed (delay,
285                                     &transmit_typemap_task, session);
286   GNUNET_STATISTICS_update (GSC_stats,
287                             gettext_noop ("# type map refreshes sent"),
288                             1,
289                             GNUNET_NO);
290   hdr = GSC_TYPEMAP_compute_type_map_message ();
291   GSC_KX_encrypt_and_transmit (session->kx,
292                                hdr,
293                                ntohs (hdr->size));
294   GNUNET_free (hdr);
295 }
296
297
298 /**
299  * Restart the typemap task for the given session.
300  *
301  * @param session session to restart typemap transmission for
302  */
303 static void
304 start_typemap_task (struct Session *session)
305 {
306   if (NULL != session->typemap_task)
307     GNUNET_SCHEDULER_cancel (session->typemap_task);
308   session->typemap_delay = GNUNET_TIME_UNIT_SECONDS;
309   session->typemap_task =
310     GNUNET_SCHEDULER_add_delayed (session->typemap_delay,
311                                   &transmit_typemap_task,
312                                   session);
313 }
314
315
316 /**
317  * Create a session, a key exchange was just completed.
318  *
319  * @param peer peer that is now connected
320  * @param kx key exchange that completed
321  */
322 void
323 GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
324                      struct GSC_KeyExchangeInfo *kx)
325 {
326   struct Session *session;
327
328   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329               "Creating session for peer `%4s'\n",
330               GNUNET_i2s (peer));
331   session = GNUNET_new (struct Session);
332   session->tmap = GSC_TYPEMAP_create ();
333   session->peer = peer;
334   session->kx = kx;
335   GNUNET_assert (GNUNET_OK ==
336                  GNUNET_CONTAINER_multipeermap_put (sessions,
337                                                     session->peer,
338                                                     session,
339                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
340   GNUNET_STATISTICS_set (GSC_stats,
341                          gettext_noop ("# peers connected"),
342                          GNUNET_CONTAINER_multipeermap_size (sessions),
343                          GNUNET_NO);
344   GSC_CLIENTS_notify_clients_about_neighbour (peer,
345                                               NULL,
346                                               session->tmap);
347   start_typemap_task (session);
348 }
349
350
351 /**
352  * The other peer has indicated that he 'lost' the session
353  * (KX down), reinitialize the session on our end, in particular
354  * this means to restart the typemap transmission.
355  *
356  * @param peer peer that is now connected
357  */
358 void
359 GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer)
360 {
361   struct Session *session;
362
363   session = find_session (peer);
364   if (NULL == session)
365   {
366     /* KX/session is new for both sides; thus no need to restart what
367        has not yet begun */
368     return;
369   }
370   start_typemap_task (session);
371 }
372
373
374 /**
375  * The other peer has confirmed receiving our type map,
376  * check if it is current and if so, stop retransmitting it.
377  *
378  * @param peer peer that confirmed the type map
379  * @param msg confirmation message we received
380  */
381 void
382 GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer,
383                               const struct GNUNET_MessageHeader *msg)
384 {
385   const struct TypeMapConfirmationMessage *cmsg;
386   struct Session *session;
387
388   session = find_session (peer);
389   if (NULL == session)
390   {
391     GNUNET_break (0);
392     return;
393   }
394   if (ntohs (msg->size) != sizeof (struct TypeMapConfirmationMessage))
395   {
396     GNUNET_break_op (0);
397     return;
398   }
399   cmsg = (const struct TypeMapConfirmationMessage *) msg;
400   if (GNUNET_YES !=
401       GSC_TYPEMAP_check_hash (&cmsg->tm_hash))
402   {
403     /* our typemap has changed in the meantime, do not
404        accept confirmation */
405     GNUNET_STATISTICS_update (GSC_stats,
406                               gettext_noop
407                               ("# outdated typemap confirmations received"),
408                               1, GNUNET_NO);
409     return;
410   }
411   if (NULL != session->typemap_task)
412   {
413     GNUNET_SCHEDULER_cancel (session->typemap_task);
414     session->typemap_task = NULL;
415   }
416   GNUNET_STATISTICS_update (GSC_stats,
417                             gettext_noop
418                             ("# valid typemap confirmations received"),
419                             1, GNUNET_NO);
420 }
421
422
423 /**
424  * Notify the given client about the session (client is new).
425  *
426  * @param cls the `struct GSC_Client`
427  * @param key peer identity
428  * @param value the `struct Session`
429  * @return #GNUNET_OK (continue to iterate)
430  */
431 static int
432 notify_client_about_session (void *cls,
433                              const struct GNUNET_PeerIdentity *key,
434                              void *value)
435 {
436   struct GSC_Client *client = cls;
437   struct Session *session = value;
438
439   GSC_CLIENTS_notify_client_about_neighbour (client,
440                                              session->peer,
441                                              NULL,      /* old TMAP: none */
442                                              session->tmap);
443   return GNUNET_OK;
444 }
445
446
447 /**
448  * We have a new client, notify it about all current sessions.
449  *
450  * @param client the new client
451  */
452 void
453 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
454 {
455   /* notify new client about existing sessions */
456   GNUNET_CONTAINER_multipeermap_iterate (sessions,
457                                          &notify_client_about_session,
458                                          client);
459 }
460
461
462 /**
463  * Try to perform a transmission on the given session.  Will solicit
464  * additional messages if the 'sme' queue is not full enough.
465  *
466  * @param session session to transmit messages from
467  */
468 static void
469 try_transmission (struct Session *session);
470
471
472 /**
473  * Queue a request from a client for transmission to a particular peer.
474  *
475  * @param car request to queue; this handle is then shared between
476  *         the caller (CLIENTS subsystem) and SESSIONS and must not
477  *         be released by either until either #GSC_SESSIONS_dequeue(),
478  *         #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed()
479  *         have been invoked on it
480  */
481 void
482 GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
483 {
484   struct Session *session;
485
486   session = find_session (&car->target);
487   if (NULL == session)
488   {
489     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490                 "Dropped client request for transmission (am disconnected)\n");
491     GNUNET_break (0);           /* should have been rejected earlier */
492     GSC_CLIENTS_reject_request (car,
493                                 GNUNET_NO);
494     return;
495   }
496   if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
497   {
498     GNUNET_break (0);
499     GSC_CLIENTS_reject_request (car,
500                                 GNUNET_YES);
501     return;
502   }
503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504               "Received client transmission request. queueing\n");
505   GNUNET_CONTAINER_DLL_insert (session->active_client_request_head,
506                                session->active_client_request_tail,
507                                car);
508   try_transmission (session);
509 }
510
511
512 /**
513  * Dequeue a request from a client from transmission to a particular peer.
514  *
515  * @param car request to dequeue; this handle will then be 'owned' by
516  *        the caller (CLIENTS sysbsystem)
517  */
518 void
519 GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
520 {
521   struct Session *session;
522
523   if (0 ==
524       memcmp (&car->target,
525               &GSC_my_identity,
526               sizeof (struct GNUNET_PeerIdentity)))
527     return;
528   session = find_session (&car->target);
529   GNUNET_assert (NULL != session);
530   GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
531                                session->active_client_request_tail,
532                                car);
533   /* dequeueing of 'high' priority messages may unblock
534      transmission for lower-priority messages, so we also
535      need to try in this case. */
536   try_transmission (session);
537 }
538
539
540 /**
541  * Solicit messages for transmission, starting with those of the highest
542  * priority.
543  *
544  * @param session session to solict messages for
545  * @param msize how many bytes do we have already
546  */
547 static void
548 solicit_messages (struct Session *session,
549                   size_t msize)
550 {
551   struct GSC_ClientActiveRequest *car;
552   struct GSC_ClientActiveRequest *nxt;
553   size_t so_size;
554   enum GNUNET_CORE_Priority pmax;
555
556   so_size = msize;
557   pmax = GNUNET_CORE_PRIO_BACKGROUND;
558   for (car = session->active_client_request_head; NULL != car; car = car->next)
559   {
560     if (GNUNET_YES == car->was_solicited)
561       continue;
562     pmax = GNUNET_MAX (pmax, car->priority);
563   }
564   nxt = session->active_client_request_head;
565   while (NULL != (car = nxt))
566   {
567     nxt = car->next;
568     if (car->priority < pmax)
569       continue;
570     if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
571       break;
572     so_size += car->msize;
573     if (GNUNET_YES == car->was_solicited)
574       continue;
575     car->was_solicited = GNUNET_YES;
576     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
577                 "Soliciting message with priority %u\n",
578                 car->priority);
579     GSC_CLIENTS_solicit_request (car);
580     /* The above call may *dequeue* requests and thereby
581        clobber 'nxt'. Hence we need to restart from the
582        head of the list. */
583     nxt = session->active_client_request_head;
584     so_size = msize;
585   }
586 }
587
588
589 /**
590  * Some messages were delayed (corked), but the timeout has now expired.
591  * Send them now.
592  *
593  * @param cls `struct Session` with the messages to transmit now
594  */
595 static void
596 pop_cork_task (void *cls)
597 {
598   struct Session *session = cls;
599
600   session->cork_task = NULL;
601   try_transmission (session);
602 }
603
604
605 /**
606  * Try to perform a transmission on the given session. Will solicit
607  * additional messages if the 'sme' queue is not full enough or has
608  * only low-priority messages.
609  *
610  * @param session session to transmit messages from
611  */
612 static void
613 try_transmission (struct Session *session)
614 {
615   struct SessionMessageEntry *pos;
616   size_t msize;
617   struct GNUNET_TIME_Absolute now;
618   struct GNUNET_TIME_Absolute min_deadline;
619   enum GNUNET_CORE_Priority maxp;
620   enum GNUNET_CORE_Priority maxpc;
621   struct GSC_ClientActiveRequest *car;
622   int excess;
623
624   if (GNUNET_YES != session->ready_to_transmit)
625   {
626     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
627                 "Not yet ready to transmit, not evaluating queue\n");
628     return;
629   }
630   msize = 0;
631   min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
632   /* if the peer has excess bandwidth, background traffic is allowed,
633      otherwise not */
634   if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <=
635       GSC_NEIGHBOURS_get_queue_length (session->kx))
636   {
637     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638                 "Transmission queue already very long, waiting...\n");
639     return; /* queue already too long */
640   }
641   excess = GSC_NEIGHBOURS_check_excess_bandwidth (session->kx);
642   if (GNUNET_YES == excess)
643     maxp = GNUNET_CORE_PRIO_BACKGROUND;
644   else
645     maxp = GNUNET_CORE_PRIO_BEST_EFFORT;
646   /* determine highest priority of 'ready' messages we already solicited from clients */
647   pos = session->sme_head;
648   while ((NULL != pos) &&
649          (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE))
650   {
651     GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
652     msize += pos->size;
653     maxp = GNUNET_MAX (maxp, pos->priority);
654     min_deadline = GNUNET_TIME_absolute_min (min_deadline,
655                                              pos->deadline);
656     pos = pos->next;
657   }
658   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659               "Calculating transmission set with %u priority (%s) and %s earliest deadline\n",
660               maxp,
661               (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth",
662               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
663                                                       GNUNET_YES));
664
665   if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL)
666   {
667     /* if highest already solicited priority from clients is not critical,
668        check if there are higher-priority messages to be solicited from clients */
669     if (GNUNET_YES == excess)
670       maxpc = GNUNET_CORE_PRIO_BACKGROUND;
671     else
672       maxpc = GNUNET_CORE_PRIO_BEST_EFFORT;
673     for (car = session->active_client_request_head; NULL != car; car = car->next)
674     {
675       if (GNUNET_YES == car->was_solicited)
676         continue;
677       maxpc = GNUNET_MAX (maxpc,
678                           car->priority);
679     }
680     if (maxpc > maxp)
681     {
682       /* we have messages waiting for solicitation that have a higher
683          priority than those that we already accepted; solicit the
684          high-priority messages first */
685       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686                   "Soliciting messages based on priority (%u > %u)\n",
687                   maxpc,
688                   maxp);
689       solicit_messages (session, 0);
690       return;
691     }
692   }
693   else
694   {
695     /* never solicit more, we have critical messages to process */
696     excess = GNUNET_NO;
697     maxpc = GNUNET_CORE_PRIO_BACKGROUND;
698   }
699   now = GNUNET_TIME_absolute_get ();
700   if ( ( (GNUNET_YES == excess) ||
701          (maxpc >= GNUNET_CORE_PRIO_BEST_EFFORT) ) &&
702        ( (0 == msize) ||
703          ( (msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
704            (min_deadline.abs_value_us > now.abs_value_us))) )
705   {
706     /* not enough ready yet (tiny message & cork possible), or no messages at all,
707        and either excess bandwidth or best-effort or higher message waiting at
708        client; in this case, we try to solicit more */
709     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710                 "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n",
711                 excess,
712                 maxpc,
713                 (unsigned int) msize,
714                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
715                                                         GNUNET_YES));
716     solicit_messages (session,
717                       msize);
718     if (msize > 0)
719     {
720       /* if there is data to send, just not yet, make sure we do transmit
721        * it once the deadline is reached */
722       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
723                   "Corking until %s\n",
724                   GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
725                                                           GNUNET_YES));
726       if (NULL != session->cork_task)
727         GNUNET_SCHEDULER_cancel (session->cork_task);
728       session->cork_task =
729           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (min_deadline),
730                                         &pop_cork_task,
731                                         session);
732     }
733     else
734     {
735       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736                   "Queue empty, waiting for solicitations\n");
737     }
738     return;
739   }
740   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
741               "Building combined plaintext buffer to transmit message!\n");
742   /* create plaintext buffer of all messages (that fit), encrypt and
743      transmit */
744   {
745     static unsigned long long total_bytes;
746     static unsigned int total_msgs;
747     char pbuf[msize];           /* plaintext */
748     size_t used;
749
750     used = 0;
751     while ( (NULL != (pos = session->sme_head)) &&
752             (used + pos->size <= msize) )
753     {
754       GNUNET_memcpy (&pbuf[used], &pos[1], pos->size);
755       used += pos->size;
756       GNUNET_CONTAINER_DLL_remove (session->sme_head,
757                                    session->sme_tail,
758                                    pos);
759       GNUNET_free (pos);
760     }
761     /* compute average payload size */
762     total_bytes += used;
763     total_msgs++;
764     if (0 == total_msgs)
765     {
766       /* 2^32 messages, wrap around... */
767       total_msgs = 1;
768       total_bytes = used;
769     }
770     GNUNET_STATISTICS_set (GSC_stats,
771                            "# avg payload per encrypted message",
772                            total_bytes / total_msgs,
773                            GNUNET_NO);
774     /* now actually transmit... */
775     session->ready_to_transmit = GNUNET_NO;
776     GSC_KX_encrypt_and_transmit (session->kx,
777                                  pbuf,
778                                  used);
779   }
780 }
781
782
783 /**
784  * Send an updated typemap message to the neighbour now,
785  * and restart typemap transmissions.
786  *
787  * @param cls the message
788  * @param key neighbour's identity
789  * @param value `struct Neighbour` of the target
790  * @return always #GNUNET_OK
791  */
792 static int
793 do_restart_typemap_message (void *cls,
794                             const struct GNUNET_PeerIdentity *key,
795                             void *value)
796 {
797   const struct GNUNET_MessageHeader *hdr = cls;
798   struct Session *session = value;
799   struct SessionMessageEntry *sme;
800   uint16_t size;
801
802   size = ntohs (hdr->size);
803   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size);
804   GNUNET_memcpy (&sme[1],
805                  hdr,
806                  size);
807   sme->size = size;
808   sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
809   GNUNET_CONTAINER_DLL_insert (session->sme_head,
810                                session->sme_tail,
811                                sme);
812   try_transmission (session);
813   start_typemap_task (session);
814   return GNUNET_OK;
815 }
816
817
818 /**
819  * Broadcast an updated typemap message to all neighbours.
820  * Restarts the retransmissions until the typemaps are confirmed.
821  *
822  * @param msg message to transmit
823  */
824 void
825 GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg)
826 {
827   if (NULL == sessions)
828     return;
829   GNUNET_CONTAINER_multipeermap_iterate (sessions,
830                                          &do_restart_typemap_message,
831                                          (void *) msg);
832 }
833
834
835 /**
836  * Traffic is being solicited for the given peer.  This means that the
837  * message queue on the transport-level (NEIGHBOURS subsystem) is now
838  * empty and it is now OK to transmit another (non-control) message.
839  *
840  * @param pid identity of peer ready to receive data
841  */
842 void
843 GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
844 {
845   struct Session *session;
846
847   session = find_session (pid);
848   if (NULL == session)
849     return;
850   session->ready_to_transmit = GNUNET_YES;
851   try_transmission (session);
852 }
853
854
855 /**
856  * Transmit a message to a particular peer.
857  *
858  * @param car original request that was queued and then solicited;
859  *            this handle will now be 'owned' by the SESSIONS subsystem
860  * @param msg message to transmit
861  * @param cork is corking allowed?
862  * @param priority how important is this message
863  */
864 void
865 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
866                        const struct GNUNET_MessageHeader *msg,
867                        int cork,
868                        enum GNUNET_CORE_Priority priority)
869 {
870   struct Session *session;
871   struct SessionMessageEntry *sme;
872   struct SessionMessageEntry *pos;
873   size_t msize;
874
875   session = find_session (&car->target);
876   if (NULL == session)
877     return;
878   msize = ntohs (msg->size);
879   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize);
880   GNUNET_memcpy (&sme[1],
881                  msg,
882                  msize);
883   sme->size = msize;
884   sme->priority = priority;
885   if (GNUNET_YES == cork)
886   {
887     sme->deadline =
888         GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
889     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                 "Mesage corked, delaying transmission\n");
891   }
892   pos = session->sme_head;
893   while ( (NULL != pos) &&
894           (pos->priority >= sme->priority) )
895     pos = pos->next;
896   if (NULL == pos)
897     GNUNET_CONTAINER_DLL_insert_tail (session->sme_head,
898                                       session->sme_tail,
899                                       sme);
900   else
901     GNUNET_CONTAINER_DLL_insert_after (session->sme_head,
902                                        session->sme_tail,
903                                        pos->prev,
904                                        sme);
905   try_transmission (session);
906 }
907
908
909 /**
910  * We have received a typemap message from a peer, update ours.
911  * Notifies clients about the session.
912  *
913  * @param peer peer this is about
914  * @param msg typemap update message
915  */
916 void
917 GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer,
918                           const struct GNUNET_MessageHeader *msg)
919 {
920   struct Session *session;
921   struct GSC_TypeMap *nmap;
922   struct SessionMessageEntry *sme;
923   struct TypeMapConfirmationMessage *tmc;
924
925   nmap = GSC_TYPEMAP_get_from_message (msg);
926   if (NULL == nmap)
927     return;                     /* malformed */
928   session = find_session (peer);
929   if (NULL == session)
930   {
931     GNUNET_break (0);
932     return;
933   }
934   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) +
935                        sizeof (struct TypeMapConfirmationMessage));
936   sme->deadline = GNUNET_TIME_absolute_get ();
937   sme->size = sizeof (struct TypeMapConfirmationMessage);
938   sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
939   tmc = (struct TypeMapConfirmationMessage *) &sme[1];
940   tmc->header.size = htons (sizeof (struct TypeMapConfirmationMessage));
941   tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP);
942   tmc->reserved = htonl (0);
943   GSC_TYPEMAP_hash (nmap,
944                     &tmc->tm_hash);
945   GNUNET_CONTAINER_DLL_insert (session->sme_head,
946                                session->sme_tail,
947                                sme);
948   try_transmission (session);
949   GSC_CLIENTS_notify_clients_about_neighbour (peer,
950                                               session->tmap,
951                                               nmap);
952   GSC_TYPEMAP_destroy (session->tmap);
953   session->tmap = nmap;
954 }
955
956
957 /**
958  * The given peer send a message of the specified type.  Make sure the
959  * respective bit is set in its type-map and that clients are notified
960  * about the session.
961  *
962  * @param peer peer this is about
963  * @param type type of the message
964  */
965 void
966 GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer,
967                              uint16_t type)
968 {
969   struct Session *session;
970   struct GSC_TypeMap *nmap;
971
972   if (0 == memcmp (peer,
973                    &GSC_my_identity,
974                    sizeof (struct GNUNET_PeerIdentity)))
975     return;
976   session = find_session (peer);
977   GNUNET_assert (NULL != session);
978   if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1))
979     return;                     /* already in it */
980   nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1);
981   GSC_CLIENTS_notify_clients_about_neighbour (peer,
982                                               session->tmap, nmap);
983   GSC_TYPEMAP_destroy (session->tmap);
984   session->tmap = nmap;
985 }
986
987
988 /**
989  * Initialize sessions subsystem.
990  */
991 void
992 GSC_SESSIONS_init ()
993 {
994   sessions = GNUNET_CONTAINER_multipeermap_create (128,
995                                                    GNUNET_YES);
996 }
997
998
999 /**
1000  * Helper function for #GSC_SESSIONS_done() to free all
1001  * active sessions.
1002  *
1003  * @param cls NULL
1004  * @param key identity of the connected peer
1005  * @param value the `struct Session` for the peer
1006  * @return #GNUNET_OK (continue to iterate)
1007  */
1008 static int
1009 free_session_helper (void *cls,
1010                      const struct GNUNET_PeerIdentity *key,
1011                      void *value)
1012 {
1013   /* struct Session *session = value; */
1014
1015   GSC_SESSIONS_end (key);
1016   return GNUNET_OK;
1017 }
1018
1019
1020 /**
1021  * Shutdown sessions subsystem.
1022  */
1023 void
1024 GSC_SESSIONS_done ()
1025 {
1026   if (NULL != sessions)
1027   {
1028     GNUNET_CONTAINER_multipeermap_iterate (sessions,
1029                                            &free_session_helper,
1030                                            NULL);
1031     GNUNET_CONTAINER_multipeermap_destroy (sessions);
1032     sessions = NULL;
1033   }
1034 }
1035
1036 /* end of gnunet-service-core_sessions.c */