fix #3709: bound encrypted message queue
[oweals/gnunet.git] / src / core / gnunet-service-core_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_neighbours.c
23  * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_transport_service.h"
30 #include "gnunet-service-core.h"
31 #include "gnunet-service-core_neighbours.h"
32 #include "gnunet-service-core_kx.h"
33 #include "gnunet-service-core_sessions.h"
34 #include "gnunet_constants.h"
35
36
37 /**
38  * Message ready for transmission via transport service.  This struct
39  * is followed by the actual content of the message.
40  */
41 struct NeighbourMessageEntry
42 {
43
44   /**
45    * We keep messages in a doubly linked list.
46    */
47   struct NeighbourMessageEntry *next;
48
49   /**
50    * We keep messages in a doubly linked list.
51    */
52   struct NeighbourMessageEntry *prev;
53
54   /**
55    * By when are we supposed to transmit this message?
56    */
57   struct GNUNET_TIME_Absolute deadline;
58
59   /**
60    * How long is the message? (number of bytes following the "struct
61    * MessageEntry", but not including the size of "struct
62    * MessageEntry" itself!)
63    */
64   size_t size;
65
66 };
67
68
69 /**
70  * Data kept per transport-connected peer.
71  */
72 struct Neighbour
73 {
74
75   /**
76    * Head of the batched message queue (already ordered, transmit
77    * starting with the head).
78    */
79   struct NeighbourMessageEntry *message_head;
80
81   /**
82    * Tail of the batched message queue (already ordered, append new
83    * messages to tail).
84    */
85   struct NeighbourMessageEntry *message_tail;
86
87   /**
88    * Handle for pending requests for transmission to this peer
89    * with the transport service.  NULL if no request is pending.
90    */
91   struct GNUNET_TRANSPORT_TransmitHandle *th;
92
93   /**
94    * Information about the key exchange with the other peer.
95    */
96   struct GSC_KeyExchangeInfo *kxinfo;
97
98   /**
99    * Identity of the other peer.
100    */
101   struct GNUNET_PeerIdentity peer;
102
103   /**
104    * ID of task used for re-trying plaintext scheduling.
105    */
106   struct GNUNET_SCHEDULER_Task *retry_plaintext_task;
107
108   /**
109    * How many messages are in the queue for this neighbour?
110    */
111   unsigned int queue_size;
112
113   /**
114    * #GNUNET_YES if this peer currently has excess bandwidth.
115    */
116   int has_excess_bandwidth;
117
118 };
119
120
121 /**
122  * Map of peer identities to 'struct Neighbour'.
123  */
124 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
125
126 /**
127  * Transport service.
128  */
129 static struct GNUNET_TRANSPORT_Handle *transport;
130
131
132 /**
133  * Find the entry for the given neighbour.
134  *
135  * @param peer identity of the neighbour
136  * @return NULL if we are not connected, otherwise the
137  *         neighbour's entry.
138  */
139 static struct Neighbour *
140 find_neighbour (const struct GNUNET_PeerIdentity *peer)
141 {
142   if (NULL == neighbours)
143     return NULL;
144   return GNUNET_CONTAINER_multipeermap_get (neighbours, peer);
145 }
146
147
148 /**
149  * Free the given entry for the neighbour.
150  *
151  * @param n neighbour to free
152  */
153 static void
154 free_neighbour (struct Neighbour *n)
155 {
156   struct NeighbourMessageEntry *m;
157
158   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
159               "Destroying neighbour entry for peer `%4s'\n",
160               GNUNET_i2s (&n->peer));
161   while (NULL != (m = n->message_head))
162   {
163     GNUNET_CONTAINER_DLL_remove (n->message_head,
164                                  n->message_tail,
165                                  m);
166     n->queue_size--;
167     GNUNET_free (m);
168   }
169   GNUNET_assert (0 == n->queue_size);
170   if (NULL != n->th)
171   {
172     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
173     n->th = NULL;
174   }
175   GNUNET_STATISTICS_update (GSC_stats,
176                             gettext_noop
177                             ("# sessions terminated by transport disconnect"),
178                             1, GNUNET_NO);
179   if (NULL != n->kxinfo)
180   {
181     GSC_KX_stop (n->kxinfo);
182     n->kxinfo = NULL;
183   }
184   if (NULL != n->retry_plaintext_task)
185   {
186     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
187     n->retry_plaintext_task = NULL;
188   }
189   GNUNET_assert (GNUNET_OK ==
190                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
191                                                        &n->peer, n));
192   GNUNET_STATISTICS_set (GSC_stats,
193                          gettext_noop ("# neighbour entries allocated"),
194                          GNUNET_CONTAINER_multipeermap_size (neighbours),
195                          GNUNET_NO);
196   GNUNET_free (n);
197 }
198
199
200 /**
201  * Check if we have encrypted messages for the specified neighbour
202  * pending, and if so, check with the transport about sending them
203  * out.
204  *
205  * @param n neighbour to check.
206  */
207 static void
208 process_queue (struct Neighbour *n);
209
210
211 /**
212  * Function called when the transport service is ready to receive a
213  * message for the respective peer
214  *
215  * @param cls neighbour to use message from
216  * @param size number of bytes we can transmit
217  * @param buf where to copy the message
218  * @return number of bytes transmitted
219  */
220 static size_t
221 transmit_ready (void *cls, size_t size, void *buf)
222 {
223   struct Neighbour *n = cls;
224   struct NeighbourMessageEntry *m;
225   size_t ret;
226   char *cbuf;
227
228   n->th = NULL;
229   m = n->message_head;
230   if (NULL == m)
231   {
232     GNUNET_break (0);
233     return 0;
234   }
235   GNUNET_CONTAINER_DLL_remove (n->message_head,
236                                n->message_tail,
237                                m);
238   n->queue_size--;
239   if (NULL == buf)
240   {
241     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
242                 "Transmission of message of type %u and size %u failed\n",
243                 (unsigned int)
244                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
245                 (unsigned int) m->size);
246     GNUNET_free (m);
247     process_queue (n);
248     return 0;
249   }
250   cbuf = buf;
251   GNUNET_assert (size >= m->size);
252   memcpy (cbuf, &m[1], m->size);
253   ret = m->size;
254   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
255               "Copied message of type %u and size %u into transport buffer for `%4s'\n",
256               (unsigned int)
257               ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
258               (unsigned int) ret, GNUNET_i2s (&n->peer));
259   GNUNET_free (m);
260   n->has_excess_bandwidth = GNUNET_NO;
261   process_queue (n);
262   GNUNET_STATISTICS_update (GSC_stats,
263                             gettext_noop
264                             ("# encrypted bytes given to transport"), ret,
265                             GNUNET_NO);
266   return ret;
267 }
268
269
270 /**
271  * Check if we have messages for the specified neighbour pending, and
272  * if so, check with the transport about sending them out.
273  *
274  * @param n neighbour to check.
275  */
276 static void
277 process_queue (struct Neighbour *n)
278 {
279   struct NeighbourMessageEntry *m;
280
281   if (NULL != n->th)
282     return;                     /* request already pending */
283   m = n->message_head;
284   if (NULL == m)
285   {
286     /* notify sessions that the queue is empty and more messages
287      * could thus be queued now */
288     GSC_SESSIONS_solicit (&n->peer);
289     return;
290   }
291   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
292               "Asking transport for transmission of %u bytes to `%4s' in next %s\n",
293               (unsigned int) m->size,
294               GNUNET_i2s (&n->peer),
295               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (m->deadline),
296                                                       GNUNET_NO));
297   n->th
298     = GNUNET_TRANSPORT_notify_transmit_ready (transport,
299                                               &n->peer,
300                                               m->size,
301                                               GNUNET_TIME_absolute_get_remaining (m->deadline),
302                                               &transmit_ready,
303                                               n);
304   if (NULL != n->th)
305     return;
306   /* message request too large or duplicate request */
307   GNUNET_break (0);
308   /* discard encrypted message */
309   GNUNET_CONTAINER_DLL_remove (n->message_head,
310                                n->message_tail,
311                                m);
312   n->queue_size--;
313   GNUNET_free (m);
314   process_queue (n);
315 }
316
317
318 /**
319  * Function called by transport to notify us that
320  * a peer connected to us (on the network level).
321  *
322  * @param cls closure
323  * @param peer the peer that connected
324  */
325 static void
326 handle_transport_notify_connect (void *cls,
327                                  const struct GNUNET_PeerIdentity *peer)
328 {
329   struct Neighbour *n;
330
331   if (0 == memcmp (peer,
332                    &GSC_my_identity,
333                    sizeof (struct GNUNET_PeerIdentity)))
334   {
335     GNUNET_break (0);
336     return;
337   }
338   n = find_neighbour (peer);
339   if (n != NULL)
340   {
341     /* duplicate connect notification!? */
342     GNUNET_break (0);
343     return;
344   }
345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
346               "Received connection from `%4s'.\n",
347               GNUNET_i2s (peer));
348   n = GNUNET_new (struct Neighbour);
349   n->peer = *peer;
350   GNUNET_assert (GNUNET_OK ==
351                  GNUNET_CONTAINER_multipeermap_put (neighbours,
352                                                     &n->peer, n,
353                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
354   GNUNET_STATISTICS_set (GSC_stats,
355                          gettext_noop ("# neighbour entries allocated"),
356                          GNUNET_CONTAINER_multipeermap_size (neighbours),
357                          GNUNET_NO);
358   n->kxinfo = GSC_KX_start (peer);
359 }
360
361
362 /**
363  * Function called by transport telling us that a peer
364  * disconnected.
365  *
366  * @param cls closure
367  * @param peer the peer that disconnected
368  */
369 static void
370 handle_transport_notify_disconnect (void *cls,
371                                     const struct GNUNET_PeerIdentity *peer)
372 {
373   struct Neighbour *n;
374
375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
376               "Peer `%4s' disconnected from us; received notification from transport.\n",
377               GNUNET_i2s (peer));
378   n = find_neighbour (peer);
379   if (NULL == n)
380   {
381     GNUNET_break (0);
382     return;
383   }
384   free_neighbour (n);
385 }
386
387
388 /**
389  * Function called by the transport for each received message.
390  *
391  * @param cls closure
392  * @param peer (claimed) identity of the other peer
393  * @param message the message
394  */
395 static void
396 handle_transport_receive (void *cls,
397                           const struct GNUNET_PeerIdentity *peer,
398                           const struct GNUNET_MessageHeader *message)
399 {
400   struct Neighbour *n;
401   uint16_t type;
402
403   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
404               "Received message of type %u from `%4s', demultiplexing.\n",
405               (unsigned int) ntohs (message->type), GNUNET_i2s (peer));
406   if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
407   {
408     GNUNET_break (0);
409     return;
410   }
411   n = find_neighbour (peer);
412   if (n == NULL)
413   {
414     /* received message from peer that is not connected!? */
415     GNUNET_break (0);
416     return;
417   }
418   type = ntohs (message->type);
419   switch (type)
420   {
421   case GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY:
422     GSC_KX_handle_ephemeral_key (n->kxinfo, message);
423     break;
424   case GNUNET_MESSAGE_TYPE_CORE_PING:
425     GSC_KX_handle_ping (n->kxinfo, message);
426     break;
427   case GNUNET_MESSAGE_TYPE_CORE_PONG:
428     GSC_KX_handle_pong (n->kxinfo, message);
429     break;
430   case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
431     GSC_KX_handle_encrypted_message (n->kxinfo, message);
432     break;
433   case GNUNET_MESSAGE_TYPE_DUMMY:
434     /*  Dummy messages for testing / benchmarking, just discard */
435     break;
436   default:
437     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
438                 _("Unsupported message of type %u (%u bytes) received from peer `%s'\n"),
439                 (unsigned int) type,
440                 (unsigned int) ntohs (message->size),
441                 GNUNET_i2s (peer));
442     return;
443   }
444 }
445
446
447 /**
448  * Transmit the given message to the given target.
449  *
450  * @param target peer that should receive the message (must be connected)
451  * @param msg message to transmit
452  * @param timeout by when should the transmission be done?
453  */
454 void
455 GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
456                          const struct GNUNET_MessageHeader *msg,
457                          struct GNUNET_TIME_Relative timeout)
458 {
459   struct NeighbourMessageEntry *me;
460   struct Neighbour *n;
461   size_t msize;
462
463   n = find_neighbour (target);
464   if (NULL == n)
465   {
466     GNUNET_break (0);
467     return;
468   }
469   msize = ntohs (msg->size);
470   me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize);
471   me->deadline = GNUNET_TIME_relative_to_absolute (timeout);
472   me->size = msize;
473   memcpy (&me[1],
474           msg,
475           msize);
476   GNUNET_CONTAINER_DLL_insert_tail (n->message_head,
477                                     n->message_tail,
478                                     me);
479   n->queue_size++;
480   process_queue (n);
481 }
482
483
484 /**
485  * One of our neighbours has excess bandwidth,
486  * remember this.
487  *
488  * @param cls NULL
489  * @param pid identity of the peer with excess bandwidth
490  */
491 static void
492 handle_transport_notify_excess_bw (void *cls,
493                                    const struct GNUNET_PeerIdentity *pid)
494 {
495   struct Neighbour *n;
496
497   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498               "Peer %s has excess bandwidth available\n",
499               GNUNET_i2s (pid));
500   n = find_neighbour (pid);
501   if (NULL == n)
502   {
503     GNUNET_break (0);
504     return;
505   }
506   n->has_excess_bandwidth = GNUNET_YES;
507   GSC_SESSIONS_solicit (pid);
508 }
509
510
511 /**
512  * Check how many messages are queued for the given neighbour.
513  *
514  * @param target neighbour to check
515  * @return number of items in the message queue
516  */
517 unsigned int
518 GSC_NEIGHBOURS_get_queue_size (const struct GNUNET_PeerIdentity *target)
519 {
520   struct Neighbour *n;
521
522   n = find_neighbour (target);
523   if (NULL == n)
524   {
525     GNUNET_break (0);
526     return UINT_MAX;
527   }
528   return n->queue_size;
529 }
530
531
532
533 /**
534  * Check if the given neighbour has excess bandwidth available.
535  *
536  * @param target neighbour to check
537  * @return #GNUNET_YES if excess bandwidth is available, #GNUNET_NO if not
538  */
539 int
540 GSC_NEIGHBOURS_check_excess_bandwidth (const struct GNUNET_PeerIdentity *target)
541 {
542   struct Neighbour *n;
543
544   n = find_neighbour (target);
545   if (NULL == n)
546   {
547     GNUNET_break (0);
548     return GNUNET_SYSERR;
549   }
550   return n->has_excess_bandwidth;
551 }
552
553
554 /**
555  * Initialize neighbours subsystem.
556  */
557 int
558 GSC_NEIGHBOURS_init ()
559 {
560   neighbours = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO);
561   transport =
562       GNUNET_TRANSPORT_connect2 (GSC_cfg, &GSC_my_identity, NULL,
563                                  &handle_transport_receive,
564                                  &handle_transport_notify_connect,
565                                  &handle_transport_notify_disconnect,
566                                  &handle_transport_notify_excess_bw);
567   if (NULL == transport)
568   {
569     GNUNET_CONTAINER_multipeermap_destroy (neighbours);
570     neighbours = NULL;
571     return GNUNET_SYSERR;
572   }
573   return GNUNET_OK;
574 }
575
576
577 /**
578  * Wrapper around 'free_neighbour'.
579  *
580  * @param cls unused
581  * @param key peer identity
582  * @param value the `struct Neighbour` to free
583  * @return #GNUNET_OK (continue to iterate)
584  */
585 static int
586 free_neighbour_helper (void *cls,
587                        const struct GNUNET_PeerIdentity * key,
588                        void *value)
589 {
590   struct Neighbour *n = value;
591
592   /* transport should have 'disconnected' all neighbours... */
593   GNUNET_break (0);
594   free_neighbour (n);
595   return GNUNET_OK;
596 }
597
598
599 /**
600  * Shutdown neighbours subsystem.
601  */
602 void
603 GSC_NEIGHBOURS_done ()
604 {
605   if (NULL != transport)
606   {
607     GNUNET_TRANSPORT_disconnect (transport);
608     transport = NULL;
609   }
610   if (NULL != neighbours)
611   {
612     GNUNET_CONTAINER_multipeermap_iterate (neighbours,
613                                            &free_neighbour_helper,
614                                            NULL);
615     GNUNET_CONTAINER_multipeermap_destroy (neighbours);
616     neighbours = NULL;
617   }
618 }
619
620 /* end of gnunet-service-core_neighbours.c */