convert fs publish to MQ
[oweals/gnunet.git] / src / core / gnunet-service-core_sessions.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_sessions.c
23  * @brief code for managing of 'encrypted' sessions (key exchange done)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-core.h"
28 #include "gnunet-service-core_neighbours.h"
29 #include "gnunet-service-core_kx.h"
30 #include "gnunet-service-core_typemap.h"
31 #include "gnunet-service-core_sessions.h"
32 #include "gnunet-service-core_clients.h"
33 #include "gnunet_constants.h"
34 #include "core.h"
35
36
37 /**
38  * How many encrypted messages do we queue at most?
39  * Needed to bound memory consumption.
40  */
41 #define MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE 4
42
43
44 /**
45  * Message ready for encryption.  This struct is followed by the
46  * actual content of the message.
47  */
48 struct SessionMessageEntry
49 {
50
51   /**
52    * We keep messages in a doubly linked list.
53    */
54   struct SessionMessageEntry *next;
55
56   /**
57    * We keep messages in a doubly linked list.
58    */
59   struct SessionMessageEntry *prev;
60
61   /**
62    * Deadline for transmission, 1s after we received it (if we
63    * are not corking), otherwise "now".  Note that this message
64    * does NOT expire past its deadline.
65    */
66   struct GNUNET_TIME_Absolute deadline;
67
68   /**
69    * How long is the message? (number of bytes following the `struct
70    * MessageEntry`, but not including the size of `struct
71    * MessageEntry` itself!)
72    */
73   size_t size;
74
75   /**
76    * How important is this message.
77    */
78   enum GNUNET_CORE_Priority priority;
79
80 };
81
82
83 /**
84  * Data kept per session.
85  */
86 struct Session
87 {
88   /**
89    * Identity of the other peer.
90    */
91   struct GNUNET_PeerIdentity peer;
92
93   /**
94    * Head of list of requests from clients for transmission to
95    * this peer.
96    */
97   struct GSC_ClientActiveRequest *active_client_request_head;
98
99   /**
100    * Tail of list of requests from clients for transmission to
101    * this peer.
102    */
103   struct GSC_ClientActiveRequest *active_client_request_tail;
104
105   /**
106    * Head of list of messages ready for encryption.
107    */
108   struct SessionMessageEntry *sme_head;
109
110   /**
111    * Tail of list of messages ready for encryption.
112    */
113   struct SessionMessageEntry *sme_tail;
114
115   /**
116    * Information about the key exchange with the other peer.
117    */
118   struct GSC_KeyExchangeInfo *kxinfo;
119
120   /**
121    * Current type map for this peer.
122    */
123   struct GSC_TypeMap *tmap;
124
125   /**
126    * Task to transmit corked messages with a delay.
127    */
128   struct GNUNET_SCHEDULER_Task *cork_task;
129
130   /**
131    * Task to transmit our type map.
132    */
133   struct GNUNET_SCHEDULER_Task *typemap_task;
134
135   /**
136    * Retransmission delay we currently use for the typemap
137    * transmissions (if not confirmed).
138    */
139   struct GNUNET_TIME_Relative typemap_delay;
140
141   /**
142    * Is the neighbour queue empty and thus ready for us
143    * to transmit an encrypted message?
144    */
145   int ready_to_transmit;
146
147   /**
148    * Is this the first time we're sending the typemap? If so,
149    * we want to send it a bit faster the second time.  0 if
150    * we are sending for the first time, 1 if not.
151    */
152   int first_typemap;
153 };
154
155
156 GNUNET_NETWORK_STRUCT_BEGIN
157
158 /**
159  * Message sent to confirm that a typemap was received.
160  */
161 struct TypeMapConfirmationMessage
162 {
163
164   /**
165    * Header with type #GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP.
166    */
167   struct GNUNET_MessageHeader header;
168
169   /**
170    * Reserved, always zero.
171    */
172   uint32_t reserved GNUNET_PACKED;
173
174   /**
175    * Hash of the (decompressed) type map that was received.
176    */
177   struct GNUNET_HashCode tm_hash;
178
179 };
180
181 GNUNET_NETWORK_STRUCT_END
182
183
184 /**
185  * Map of peer identities to `struct Session`.
186  */
187 static struct GNUNET_CONTAINER_MultiPeerMap *sessions;
188
189
190 /**
191  * Find the session for the given peer.
192  *
193  * @param peer identity of the peer
194  * @return NULL if we are not connected, otherwise the
195  *         session handle
196  */
197 static struct Session *
198 find_session (const struct GNUNET_PeerIdentity *peer)
199 {
200   return GNUNET_CONTAINER_multipeermap_get (sessions, peer);
201 }
202
203
204 /**
205  * End the session with the given peer (we are no longer
206  * connected).
207  *
208  * @param pid identity of peer to kill session with
209  */
210 void
211 GSC_SESSIONS_end (const struct GNUNET_PeerIdentity *pid)
212 {
213   struct Session *session;
214   struct GSC_ClientActiveRequest *car;
215   struct SessionMessageEntry *sme;
216
217   session = find_session (pid);
218   if (NULL == session)
219     return;
220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
221               "Destroying session for peer `%4s'\n",
222               GNUNET_i2s (&session->peer));
223   if (NULL != session->cork_task)
224   {
225     GNUNET_SCHEDULER_cancel (session->cork_task);
226     session->cork_task = NULL;
227   }
228   while (NULL != (car = session->active_client_request_head))
229   {
230     GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
231                                  session->active_client_request_tail, car);
232     GSC_CLIENTS_reject_request (car,
233                                 GNUNET_NO);
234   }
235   while (NULL != (sme = session->sme_head))
236   {
237     GNUNET_CONTAINER_DLL_remove (session->sme_head,
238                                  session->sme_tail,
239                                  sme);
240     GNUNET_free (sme);
241   }
242   if (NULL != session->typemap_task)
243   {
244     GNUNET_SCHEDULER_cancel (session->typemap_task);
245     session->typemap_task = NULL;
246   }
247   GSC_CLIENTS_notify_clients_about_neighbour (&session->peer,
248                                               session->tmap, NULL);
249   GNUNET_assert (GNUNET_YES ==
250                  GNUNET_CONTAINER_multipeermap_remove (sessions,
251                                                        &session->peer,
252                                                        session));
253   GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"),
254                          GNUNET_CONTAINER_multipeermap_size (sessions),
255                          GNUNET_NO);
256   GSC_TYPEMAP_destroy (session->tmap);
257   session->tmap = NULL;
258   GNUNET_free (session);
259 }
260
261
262 /**
263  * Transmit our current typemap message to the other peer.
264  * (Done periodically until the typemap is confirmed).
265  *
266  * @param cls the `struct Session *`
267  */
268 static void
269 transmit_typemap_task (void *cls)
270 {
271   struct Session *session = cls;
272   struct GNUNET_MessageHeader *hdr;
273   struct GNUNET_TIME_Relative delay;
274
275   session->typemap_delay = GNUNET_TIME_STD_BACKOFF (session->typemap_delay);
276   delay = session->typemap_delay;
277   /* randomize a bit to avoid spont. sync */
278   delay.rel_value_us +=
279       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1000 * 1000);
280   session->typemap_task =
281       GNUNET_SCHEDULER_add_delayed (delay,
282                                     &transmit_typemap_task, session);
283   GNUNET_STATISTICS_update (GSC_stats,
284                             gettext_noop ("# type map refreshes sent"),
285                             1,
286                             GNUNET_NO);
287   hdr = GSC_TYPEMAP_compute_type_map_message ();
288   GSC_KX_encrypt_and_transmit (session->kxinfo,
289                                hdr,
290                                ntohs (hdr->size));
291   GNUNET_free (hdr);
292 }
293
294
295 /**
296  * Restart the typemap task for the given session.
297  *
298  * @param session session to restart typemap transmission for
299  */
300 static void
301 start_typemap_task (struct Session *session)
302 {
303   if (NULL != session->typemap_task)
304     GNUNET_SCHEDULER_cancel (session->typemap_task);
305   session->typemap_delay = GNUNET_TIME_UNIT_SECONDS;
306   session->typemap_task =
307     GNUNET_SCHEDULER_add_delayed (session->typemap_delay,
308                                   &transmit_typemap_task,
309                                   session);
310 }
311
312
313 /**
314  * Create a session, a key exchange was just completed.
315  *
316  * @param peer peer that is now connected
317  * @param kx key exchange that completed
318  */
319 void
320 GSC_SESSIONS_create (const struct GNUNET_PeerIdentity *peer,
321                      struct GSC_KeyExchangeInfo *kx)
322 {
323   struct Session *session;
324
325   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
326               "Creating session for peer `%4s'\n",
327               GNUNET_i2s (peer));
328   session = GNUNET_new (struct Session);
329   session->tmap = GSC_TYPEMAP_create ();
330   session->peer = *peer;
331   session->kxinfo = kx;
332   GNUNET_assert (GNUNET_OK ==
333                  GNUNET_CONTAINER_multipeermap_put (sessions,
334                                                     &session->peer,
335                                                     session,
336                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
337   GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# peers connected"),
338                          GNUNET_CONTAINER_multipeermap_size (sessions),
339                          GNUNET_NO);
340   GSC_CLIENTS_notify_clients_about_neighbour (peer,
341                                               NULL,
342                                               session->tmap);
343   start_typemap_task (session);
344 }
345
346
347 /**
348  * The other peer has indicated that he 'lost' the session
349  * (KX down), reinitialize the session on our end, in particular
350  * this means to restart the typemap transmission.
351  *
352  * @param peer peer that is now connected
353  */
354 void
355 GSC_SESSIONS_reinit (const struct GNUNET_PeerIdentity *peer)
356 {
357   struct Session *session;
358
359   session = find_session (peer);
360   if (NULL == session)
361   {
362     /* KX/session is new for both sides; thus no need to restart what
363        has not yet begun */
364     return;
365   }
366   start_typemap_task (session);
367 }
368
369
370 /**
371  * The other peer has confirmed receiving our type map,
372  * check if it is current and if so, stop retransmitting it.
373  *
374  * @param peer peer that confirmed the type map
375  * @param msg confirmation message we received
376  */
377 void
378 GSC_SESSIONS_confirm_typemap (const struct GNUNET_PeerIdentity *peer,
379                               const struct GNUNET_MessageHeader *msg)
380 {
381   const struct TypeMapConfirmationMessage *cmsg;
382   struct Session *session;
383
384   session = find_session (peer);
385   if (NULL == session)
386   {
387     GNUNET_break (0);
388     return;
389   }
390   if (ntohs (msg->size) != sizeof (struct TypeMapConfirmationMessage))
391   {
392     GNUNET_break_op (0);
393     return;
394   }
395   cmsg = (const struct TypeMapConfirmationMessage *) msg;
396   if (GNUNET_YES !=
397       GSC_TYPEMAP_check_hash (&cmsg->tm_hash))
398   {
399     /* our typemap has changed in the meantime, do not
400        accept confirmation */
401     GNUNET_STATISTICS_update (GSC_stats,
402                               gettext_noop
403                               ("# outdated typemap confirmations received"),
404                               1, GNUNET_NO);
405     return;
406   }
407   if (NULL != session->typemap_task)
408   {
409     GNUNET_SCHEDULER_cancel (session->typemap_task);
410     session->typemap_task = NULL;
411   }
412   GNUNET_STATISTICS_update (GSC_stats,
413                             gettext_noop
414                             ("# valid typemap confirmations received"),
415                             1, GNUNET_NO);
416 }
417
418
419 /**
420  * Notify the given client about the session (client is new).
421  *
422  * @param cls the `struct GSC_Client`
423  * @param key peer identity
424  * @param value the `struct Session`
425  * @return #GNUNET_OK (continue to iterate)
426  */
427 static int
428 notify_client_about_session (void *cls,
429                              const struct GNUNET_PeerIdentity *key,
430                              void *value)
431 {
432   struct GSC_Client *client = cls;
433   struct Session *session = value;
434
435   GSC_CLIENTS_notify_client_about_neighbour (client,
436                                              &session->peer,
437                                              NULL,      /* old TMAP: none */
438                                              session->tmap);
439   return GNUNET_OK;
440 }
441
442
443 /**
444  * We have a new client, notify it about all current sessions.
445  *
446  * @param client the new client
447  */
448 void
449 GSC_SESSIONS_notify_client_about_sessions (struct GSC_Client *client)
450 {
451   /* notify new client about existing sessions */
452   GNUNET_CONTAINER_multipeermap_iterate (sessions,
453                                          &notify_client_about_session,
454                                          client);
455 }
456
457
458 /**
459  * Try to perform a transmission on the given session.  Will solicit
460  * additional messages if the 'sme' queue is not full enough.
461  *
462  * @param session session to transmit messages from
463  */
464 static void
465 try_transmission (struct Session *session);
466
467
468 /**
469  * Queue a request from a client for transmission to a particular peer.
470  *
471  * @param car request to queue; this handle is then shared between
472  *         the caller (CLIENTS subsystem) and SESSIONS and must not
473  *         be released by either until either #GSC_SESSIONS_dequeue(),
474  *         #GSC_SESSIONS_transmit() or #GSC_CLIENTS_failed()
475  *         have been invoked on it
476  */
477 void
478 GSC_SESSIONS_queue_request (struct GSC_ClientActiveRequest *car)
479 {
480   struct Session *session;
481
482   session = find_session (&car->target);
483   if (NULL == session)
484   {
485     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
486                 "Dropped client request for transmission (am disconnected)\n");
487     GNUNET_break (0);           /* should have been rejected earlier */
488     GSC_CLIENTS_reject_request (car,
489                                 GNUNET_NO);
490     return;
491   }
492   if (car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
493   {
494     GNUNET_break (0);
495     GSC_CLIENTS_reject_request (car,
496                                 GNUNET_YES);
497     return;
498   }
499   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500               "Received client transmission request. queueing\n");
501   GNUNET_CONTAINER_DLL_insert (session->active_client_request_head,
502                                session->active_client_request_tail,
503                                car);
504   try_transmission (session);
505 }
506
507
508 /**
509  * Dequeue a request from a client from transmission to a particular peer.
510  *
511  * @param car request to dequeue; this handle will then be 'owned' by
512  *        the caller (CLIENTS sysbsystem)
513  */
514 void
515 GSC_SESSIONS_dequeue_request (struct GSC_ClientActiveRequest *car)
516 {
517   struct Session *session;
518
519   if (0 ==
520       memcmp (&car->target,
521               &GSC_my_identity,
522               sizeof (struct GNUNET_PeerIdentity)))
523     return;
524   session = find_session (&car->target);
525   GNUNET_assert (NULL != session);
526   GNUNET_CONTAINER_DLL_remove (session->active_client_request_head,
527                                session->active_client_request_tail,
528                                car);
529   /* dequeueing of 'high' priority messages may unblock
530      transmission for lower-priority messages, so we also
531      need to try in this case. */
532   try_transmission (session);
533 }
534
535
536 /**
537  * Solicit messages for transmission, starting with those of the highest
538  * priority.
539  *
540  * @param session session to solict messages for
541  * @param msize how many bytes do we have already
542  */
543 static void
544 solicit_messages (struct Session *session,
545                   size_t msize)
546 {
547   struct GSC_ClientActiveRequest *car;
548   struct GSC_ClientActiveRequest *nxt;
549   size_t so_size;
550   enum GNUNET_CORE_Priority pmax;
551
552   so_size = msize;
553   pmax = GNUNET_CORE_PRIO_BACKGROUND;
554   for (car = session->active_client_request_head; NULL != car; car = car->next)
555   {
556     if (GNUNET_YES == car->was_solicited)
557       continue;
558     pmax = GNUNET_MAX (pmax, car->priority);
559   }
560   nxt = session->active_client_request_head;
561   while (NULL != (car = nxt))
562   {
563     nxt = car->next;
564     if (car->priority < pmax)
565       continue;
566     if (so_size + car->msize > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
567       break;
568     so_size += car->msize;
569     if (GNUNET_YES == car->was_solicited)
570       continue;
571     car->was_solicited = GNUNET_YES;
572     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573                 "Soliciting message with priority %u\n",
574                 car->priority);
575     GSC_CLIENTS_solicit_request (car);
576     /* The above call may *dequeue* requests and thereby
577        clobber 'nxt'. Hence we need to restart from the
578        head of the list. */
579     nxt = session->active_client_request_head;
580     so_size = msize;
581   }
582 }
583
584
585 /**
586  * Some messages were delayed (corked), but the timeout has now expired.
587  * Send them now.
588  *
589  * @param cls `struct Session` with the messages to transmit now
590  */
591 static void
592 pop_cork_task (void *cls)
593 {
594   struct Session *session = cls;
595
596   session->cork_task = NULL;
597   try_transmission (session);
598 }
599
600
601 /**
602  * Try to perform a transmission on the given session. Will solicit
603  * additional messages if the 'sme' queue is not full enough or has
604  * only low-priority messages.
605  *
606  * @param session session to transmit messages from
607  */
608 static void
609 try_transmission (struct Session *session)
610 {
611   struct SessionMessageEntry *pos;
612   size_t msize;
613   struct GNUNET_TIME_Absolute now;
614   struct GNUNET_TIME_Absolute min_deadline;
615   enum GNUNET_CORE_Priority maxp;
616   enum GNUNET_CORE_Priority maxpc;
617   struct GSC_ClientActiveRequest *car;
618   int excess;
619
620   if (GNUNET_YES != session->ready_to_transmit)
621   {
622     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623                 "Already ready to transmit, not evaluating queue\n");
624     return;
625   }
626   msize = 0;
627   min_deadline = GNUNET_TIME_UNIT_FOREVER_ABS;
628   /* if the peer has excess bandwidth, background traffic is allowed,
629      otherwise not */
630   if (MAX_ENCRYPTED_MESSAGE_QUEUE_SIZE <=
631       GSC_NEIGHBOURS_get_queue_size (&session->peer))
632   {
633     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634                 "Transmission queue already very long, waiting...\n");
635     return; /* queue already too long */
636   }
637   excess = GSC_NEIGHBOURS_check_excess_bandwidth (&session->peer);
638   if (GNUNET_YES == excess)
639     maxp = GNUNET_CORE_PRIO_BACKGROUND;
640   else
641     maxp = GNUNET_CORE_PRIO_BEST_EFFORT;
642   /* determine highest priority of 'ready' messages we already solicited from clients */
643   pos = session->sme_head;
644   while ((NULL != pos) &&
645          (msize + pos->size <= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE))
646   {
647     GNUNET_assert (pos->size < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE);
648     msize += pos->size;
649     maxp = GNUNET_MAX (maxp, pos->priority);
650     min_deadline = GNUNET_TIME_absolute_min (min_deadline,
651                                              pos->deadline);
652     pos = pos->next;
653   }
654   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
655               "Calculating transmission set with %u priority (%s) and %s earliest deadline\n",
656               maxp,
657               (GNUNET_YES == excess) ? "excess bandwidth" : "limited bandwidth",
658               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
659                                                       GNUNET_YES));
660
661   if (maxp < GNUNET_CORE_PRIO_CRITICAL_CONTROL)
662   {
663     /* if highest already solicited priority from clients is not critical,
664        check if there are higher-priority messages to be solicited from clients */
665     if (GNUNET_YES == excess)
666       maxpc = GNUNET_CORE_PRIO_BACKGROUND;
667     else
668       maxpc = GNUNET_CORE_PRIO_BEST_EFFORT;
669     for (car = session->active_client_request_head; NULL != car; car = car->next)
670     {
671       if (GNUNET_YES == car->was_solicited)
672         continue;
673       maxpc = GNUNET_MAX (maxpc,
674                           car->priority);
675     }
676     if (maxpc > maxp)
677     {
678       /* we have messages waiting for solicitation that have a higher
679          priority than those that we already accepted; solicit the
680          high-priority messages first */
681       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
682                   "Soliciting messages based on priority (%u > %u)\n",
683                   maxpc,
684                   maxp);
685       solicit_messages (session, 0);
686       return;
687     }
688   }
689   else
690   {
691     /* never solicit more, we have critical messages to process */
692     excess = GNUNET_NO;
693     maxpc = GNUNET_CORE_PRIO_BACKGROUND;
694   }
695   now = GNUNET_TIME_absolute_get ();
696   if ( ( (GNUNET_YES == excess) ||
697          (maxpc >= GNUNET_CORE_PRIO_BEST_EFFORT) ) &&
698        ( (0 == msize) ||
699          ( (msize < GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE / 2) &&
700            (min_deadline.abs_value_us > now.abs_value_us))) )
701   {
702     /* not enough ready yet (tiny message & cork possible), or no messages at all,
703        and either excess bandwidth or best-effort or higher message waiting at
704        client; in this case, we try to solicit more */
705     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
706                 "Soliciting messages (excess %d, maxpc %d, message size %u, deadline %s)\n",
707                 excess,
708                 maxpc,
709                 (unsigned int) msize,
710                 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
711                                                         GNUNET_YES));
712     solicit_messages (session,
713                       msize);
714     if (msize > 0)
715     {
716       /* if there is data to send, just not yet, make sure we do transmit
717        * it once the deadline is reached */
718       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719                   "Corking until %s\n",
720                   GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_deadline),
721                                                           GNUNET_YES));
722       if (NULL != session->cork_task)
723         GNUNET_SCHEDULER_cancel (session->cork_task);
724       session->cork_task =
725           GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (min_deadline),
726                                         &pop_cork_task,
727                                         session);
728     }
729     else
730     {
731       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
732                   "Queue empty, waiting for solicitations\n");
733     }
734     return;
735   }
736   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
737               "Building combined plaintext buffer to transmit message!\n");
738   /* create plaintext buffer of all messages (that fit), encrypt and
739      transmit */
740   {
741     static unsigned long long total_bytes;
742     static unsigned int total_msgs;
743     char pbuf[msize];           /* plaintext */
744     size_t used;
745
746     used = 0;
747     while ( (NULL != (pos = session->sme_head)) &&
748             (used + pos->size <= msize) )
749     {
750       memcpy (&pbuf[used], &pos[1], pos->size);
751       used += pos->size;
752       GNUNET_CONTAINER_DLL_remove (session->sme_head,
753                                    session->sme_tail,
754                                    pos);
755       GNUNET_free (pos);
756     }
757     /* compute average payload size */
758     total_bytes += used;
759     total_msgs++;
760     if (0 == total_msgs)
761     {
762       /* 2^32 messages, wrap around... */
763       total_msgs = 1;
764       total_bytes = used;
765     }
766     GNUNET_STATISTICS_set (GSC_stats,
767                            "# avg payload per encrypted message",
768                            total_bytes / total_msgs,
769                            GNUNET_NO);
770     /* now actually transmit... */
771     session->ready_to_transmit = GNUNET_NO;
772     GSC_KX_encrypt_and_transmit (session->kxinfo,
773                                  pbuf,
774                                  used);
775   }
776 }
777
778
779 /**
780  * Send an updated typemap message to the neighbour now,
781  * and restart typemap transmissions.
782  *
783  * @param cls the message
784  * @param key neighbour's identity
785  * @param value `struct Neighbour` of the target
786  * @return always #GNUNET_OK
787  */
788 static int
789 do_restart_typemap_message (void *cls,
790                             const struct GNUNET_PeerIdentity *key,
791                             void *value)
792 {
793   const struct GNUNET_MessageHeader *hdr = cls;
794   struct Session *session = value;
795   struct SessionMessageEntry *sme;
796   uint16_t size;
797
798   size = ntohs (hdr->size);
799   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + size);
800   memcpy (&sme[1], hdr, size);
801   sme->size = size;
802   sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
803   GNUNET_CONTAINER_DLL_insert (session->sme_head,
804                                session->sme_tail,
805                                sme);
806   try_transmission (session);
807   start_typemap_task (session);
808   return GNUNET_OK;
809 }
810
811
812 /**
813  * Broadcast an updated typemap message to all neighbours.
814  * Restarts the retransmissions until the typemaps are confirmed.
815  *
816  * @param msg message to transmit
817  */
818 void
819 GSC_SESSIONS_broadcast_typemap (const struct GNUNET_MessageHeader *msg)
820 {
821   if (NULL == sessions)
822     return;
823   GNUNET_CONTAINER_multipeermap_iterate (sessions,
824                                          &do_restart_typemap_message,
825                                          (void *) msg);
826 }
827
828
829 /**
830  * Traffic is being solicited for the given peer.  This means that the
831  * message queue on the transport-level (NEIGHBOURS subsystem) is now
832  * empty and it is now OK to transmit another (non-control) message.
833  *
834  * @param pid identity of peer ready to receive data
835  */
836 void
837 GSC_SESSIONS_solicit (const struct GNUNET_PeerIdentity *pid)
838 {
839   struct Session *session;
840
841   session = find_session (pid);
842   if (NULL == session)
843     return;
844   session->ready_to_transmit = GNUNET_YES;
845   try_transmission (session);
846 }
847
848
849 /**
850  * Transmit a message to a particular peer.
851  *
852  * @param car original request that was queued and then solicited;
853  *            this handle will now be 'owned' by the SESSIONS subsystem
854  * @param msg message to transmit
855  * @param cork is corking allowed?
856  * @param priority how important is this message
857  */
858 void
859 GSC_SESSIONS_transmit (struct GSC_ClientActiveRequest *car,
860                        const struct GNUNET_MessageHeader *msg,
861                        int cork,
862                        enum GNUNET_CORE_Priority priority)
863 {
864   struct Session *session;
865   struct SessionMessageEntry *sme;
866   struct SessionMessageEntry *pos;
867   size_t msize;
868
869   session = find_session (&car->target);
870   if (NULL == session)
871     return;
872   msize = ntohs (msg->size);
873   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) + msize);
874   memcpy (&sme[1], msg, msize);
875   sme->size = msize;
876   sme->priority = priority;
877   if (GNUNET_YES == cork)
878     sme->deadline =
879         GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_MAX_CORK_DELAY);
880   pos = session->sme_head;
881   while ( (NULL != pos) &&
882           (pos->priority >= sme->priority) )
883     pos = pos->next;
884   if (NULL == pos)
885     GNUNET_CONTAINER_DLL_insert_tail (session->sme_head,
886                                       session->sme_tail,
887                                       sme);
888   else
889     GNUNET_CONTAINER_DLL_insert_after (session->sme_head,
890                                        session->sme_tail,
891                                        pos->prev,
892                                        sme);
893   try_transmission (session);
894 }
895
896
897 /**
898  * We have received a typemap message from a peer, update ours.
899  * Notifies clients about the session.
900  *
901  * @param peer peer this is about
902  * @param msg typemap update message
903  */
904 void
905 GSC_SESSIONS_set_typemap (const struct GNUNET_PeerIdentity *peer,
906                           const struct GNUNET_MessageHeader *msg)
907 {
908   struct Session *session;
909   struct GSC_TypeMap *nmap;
910   struct SessionMessageEntry *sme;
911   struct TypeMapConfirmationMessage *tmc;
912
913   nmap = GSC_TYPEMAP_get_from_message (msg);
914   if (NULL == nmap)
915     return;                     /* malformed */
916   session = find_session (peer);
917   if (NULL == session)
918   {
919     GNUNET_break (0);
920     return;
921   }
922   sme = GNUNET_malloc (sizeof (struct SessionMessageEntry) +
923                        sizeof (struct TypeMapConfirmationMessage));
924   sme->deadline = GNUNET_TIME_absolute_get ();
925   sme->size = sizeof (struct TypeMapConfirmationMessage);
926   sme->priority = GNUNET_CORE_PRIO_CRITICAL_CONTROL;
927   tmc = (struct TypeMapConfirmationMessage *) &sme[1];
928   tmc->header.size = htons (sizeof (struct TypeMapConfirmationMessage));
929   tmc->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_CONFIRM_TYPE_MAP);
930   tmc->reserved = htonl (0);
931   GSC_TYPEMAP_hash (nmap,
932                     &tmc->tm_hash);
933   GNUNET_CONTAINER_DLL_insert (session->sme_head,
934                                session->sme_tail,
935                                sme);
936   try_transmission (session);
937   GSC_CLIENTS_notify_clients_about_neighbour (peer,
938                                               session->tmap,
939                                               nmap);
940   GSC_TYPEMAP_destroy (session->tmap);
941   session->tmap = nmap;
942 }
943
944
945 /**
946  * The given peer send a message of the specified type.  Make sure the
947  * respective bit is set in its type-map and that clients are notified
948  * about the session.
949  *
950  * @param peer peer this is about
951  * @param type type of the message
952  */
953 void
954 GSC_SESSIONS_add_to_typemap (const struct GNUNET_PeerIdentity *peer,
955                              uint16_t type)
956 {
957   struct Session *session;
958   struct GSC_TypeMap *nmap;
959
960   if (0 == memcmp (peer,
961                    &GSC_my_identity,
962                    sizeof (struct GNUNET_PeerIdentity)))
963     return;
964   session = find_session (peer);
965   GNUNET_assert (NULL != session);
966   if (GNUNET_YES == GSC_TYPEMAP_test_match (session->tmap, &type, 1))
967     return;                     /* already in it */
968   nmap = GSC_TYPEMAP_extend (session->tmap, &type, 1);
969   GSC_CLIENTS_notify_clients_about_neighbour (peer,
970                                               session->tmap, nmap);
971   GSC_TYPEMAP_destroy (session->tmap);
972   session->tmap = nmap;
973 }
974
975
976 /**
977  * Initialize sessions subsystem.
978  */
979 void
980 GSC_SESSIONS_init ()
981 {
982   sessions = GNUNET_CONTAINER_multipeermap_create (128,
983                                                    GNUNET_YES);
984 }
985
986
987 /**
988  * Helper function for #GSC_SESSIONS_done() to free all
989  * active sessions.
990  *
991  * @param cls NULL
992  * @param key identity of the connected peer
993  * @param value the `struct Session` for the peer
994  * @return #GNUNET_OK (continue to iterate)
995  */
996 static int
997 free_session_helper (void *cls,
998                      const struct GNUNET_PeerIdentity *key,
999                      void *value)
1000 {
1001   struct Session *session = value;
1002
1003   GSC_SESSIONS_end (&session->peer);
1004   return GNUNET_OK;
1005 }
1006
1007
1008 /**
1009  * Shutdown sessions subsystem.
1010  */
1011 void
1012 GSC_SESSIONS_done ()
1013 {
1014   if (NULL != sessions)
1015   {
1016     GNUNET_CONTAINER_multipeermap_iterate (sessions,
1017                                            &free_session_helper, NULL);
1018     GNUNET_CONTAINER_multipeermap_destroy (sessions);
1019     sessions = NULL;
1020   }
1021 }
1022
1023 /* end of gnunet-service-core_sessions.c */