convert fs publish to MQ
[oweals/gnunet.git] / src / core / gnunet-service-core_neighbours.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009, 2010, 2011 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_neighbours.c
23  * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_transport_service.h"
30 #include "gnunet-service-core.h"
31 #include "gnunet-service-core_neighbours.h"
32 #include "gnunet-service-core_kx.h"
33 #include "gnunet-service-core_sessions.h"
34 #include "gnunet_constants.h"
35
36
37 /**
38  * Message ready for transmission via transport service.  This struct
39  * is followed by the actual content of the message.
40  */
41 struct NeighbourMessageEntry
42 {
43
44   /**
45    * We keep messages in a doubly linked list.
46    */
47   struct NeighbourMessageEntry *next;
48
49   /**
50    * We keep messages in a doubly linked list.
51    */
52   struct NeighbourMessageEntry *prev;
53
54   /**
55    * By when are we supposed to transmit this message?
56    */
57   struct GNUNET_TIME_Absolute deadline;
58
59   /**
60    * What time did we submit the request?
61    */
62   struct GNUNET_TIME_Absolute submission_time;
63
64   /**
65    * How long is the message? (number of bytes following the `struct
66    * MessageEntry`, but not including the size of `struct
67    * MessageEntry` itself!)
68    */
69   size_t size;
70
71 };
72
73
74 /**
75  * Data kept per transport-connected peer.
76  */
77 struct Neighbour
78 {
79
80   /**
81    * Head of the batched message queue (already ordered, transmit
82    * starting with the head).
83    */
84   struct NeighbourMessageEntry *message_head;
85
86   /**
87    * Tail of the batched message queue (already ordered, append new
88    * messages to tail).
89    */
90   struct NeighbourMessageEntry *message_tail;
91
92   /**
93    * Handle for pending requests for transmission to this peer
94    * with the transport service.  NULL if no request is pending.
95    */
96   struct GNUNET_TRANSPORT_TransmitHandle *th;
97
98   /**
99    * Information about the key exchange with the other peer.
100    */
101   struct GSC_KeyExchangeInfo *kxinfo;
102
103   /**
104    * Identity of the other peer.
105    */
106   struct GNUNET_PeerIdentity peer;
107
108   /**
109    * ID of task used for re-trying plaintext scheduling.
110    */
111   struct GNUNET_SCHEDULER_Task *retry_plaintext_task;
112
113   /**
114    * How many messages are in the queue for this neighbour?
115    */
116   unsigned int queue_size;
117
118   /**
119    * #GNUNET_YES if this peer currently has excess bandwidth.
120    */
121   int has_excess_bandwidth;
122
123 };
124
125
126 /**
127  * Map of peer identities to `struct Neighbour`.
128  */
129 static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
130
131 /**
132  * Transport service.
133  */
134 static struct GNUNET_TRANSPORT_Handle *transport;
135
136
137 /**
138  * Find the entry for the given neighbour.
139  *
140  * @param peer identity of the neighbour
141  * @return NULL if we are not connected, otherwise the
142  *         neighbour's entry.
143  */
144 static struct Neighbour *
145 find_neighbour (const struct GNUNET_PeerIdentity *peer)
146 {
147   if (NULL == neighbours)
148     return NULL;
149   return GNUNET_CONTAINER_multipeermap_get (neighbours,
150                                             peer);
151 }
152
153
154 /**
155  * Free the given entry for the neighbour.
156  *
157  * @param n neighbour to free
158  */
159 static void
160 free_neighbour (struct Neighbour *n)
161 {
162   struct NeighbourMessageEntry *m;
163
164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
165               "Destroying neighbour entry for peer `%s'\n",
166               GNUNET_i2s (&n->peer));
167   while (NULL != (m = n->message_head))
168   {
169     GNUNET_CONTAINER_DLL_remove (n->message_head,
170                                  n->message_tail,
171                                  m);
172     n->queue_size--;
173     GNUNET_free (m);
174   }
175   GNUNET_assert (0 == n->queue_size);
176   if (NULL != n->th)
177   {
178     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
179     n->th = NULL;
180   }
181   GNUNET_STATISTICS_update (GSC_stats,
182                             gettext_noop
183                             ("# sessions terminated by transport disconnect"),
184                             1, GNUNET_NO);
185   if (NULL != n->kxinfo)
186   {
187     GSC_KX_stop (n->kxinfo);
188     n->kxinfo = NULL;
189   }
190   if (NULL != n->retry_plaintext_task)
191   {
192     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
193     n->retry_plaintext_task = NULL;
194   }
195   GNUNET_assert (GNUNET_OK ==
196                  GNUNET_CONTAINER_multipeermap_remove (neighbours,
197                                                        &n->peer, n));
198   GNUNET_STATISTICS_set (GSC_stats,
199                          gettext_noop ("# neighbour entries allocated"),
200                          GNUNET_CONTAINER_multipeermap_size (neighbours),
201                          GNUNET_NO);
202   GNUNET_free (n);
203 }
204
205
206 /**
207  * Check if we have encrypted messages for the specified neighbour
208  * pending, and if so, check with the transport about sending them
209  * out.
210  *
211  * @param n neighbour to check.
212  */
213 static void
214 process_queue (struct Neighbour *n);
215
216
217 /**
218  * Function called when the transport service is ready to receive a
219  * message for the respective peer
220  *
221  * @param cls neighbour to use message from
222  * @param size number of bytes we can transmit
223  * @param buf where to copy the message
224  * @return number of bytes transmitted
225  */
226 static size_t
227 transmit_ready (void *cls,
228                 size_t size,
229                 void *buf)
230 {
231   struct Neighbour *n = cls;
232   struct NeighbourMessageEntry *m;
233   size_t ret;
234   char *cbuf;
235   struct GNUNET_TIME_Relative delay;
236   struct GNUNET_TIME_Relative overdue;
237
238   n->th = NULL;
239   m = n->message_head;
240   if (NULL == m)
241   {
242     GNUNET_break (0);
243     return 0;
244   }
245   GNUNET_CONTAINER_DLL_remove (n->message_head,
246                                n->message_tail,
247                                m);
248   n->queue_size--;
249   if (NULL == buf)
250   {
251     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
252                 "Transmission of message of type %u and size %u failed\n",
253                 (unsigned int)
254                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
255                 (unsigned int) m->size);
256     GNUNET_free (m);
257     process_queue (n);
258     return 0;
259   }
260   delay = GNUNET_TIME_absolute_get_duration (m->submission_time);
261   overdue = GNUNET_TIME_absolute_get_duration (m->deadline);
262   cbuf = buf;
263   GNUNET_assert (size >= m->size);
264   memcpy (cbuf,
265           &m[1],
266           m->size);
267   ret = m->size;
268   if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
269     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
270                 "Copied overdue message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
271                 (unsigned int)
272                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
273                 (unsigned int) ret,
274                 GNUNET_i2s (&n->peer),
275                 GNUNET_STRINGS_relative_time_to_string (delay,
276                                                         GNUNET_YES));
277   else
278     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
279                 "Copied message of type %u and size %u into transport buffer for `%s' with delay of %s\n",
280                 (unsigned int)
281                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
282                 (unsigned int) ret,
283                 GNUNET_i2s (&n->peer),
284                 GNUNET_STRINGS_relative_time_to_string (delay,
285                                                         GNUNET_YES));
286   GNUNET_free (m);
287   n->has_excess_bandwidth = GNUNET_NO;
288   process_queue (n);
289   GNUNET_STATISTICS_update (GSC_stats,
290                             gettext_noop
291                             ("# encrypted bytes given to transport"), ret,
292                             GNUNET_NO);
293   return ret;
294 }
295
296
297 /**
298  * Check if we have messages for the specified neighbour pending, and
299  * if so, check with the transport about sending them out.
300  *
301  * @param n neighbour to check.
302  */
303 static void
304 process_queue (struct Neighbour *n)
305 {
306   struct NeighbourMessageEntry *m;
307
308   if (NULL != n->th)
309     return;                     /* request already pending */
310   m = n->message_head;
311   if (NULL == m)
312   {
313     /* notify sessions that the queue is empty and more messages
314      * could thus be queued now */
315     GSC_SESSIONS_solicit (&n->peer);
316     return;
317   }
318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
319               "Asking transport for transmission of %u bytes to `%s' in next %s\n",
320               (unsigned int) m->size,
321               GNUNET_i2s (&n->peer),
322               GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (m->deadline),
323                                                       GNUNET_NO));
324   m->submission_time = GNUNET_TIME_absolute_get ();
325   n->th
326     = GNUNET_TRANSPORT_notify_transmit_ready (transport,
327                                               &n->peer,
328                                               m->size,
329                                               GNUNET_TIME_absolute_get_remaining (m->deadline),
330                                               &transmit_ready,
331                                               n);
332   if (NULL != n->th)
333     return;
334   /* message request too large or duplicate request */
335   GNUNET_break (0);
336   /* discard encrypted message */
337   GNUNET_CONTAINER_DLL_remove (n->message_head,
338                                n->message_tail,
339                                m);
340   n->queue_size--;
341   GNUNET_free (m);
342   process_queue (n);
343 }
344
345
346 /**
347  * Function called by transport to notify us that
348  * a peer connected to us (on the network level).
349  *
350  * @param cls closure
351  * @param peer the peer that connected
352  */
353 static void
354 handle_transport_notify_connect (void *cls,
355                                  const struct GNUNET_PeerIdentity *peer)
356 {
357   struct Neighbour *n;
358
359   if (0 == memcmp (peer,
360                    &GSC_my_identity,
361                    sizeof (struct GNUNET_PeerIdentity)))
362   {
363     GNUNET_break (0);
364     return;
365   }
366   n = find_neighbour (peer);
367   if (NULL != n)
368   {
369     /* duplicate connect notification!? */
370     GNUNET_break (0);
371     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
372                 "Peer %s exists already\n",
373                 GNUNET_i2s (peer));
374     return;
375   }
376   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
377               "Received connection from `%s'.\n",
378               GNUNET_i2s (peer));
379   n = GNUNET_new (struct Neighbour);
380   n->peer = *peer;
381   GNUNET_assert (GNUNET_OK ==
382                  GNUNET_CONTAINER_multipeermap_put (neighbours,
383                                                     &n->peer,
384                                                     n,
385                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
386   GNUNET_STATISTICS_set (GSC_stats,
387                          gettext_noop ("# neighbour entries allocated"),
388                          GNUNET_CONTAINER_multipeermap_size (neighbours),
389                          GNUNET_NO);
390   n->kxinfo = GSC_KX_start (peer);
391 }
392
393
394 /**
395  * Function called by transport telling us that a peer
396  * disconnected.
397  *
398  * @param cls closure
399  * @param peer the peer that disconnected
400  */
401 static void
402 handle_transport_notify_disconnect (void *cls,
403                                     const struct GNUNET_PeerIdentity *peer)
404 {
405   struct Neighbour *n;
406
407   if (0 == memcmp (peer,
408                    &GSC_my_identity,
409                    sizeof (struct GNUNET_PeerIdentity)))
410   {
411     GNUNET_break (0);
412     return;
413   }
414   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
415               "Peer `%s' disconnected from us; received notification from transport.\n",
416               GNUNET_i2s (peer));
417   n = find_neighbour (peer);
418   if (NULL == n)
419   {
420     GNUNET_break (0);
421     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
422                 "Peer %s not found\n",
423                 GNUNET_i2s (peer));
424     return;
425   }
426   free_neighbour (n);
427 }
428
429
430 /**
431  * Function called by the transport for each received message.
432  *
433  * @param cls closure
434  * @param peer (claimed) identity of the other peer
435  * @param message the message
436  */
437 static void
438 handle_transport_receive (void *cls,
439                           const struct GNUNET_PeerIdentity *peer,
440                           const struct GNUNET_MessageHeader *message)
441 {
442   struct Neighbour *n;
443   uint16_t type;
444
445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
446               "Received message of type %u from `%s', demultiplexing.\n",
447               (unsigned int) ntohs (message->type),
448               GNUNET_i2s (peer));
449   if (0 == memcmp (peer,
450                    &GSC_my_identity,
451                    sizeof (struct GNUNET_PeerIdentity)))
452   {
453     GNUNET_break (0);
454     return;
455   }
456   n = find_neighbour (peer);
457   if (NULL == n)
458   {
459     /* received message from peer that is not connected!? */
460     GNUNET_break (0);
461     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
462                 "Peer %s not found\n",
463                 GNUNET_i2s (peer));
464     return;
465   }
466   type = ntohs (message->type);
467   switch (type)
468   {
469   case GNUNET_MESSAGE_TYPE_CORE_EPHEMERAL_KEY:
470     GSC_KX_handle_ephemeral_key (n->kxinfo, message);
471     break;
472   case GNUNET_MESSAGE_TYPE_CORE_PING:
473     GSC_KX_handle_ping (n->kxinfo, message);
474     break;
475   case GNUNET_MESSAGE_TYPE_CORE_PONG:
476     GSC_KX_handle_pong (n->kxinfo, message);
477     break;
478   case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
479     GSC_KX_handle_encrypted_message (n->kxinfo, message);
480     break;
481   case GNUNET_MESSAGE_TYPE_DUMMY:
482     /*  Dummy messages for testing / benchmarking, just discard */
483     break;
484   default:
485     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
486                 _("Unsupported message of type %u (%u bytes) received from peer `%s'\n"),
487                 (unsigned int) type,
488                 (unsigned int) ntohs (message->size),
489                 GNUNET_i2s (peer));
490     return;
491   }
492 }
493
494
495 /**
496  * Transmit the given message to the given target.
497  *
498  * @param target peer that should receive the message (must be connected)
499  * @param msg message to transmit
500  * @param timeout by when should the transmission be done?
501  */
502 void
503 GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
504                          const struct GNUNET_MessageHeader *msg,
505                          struct GNUNET_TIME_Relative timeout)
506 {
507   struct NeighbourMessageEntry *me;
508   struct Neighbour *n;
509   size_t msize;
510
511   n = find_neighbour (target);
512   if (NULL == n)
513   {
514     GNUNET_break (0);
515     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
516                 "Peer %s not found\n",
517                 GNUNET_i2s (target));
518     return;
519   }
520   msize = ntohs (msg->size);
521   me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize);
522   me->deadline = GNUNET_TIME_relative_to_absolute (timeout);
523   me->size = msize;
524   memcpy (&me[1],
525           msg,
526           msize);
527   GNUNET_CONTAINER_DLL_insert_tail (n->message_head,
528                                     n->message_tail,
529                                     me);
530   n->queue_size++;
531   process_queue (n);
532 }
533
534
535 /**
536  * One of our neighbours has excess bandwidth, remember this.
537  *
538  * @param cls NULL
539  * @param pid identity of the peer with excess bandwidth
540  */
541 static void
542 handle_transport_notify_excess_bw (void *cls,
543                                    const struct GNUNET_PeerIdentity *pid)
544 {
545   struct Neighbour *n;
546
547   n = find_neighbour (pid);
548   if (NULL == n)
549   {
550     GNUNET_break (0);
551     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
552                 "Peer %s not found\n",
553                 GNUNET_i2s (pid));
554     return;
555   }
556   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557               "Peer %s has excess bandwidth available\n",
558               GNUNET_i2s (pid));
559   n->has_excess_bandwidth = GNUNET_YES;
560   GSC_SESSIONS_solicit (pid);
561 }
562
563
564 /**
565  * Check how many messages are queued for the given neighbour.
566  *
567  * @param target neighbour to check
568  * @return number of items in the message queue
569  */
570 unsigned int
571 GSC_NEIGHBOURS_get_queue_size (const struct GNUNET_PeerIdentity *target)
572 {
573   struct Neighbour *n;
574
575   n = find_neighbour (target);
576   if (NULL == n)
577   {
578     GNUNET_break (0);
579     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
580                 "Peer %s not found\n",
581                 GNUNET_i2s (target));
582     return UINT_MAX;
583   }
584   return n->queue_size;
585 }
586
587
588 /**
589  * Check if the given neighbour has excess bandwidth available.
590  *
591  * @param target neighbour to check
592  * @return #GNUNET_YES if excess bandwidth is available, #GNUNET_NO if not
593  */
594 int
595 GSC_NEIGHBOURS_check_excess_bandwidth (const struct GNUNET_PeerIdentity *target)
596 {
597   struct Neighbour *n;
598
599   n = find_neighbour (target);
600   if (NULL == n)
601   {
602     GNUNET_break (0);
603     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
604                 "Peer %s not found\n",
605                 GNUNET_i2s (target));
606     return GNUNET_SYSERR;
607   }
608   return n->has_excess_bandwidth;
609 }
610
611
612 /**
613  * Initialize neighbours subsystem.
614  */
615 int
616 GSC_NEIGHBOURS_init ()
617 {
618   neighbours = GNUNET_CONTAINER_multipeermap_create (128,
619                                                      GNUNET_YES);
620   transport =
621       GNUNET_TRANSPORT_connect2 (GSC_cfg,
622                                  &GSC_my_identity,
623                                  NULL,
624                                  &handle_transport_receive,
625                                  &handle_transport_notify_connect,
626                                  &handle_transport_notify_disconnect,
627                                  &handle_transport_notify_excess_bw);
628   if (NULL == transport)
629   {
630     GNUNET_CONTAINER_multipeermap_destroy (neighbours);
631     neighbours = NULL;
632     return GNUNET_SYSERR;
633   }
634   return GNUNET_OK;
635 }
636
637
638 /**
639  * Wrapper around #free_neighbour().
640  *
641  * @param cls unused
642  * @param key peer identity
643  * @param value the `struct Neighbour` to free
644  * @return #GNUNET_OK (continue to iterate)
645  */
646 static int
647 free_neighbour_helper (void *cls,
648                        const struct GNUNET_PeerIdentity * key,
649                        void *value)
650 {
651   struct Neighbour *n = value;
652
653   /* transport should have 'disconnected' all neighbours... */
654   GNUNET_break (0);
655   free_neighbour (n);
656   return GNUNET_OK;
657 }
658
659
660 /**
661  * Shutdown neighbours subsystem.
662  */
663 void
664 GSC_NEIGHBOURS_done ()
665 {
666   if (NULL != transport)
667   {
668     GNUNET_TRANSPORT_disconnect (transport);
669     transport = NULL;
670   }
671   if (NULL != neighbours)
672   {
673     GNUNET_CONTAINER_multipeermap_iterate (neighbours,
674                                            &free_neighbour_helper,
675                                            NULL);
676     GNUNET_CONTAINER_multipeermap_destroy (neighbours);
677     neighbours = NULL;
678   }
679 }
680
681 /* end of gnunet-service-core_neighbours.c */