include element type in hash
[oweals/gnunet.git] / src / core / gnunet-service-core_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_neighbours.c
23  * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_transport_service.h"
30 #include "gnunet-service-core.h"
31 #include "gnunet-service-core_neighbours.h"
32 #include "gnunet-service-core_kx.h"
33 #include "gnunet-service-core_sessions.h"
34 #include "gnunet_constants.h"
35
36
37 /**
38  * Message ready for transmission via transport service.  This struct
39  * is followed by the actual content of the message.
40  */
41 struct NeighbourMessageEntry
42 {
43
44   /**
45    * We keep messages in a doubly linked list.
46    */
47   struct NeighbourMessageEntry *next;
48
49   /**
50    * We keep messages in a doubly linked list.
51    */
52   struct NeighbourMessageEntry *prev;
53
54   /**
55    * By when are we supposed to transmit this message?
56    */
57   struct GNUNET_TIME_Absolute deadline;
58
59   /**
60    * What time did we submit the request?
61    */
62   struct GNUNET_TIME_Absolute submission_time;
63
64   /**
65    * How long is the message? (number of bytes following the `struct
66    * MessageEntry`, but not including the size of `struct
67    * MessageEntry` itself!)
68    */
69   size_t size;
70
71 };
72
73
74 /**
75  * Data kept per transport-connected peer.
76  */
77 struct Neighbour
78 {
79
80   /**
81    * Head of the batched message queue (already ordered, transmit
82    * starting with the head).
83    */
84   struct NeighbourMessageEntry *message_head;
85
86   /**
87    * Tail of the batched message queue (already ordered, append new
88    * messages to tail).
89    */
90   struct NeighbourMessageEntry *message_tail;
91
92   /**
93    * Handle for pending requests for transmission to this peer
94    * with the transport service.  NULL if no request is pending.
95    */
96   struct GNUNET_TRANSPORT_TransmitHandle *th;
97
98   /**
99    * Information about the key exchange with the other peer.
100    */
101   struct GSC_KeyExchangeInfo *kxinfo;
102
103   /**
104    * Identity of the other peer.
105    */
106   struct GNUNET_PeerIdentity peer;
107
108   /**
109    * ID of task used for re-trying plaintext scheduling.
110    */
111   struct GNUNET_SCHEDULER_Task *retry_plaintext_task;
112
113   /**
114    * How many messages are in the queue for this neighbour?
115    */
116   unsigned int queue_size;
117
118   /**
119    * #GNUNET_YES if this peer currently has excess bandwidth.
120    */
121   int has_excess_bandwidth;
122
123 };
124
125
126 /**
127  * Map of peer identities to 'struct Neighbour'.
128  */
129 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
130
131 /**
132  * Transport service.
133  */
134 static struct GNUNET_TRANSPORT_Handle *transport;
135
136
137 /**
138  * Find the entry for the given neighbour.
139  *
140  * @param peer identity of the neighbour
141  * @return NULL if we are not connected, otherwise the
142  *         neighbour's entry.
143  */
144 static struct Neighbour *
145 find_neighbour (const struct GNUNET_PeerIdentity *peer)
146 {
147   if (NULL == neighbours)
148     return NULL;
149   return GNUNET_CONTAINER_multipeermap_get (neighbours,
150                                             peer);
151 }
152
153
154 /**
155  * Free the given entry for the neighbour.
156  *
157  * @param n neighbour to free
158  */
159 static void
160 free_neighbour (struct Neighbour *n)
161 {
162   struct NeighbourMessageEntry *m;
163
164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
165               "Destroying neighbour entry for peer `%s'\n",
166               GNUNET_i2s (&n->peer));
167   while (NULL != (m = n->message_head))
168   {
169     GNUNET_CONTAINER_DLL_remove (n->message_head,
170                                  n->message_tail,
171                                  m);
172     n->queue_size--;
173     GNUNET_free (m);
174   }
175   GNUNET_assert (0 == n->queue_size);
176   if (NULL != n->th)
177   {
178     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
179     n->th = NULL;
180   }
181   GNUNET_STATISTICS_update (GSC_stats,
182                             gettext_noop
183                             ("# sessions terminated by transport disconnect"),
184                             1, GNUNET_NO);
185   if (NULL != n->kxinfo)
186   {
187     GSC_KX_stop (n->kxinfo);
188     n->kxinfo = NULL;
189   }
190   if (NULL != n->retry_plaintext_task)
191   {
192     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
193     n->retry_plaintext_task = NULL;
194   }
195   GNUNET_assert (GNUNET_OK ==
196                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
197                                                        &n->peer, n));
198   GNUNET_STATISTICS_set (GSC_stats,
199                          gettext_noop ("# neighbour entries allocated"),
200                          GNUNET_CONTAINER_multipeermap_size (neighbours),
201                          GNUNET_NO);
202   GNUNET_free (n);
203 }
204
205
206 /**
207  * Check if we have encrypted messages for the specified neighbour
208  * pending, and if so, check with the transport about sending them
209  * out.
210  *
211  * @param n neighbour to check.
212  */
213 static void
214 process_queue (struct Neighbour *n);
215
216
217 /**
218  * Function called when the transport service is ready to receive a
219  * message for the respective peer
220  *
221  * @param cls neighbour to use message from
222  * @param size number of bytes we can transmit
223  * @param buf where to copy the message
224  * @return number of bytes transmitted
225  */
226 static size_t
227 transmit_ready (void *cls,
228                 size_t size,
229                 void *buf)
230 {
231   struct Neighbour *n = cls;
232   struct NeighbourMessageEntry *m;
233   size_t ret;
234   char *cbuf;
235   struct GNUNET_TIME_Relative delay;
236
237   n->th = NULL;
238   m = n->message_head;
239   if (NULL == m)
240   {
241     GNUNET_break (0);
242     return 0;
243   }
244   GNUNET_CONTAINER_DLL_remove (n->message_head,
245                                n->message_tail,
246                                m);
247   n->queue_size--;
248   if (NULL == buf)
249   {
250     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
251                 "Transmission of message of type %u and size %u failed\n",
252                 (unsigned int)
253                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
254                 (unsigned int) m->size);
255     GNUNET_free (m);
256     process_queue (n);
257     return 0;
258   }
259   delay = GNUNET_TIME_absolute_get_duration (m->submission_time);
260   cbuf = buf;
261   GNUNET_assert (size >= m->size);
262   memcpy (cbuf,
263           &m[1],
264           m->size);
265   ret = m->size;
266   if (delay.rel_value_us > GNUNET_TIME_UNIT_SECONDS.rel_value_us)
267   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
268               "Copied message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
269               (unsigned int)
270               ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
271               (unsigned int) ret,
272               GNUNET_i2s (&n->peer),
273               GNUNET_STRINGS_relative_time_to_string (delay,
274                                                       GNUNET_YES));
275   else
276     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
277                 "Copied message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
278                 (unsigned int)
279                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
280                 (unsigned int) ret,
281                 GNUNET_i2s (&n->peer),
282                 GNUNET_STRINGS_relative_time_to_string (delay,
283                                                         GNUNET_YES));
284   GNUNET_free (m);
285   n->has_excess_bandwidth = GNUNET_NO;
286   process_queue (n);
287   GNUNET_STATISTICS_update (GSC_stats,
288                             gettext_noop
289                             ("# encrypted bytes given to transport"), ret,
290                             GNUNET_NO);
291   return ret;
292 }
293
294
295 /**
296  * Check if we have messages for the specified neighbour pending, and
297  * if so, check with the transport about sending them out.
298  *
299  * @param n neighbour to check.
300  */
301 static void
302 process_queue (struct Neighbour *n)
303 {
304   struct NeighbourMessageEntry *m;
305
306   if (NULL != n->th)
307     return;                     /* request already pending */
308   m = n->message_head;
309   if (NULL == m)
310   {
311     /* notify sessions that the queue is empty and more messages
312      * could thus be queued now */
313     GSC_SESSIONS_solicit (&n->peer);
314     return;
315   }
316   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
317               "Asking transport for transmission of %u bytes to `%s' in next %s\n",
318               (unsigned int) m->size,
319               GNUNET_i2s (&n->peer),
320               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (m->deadline),
321                                                       GNUNET_NO));
322   m->submission_time = GNUNET_TIME_absolute_get ();
323   n->th
324     = GNUNET_TRANSPORT_notify_transmit_ready (transport,
325                                               &n->peer,
326                                               m->size,
327                                               GNUNET_TIME_absolute_get_remaining (m->deadline),
328                                               &transmit_ready,
329                                               n);
330   if (NULL != n->th)
331     return;
332   /* message request too large or duplicate request */
333   GNUNET_break (0);
334   /* discard encrypted message */
335   GNUNET_CONTAINER_DLL_remove (n->message_head,
336                                n->message_tail,
337                                m);
338   n->queue_size--;
339   GNUNET_free (m);
340   process_queue (n);
341 }
342
343
344 /**
345  * Function called by transport to notify us that
346  * a peer connected to us (on the network level).
347  *
348  * @param cls closure
349  * @param peer the peer that connected
350  */
351 static void
352 handle_transport_notify_connect (void *cls,
353                                  const struct GNUNET_PeerIdentity *peer)
354 {
355   struct Neighbour *n;
356
357   if (0 == memcmp (peer,
358                    &GSC_my_identity,
359                    sizeof (struct GNUNET_PeerIdentity)))
360   {
361     GNUNET_break (0);
362     return;
363   }
364   n = find_neighbour (peer);
365   if (NULL != n)
366   {
367     /* duplicate connect notification!? */
368     GNUNET_break (0);
369     return;
370   }
371   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
372               "Received connection from `%s'.\n",
373               GNUNET_i2s (peer));
374   n = GNUNET_new (struct Neighbour);
375   n->peer = *peer;
376   GNUNET_assert (GNUNET_OK ==
377                  GNUNET_CONTAINER_multipeermap_put (neighbours,
378                                                     &n->peer,
379                                                     n,
380                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
381   GNUNET_STATISTICS_set (GSC_stats,
382                          gettext_noop ("# neighbour entries allocated"),
383                          GNUNET_CONTAINER_multipeermap_size (neighbours),
384                          GNUNET_NO);
385   n->kxinfo = GSC_KX_start (peer);
386 }
387
388
389 /**
390  * Function called by transport telling us that a peer
391  * disconnected.
392  *
393  * @param cls closure
394  * @param peer the peer that disconnected
395  */
396 static void
397 handle_transport_notify_disconnect (void *cls,
398                                     const struct GNUNET_PeerIdentity *peer)
399 {
400   struct Neighbour *n;
401
402   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403               "Peer `%s' disconnected from us; received notification from transport.\n",
404               GNUNET_i2s (peer));
405   n = find_neighbour (peer);
406   if (NULL == n)
407   {
408     GNUNET_break (0);
409     return;
410   }
411   free_neighbour (n);
412 }
413
414
415 /**
416  * Function called by the transport for each received message.
417  *
418  * @param cls closure
419  * @param peer (claimed) identity of the other peer
420  * @param message the message
421  */
422 static void
423 handle_transport_receive (void *cls,
424                           const struct GNUNET_PeerIdentity *peer,
425                           const struct GNUNET_MessageHeader *message)
426 {
427   struct Neighbour *n;
428   uint16_t type;
429
430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
431               "Received message of type %u from `%s', demultiplexing.\n",
432               (unsigned int) ntohs (message->type),
433               GNUNET_i2s (peer));
434   if (0 == memcmp (peer,
435                    &GSC_my_identity,
436                    sizeof (struct GNUNET_PeerIdentity)))
437   {
438     GNUNET_break (0);
439     return;
440   }
441   n = find_neighbour (peer);
442   if (NULL == n)
443   {
444     /* received message from peer that is not connected!? */
445     GNUNET_break (0);
446     return;
447   }
448   type = ntohs (message->type);
449   switch (type)
450   {
451   case GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY:
452     GSC_KX_handle_ephemeral_key (n->kxinfo, message);
453     break;
454   case GNUNET_MESSAGE_TYPE_CORE_PING:
455     GSC_KX_handle_ping (n->kxinfo, message);
456     break;
457   case GNUNET_MESSAGE_TYPE_CORE_PONG:
458     GSC_KX_handle_pong (n->kxinfo, message);
459     break;
460   case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
461     GSC_KX_handle_encrypted_message (n->kxinfo, message);
462     break;
463   case GNUNET_MESSAGE_TYPE_DUMMY:
464     /*  Dummy messages for testing / benchmarking, just discard */
465     break;
466   default:
467     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
468                 _("Unsupported message of type %u (%u bytes) received from peer `%s'\n"),
469                 (unsigned int) type,
470                 (unsigned int) ntohs (message->size),
471                 GNUNET_i2s (peer));
472     return;
473   }
474 }
475
476
477 /**
478  * Transmit the given message to the given target.
479  *
480  * @param target peer that should receive the message (must be connected)
481  * @param msg message to transmit
482  * @param timeout by when should the transmission be done?
483  */
484 void
485 GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
486                          const struct GNUNET_MessageHeader *msg,
487                          struct GNUNET_TIME_Relative timeout)
488 {
489   struct NeighbourMessageEntry *me;
490   struct Neighbour *n;
491   size_t msize;
492
493   n = find_neighbour (target);
494   if (NULL == n)
495   {
496     GNUNET_break (0);
497     return;
498   }
499   msize = ntohs (msg->size);
500   me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize);
501   me->deadline = GNUNET_TIME_relative_to_absolute (timeout);
502   me->size = msize;
503   memcpy (&me[1],
504           msg,
505           msize);
506   GNUNET_CONTAINER_DLL_insert_tail (n->message_head,
507                                     n->message_tail,
508                                     me);
509   n->queue_size++;
510   process_queue (n);
511 }
512
513
514 /**
515  * One of our neighbours has excess bandwidth,
516  * remember this.
517  *
518  * @param cls NULL
519  * @param pid identity of the peer with excess bandwidth
520  */
521 static void
522 handle_transport_notify_excess_bw (void *cls,
523                                    const struct GNUNET_PeerIdentity *pid)
524 {
525   struct Neighbour *n;
526
527   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
528               "Peer %s has excess bandwidth available\n",
529               GNUNET_i2s (pid));
530   n = find_neighbour (pid);
531   if (NULL == n)
532   {
533     GNUNET_break (0);
534     return;
535   }
536   n->has_excess_bandwidth = GNUNET_YES;
537   GSC_SESSIONS_solicit (pid);
538 }
539
540
541 /**
542  * Check how many messages are queued for the given neighbour.
543  *
544  * @param target neighbour to check
545  * @return number of items in the message queue
546  */
547 unsigned int
548 GSC_NEIGHBOURS_get_queue_size (const struct GNUNET_PeerIdentity *target)
549 {
550   struct Neighbour *n;
551
552   n = find_neighbour (target);
553   if (NULL == n)
554   {
555     GNUNET_break (0);
556     return UINT_MAX;
557   }
558   return n->queue_size;
559 }
560
561
562 /**
563  * Check if the given neighbour has excess bandwidth available.
564  *
565  * @param target neighbour to check
566  * @return #GNUNET_YES if excess bandwidth is available, #GNUNET_NO if not
567  */
568 int
569 GSC_NEIGHBOURS_check_excess_bandwidth (const struct GNUNET_PeerIdentity *target)
570 {
571   struct Neighbour *n;
572
573   n = find_neighbour (target);
574   if (NULL == n)
575   {
576     GNUNET_break (0);
577     return GNUNET_SYSERR;
578   }
579   return n->has_excess_bandwidth;
580 }
581
582
583 /**
584  * Initialize neighbours subsystem.
585  */
586 int
587 GSC_NEIGHBOURS_init ()
588 {
589   neighbours = GNUNET_CONTAINER_multipeermap_create (128,
590                                                      GNUNET_YES);
591   transport =
592       GNUNET_TRANSPORT_connect2 (GSC_cfg,
593                                  &GSC_my_identity,
594                                  NULL,
595                                  &handle_transport_receive,
596                                  &handle_transport_notify_connect,
597                                  &handle_transport_notify_disconnect,
598                                  &handle_transport_notify_excess_bw);
599   if (NULL == transport)
600   {
601     GNUNET_CONTAINER_multipeermap_destroy (neighbours);
602     neighbours = NULL;
603     return GNUNET_SYSERR;
604   }
605   return GNUNET_OK;
606 }
607
608
609 /**
610  * Wrapper around #free_neighbour().
611  *
612  * @param cls unused
613  * @param key peer identity
614  * @param value the `struct Neighbour` to free
615  * @return #GNUNET_OK (continue to iterate)
616  */
617 static int
618 free_neighbour_helper (void *cls,
619                        const struct GNUNET_PeerIdentity * key,
620                        void *value)
621 {
622   struct Neighbour *n = value;
623
624   /* transport should have 'disconnected' all neighbours... */
625   GNUNET_break (0);
626   free_neighbour (n);
627   return GNUNET_OK;
628 }
629
630
631 /**
632  * Shutdown neighbours subsystem.
633  */
634 void
635 GSC_NEIGHBOURS_done ()
636 {
637   if (NULL != transport)
638   {
639     GNUNET_TRANSPORT_disconnect (transport);
640     transport = NULL;
641   }
642   if (NULL != neighbours)
643   {
644     GNUNET_CONTAINER_multipeermap_iterate (neighbours,
645                                            &free_neighbour_helper,
646                                            NULL);
647     GNUNET_CONTAINER_multipeermap_destroy (neighbours);
648     neighbours = NULL;
649   }
650 }
651
652 /* end of gnunet-service-core_neighbours.c */