774c173dca6d1056f5116d43d545b7812f4ecc17
[oweals/gnunet.git] / src / core / gnunet-service-core_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_neighbours.c
23  * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_transport_service.h"
30 #include "gnunet-service-core.h"
31 #include "gnunet-service-core_neighbours.h"
32 #include "gnunet-service-core_kx.h"
33 #include "gnunet-service-core_sessions.h"
34 #include "gnunet_constants.h"
35
36
37 /**
38  * Receive and send buffer windows grow over time.  For
39  * how long can 'unused' bandwidth accumulate before we
40  * need to cap it?  (specified in seconds).
41  */
42 #define MAX_WINDOW_TIME_S (5 * 60)
43
44
45 /**
46  * Message ready for transmission via transport service.  This struct
47  * is followed by the actual content of the message.
48  */
49 struct NeighbourMessageEntry
50 {
51
52   /**
53    * We keep messages in a doubly linked list.
54    */
55   struct NeighbourMessageEntry *next;
56
57   /**
58    * We keep messages in a doubly linked list.
59    */
60   struct NeighbourMessageEntry *prev;
61
62   /**
63    * By when are we supposed to transmit this message?
64    */
65   struct GNUNET_TIME_Absolute deadline;
66
67   /**
68    * How long is the message? (number of bytes following the "struct
69    * MessageEntry", but not including the size of "struct
70    * MessageEntry" itself!)
71    */
72   size_t size;
73
74 };
75
76
77 /**
78  * Data kept per transport-connected peer.
79  */
80 struct Neighbour
81 {
82
83   /**
84    * Head of the batched message queue (already ordered, transmit
85    * starting with the head).
86    */
87   struct NeighbourMessageEntry *message_head;
88
89   /**
90    * Tail of the batched message queue (already ordered, append new
91    * messages to tail).
92    */
93   struct NeighbourMessageEntry *message_tail;
94
95   /**
96    * Handle for pending requests for transmission to this peer
97    * with the transport service.  NULL if no request is pending.
98    */
99   struct GNUNET_TRANSPORT_TransmitHandle *th;
100
101   /**
102    * Information about the key exchange with the other peer.
103    */
104   struct GSC_KeyExchangeInfo *kxinfo;
105
106   /**
107    * Identity of the other peer.
108    */
109   struct GNUNET_PeerIdentity peer;
110
111   /**
112    * ID of task used for re-trying plaintext scheduling.
113    */
114   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
115
116   /**
117    * Tracking bandwidth for sending to this peer.
118    */
119   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
120
121 };
122
123
124 /**
125  * Map of peer identities to 'struct Neighbour'.
126  */
127 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
128
129 /**
130  * Transport service.
131  */
132 static struct GNUNET_TRANSPORT_Handle *transport;
133
134
135 /**
136  * Find the entry for the given neighbour.
137  *
138  * @param peer identity of the neighbour
139  * @return NULL if we are not connected, otherwise the
140  *         neighbour's entry.
141  */
142 static struct Neighbour *
143 find_neighbour (const struct GNUNET_PeerIdentity *peer)
144 {
145   return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
146 }
147
148
149 /**
150  * Free the given entry for the neighbour.
151  *
152  * @param n neighbour to free
153  */
154 static void
155 free_neighbour (struct Neighbour *n)
156 {
157   struct NeighbourMessageEntry *m;
158
159 #if DEBUG_CORE
160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
161               "Destroying neighbour entry for peer `%4s'\n",
162               GNUNET_i2s (&n->peer));
163 #endif
164   while (NULL != (m = n->message_head))
165   {
166     GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
167     GNUNET_free (m);
168   }
169   if (NULL != n->th)
170   {
171     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
172     n->th = NULL;
173   }
174   GSC_SESSIONS_end (&n->peer);
175   if (NULL != n->kxinfo)
176   {
177     GSC_KX_stop (n->kxinfo);
178     n->kxinfo = NULL;
179   }
180   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
181   {
182     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
183     n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
184   }
185   GNUNET_assert (GNUNET_OK ==
186                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
187                                                        &n->peer.hashPubKey, n));
188   GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# neighbour entries allocated"),
189                          GNUNET_CONTAINER_multihashmap_size (neighbours),
190                          GNUNET_NO);
191   GNUNET_free (n);
192 }
193
194
195 /**
196  * Check if we have encrypted messages for the specified neighbour
197  * pending, and if so, check with the transport about sending them
198  * out.
199  *
200  * @param n neighbour to check.
201  */
202 static void
203 process_queue (struct Neighbour *n);
204
205
206 /**
207  * Function called when the transport service is ready to receive a
208  * message for the respective peer
209  *
210  * @param cls neighbour to use message from
211  * @param size number of bytes we can transmit
212  * @param buf where to copy the message
213  * @return number of bytes transmitted
214  */
215 static size_t
216 transmit_ready (void *cls, size_t size, void *buf)
217 {
218   struct Neighbour *n = cls;
219   struct NeighbourMessageEntry *m;
220   size_t ret;
221   char *cbuf;
222
223   n->th = NULL;
224   m = n->message_head;
225   if (m == NULL)
226   {
227     GNUNET_break (0);
228     return 0;
229   }
230   GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
231   if (buf == NULL)
232   {
233 #if DEBUG_CORE
234     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
235                 "Transmission of message of type %u and size %u failed\n",
236                 (unsigned int)
237                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
238                 (unsigned int) m->size);
239 #endif
240     GNUNET_free (m);
241     process_queue (n);
242     return 0;
243   }
244   ret = 0;
245   cbuf = buf;
246   GNUNET_assert (size >= m->size);
247   memcpy (cbuf, &m[1], m->size);
248   ret = m->size;
249 #if DEBUG_CORE
250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
251               "Copied message of type %u and size %u into transport buffer for `%4s'\n",
252               (unsigned int)
253               ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
254               (unsigned int) ret, GNUNET_i2s (&n->peer));
255 #endif
256   GNUNET_free (m);
257   process_queue (n);
258   GNUNET_STATISTICS_update (GSC_stats,
259                             gettext_noop
260                             ("# encrypted bytes given to transport"), ret,
261                             GNUNET_NO);
262   return ret;
263 }
264
265
266 /**
267  * Check if we have messages for the specified neighbour pending, and
268  * if so, check with the transport about sending them out.
269  *
270  * @param n neighbour to check.
271  */
272 static void
273 process_queue (struct Neighbour *n)
274 {
275   struct NeighbourMessageEntry *m;
276
277   if (n->th != NULL)
278     return;                     /* request already pending */
279   m = n->message_head;
280   if (m == NULL)
281   {
282     /* notify sessions that the queue is empty and more messages
283        could thus be queued now */
284     GSC_SESSIONS_solicit (&n->peer);
285     return;
286   }
287 #if DEBUG_CORE > 1
288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
289               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
290               (unsigned int) m->size, GNUNET_i2s (&n->peer),
291               (unsigned long long)
292               GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
293 #endif
294   n->th =
295        GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size,
296                                                0,
297                                                GNUNET_TIME_absolute_get_remaining
298                                                (m->deadline),
299                                                &transmit_ready,
300                                                n);
301   if (n->th != NULL)
302     return;
303   /* message request too large or duplicate request */
304   GNUNET_break (0);
305   /* discard encrypted message */
306   GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
307   GNUNET_free (m);
308   process_queue (n);
309 }
310
311
312
313 /**
314  * Function called by transport to notify us that
315  * a peer connected to us (on the network level).
316  *
317  * @param cls closure
318  * @param peer the peer that connected
319  * @param ats performance data
320  * @param ats_count number of entries in ats (excluding 0-termination)
321  */
322 static void
323 handle_transport_notify_connect (void *cls,
324                                  const struct GNUNET_PeerIdentity *peer,
325                                  const struct GNUNET_TRANSPORT_ATS_Information
326                                  *ats, uint32_t ats_count)
327 {
328   struct Neighbour *n;
329
330   if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
331   {
332     GNUNET_break (0);
333     return;
334   }
335   n = find_neighbour (peer);
336   if (n != NULL)
337   {
338     /* duplicate connect notification!? */
339     GNUNET_break (0);
340     return;
341   }
342 #if DEBUG_CORE
343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received connection from `%4s'.\n",
344               GNUNET_i2s (peer));
345 #endif
346   n = GNUNET_malloc (sizeof (struct Neighbour));
347   n->peer = *peer;
348   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window, 
349                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
350                                  MAX_WINDOW_TIME_S);
351   GNUNET_assert (GNUNET_OK ==
352                  GNUNET_CONTAINER_multihashmap_put (neighbours,
353                                                     &n->peer.hashPubKey, n,
354                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
355   GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# neighbour entries allocated"),
356                          GNUNET_CONTAINER_multihashmap_size (neighbours),
357                          GNUNET_NO);
358   GNUNET_TRANSPORT_set_quota (transport, peer, 
359                               GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, 
360                               GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT);
361   n->kxinfo = GSC_KX_start (peer);
362 }
363
364
365 /**
366  * Function called by transport telling us that a peer
367  * disconnected.
368  *
369  * @param cls closure
370  * @param peer the peer that disconnected
371  */
372 static void
373 handle_transport_notify_disconnect (void *cls,
374                                     const struct GNUNET_PeerIdentity *peer)
375 {
376   struct Neighbour *n;
377
378 #if DEBUG_CORE
379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
380               "Peer `%4s' disconnected from us; received notification from transport.\n",
381               GNUNET_i2s (peer));
382 #endif
383   n = find_neighbour (peer);
384   if (n == NULL)
385   {
386     GNUNET_break (0);
387     return;
388   }
389   free_neighbour (n);
390 }
391
392
393 /**
394  * Function called by the transport for each received message.
395  *
396  * @param cls closure
397  * @param peer (claimed) identity of the other peer
398  * @param message the message
399  * @param ats performance data
400  * @param ats_count number of entries in ats (excluding 0-termination)
401  */
402 static void
403 handle_transport_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
404                           const struct GNUNET_MessageHeader *message,
405                           const struct GNUNET_TRANSPORT_ATS_Information *ats,
406                           uint32_t ats_count)
407 {
408   struct Neighbour *n;
409   uint16_t type;
410
411 #if DEBUG_CORE > 1
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "Received message of type %u from `%4s', demultiplexing.\n",
414               (unsigned int) ntohs (message->type), GNUNET_i2s (peer));
415 #endif
416   if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
417   {
418     GNUNET_break (0);
419     return;
420   }
421   n = find_neighbour (peer);
422   if (n == NULL)
423   {
424     /* received message from peer that is not connected!? */
425     GNUNET_break (0);
426     return;
427   }
428   type = ntohs (message->type);
429   switch (type)
430   {
431   case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
432     GSC_KX_handle_set_key (n->kxinfo, message);
433     break;
434   case GNUNET_MESSAGE_TYPE_CORE_PING:
435     GSC_KX_handle_ping (n->kxinfo, message);
436     break;
437   case GNUNET_MESSAGE_TYPE_CORE_PONG:
438     GSC_KX_handle_pong (n->kxinfo, message);
439     break;
440   case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
441     GSC_KX_handle_encrypted_message (n->kxinfo,
442                                      message, ats,
443                                      ats_count);
444     break;
445   default:
446     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
447                 _("Unsupported message of type %u received.\n"),
448                 (unsigned int) type);
449     return;
450   }
451 }
452
453
454 /**
455  * Transmit the given message to the given target.
456  * 
457  * @param target peer that should receive the message (must be connected)
458  * @param msg message to transmit
459  * @param timeout by when should the transmission be done?
460  */
461 void
462 GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
463                          const struct GNUNET_MessageHeader *msg,
464                          struct GNUNET_TIME_Relative timeout)
465 {
466   struct NeighbourMessageEntry *me;
467   struct Neighbour *n;
468   size_t msize;
469
470   n = find_neighbour (target);
471   if (NULL == n)
472   {
473     GNUNET_break (0);
474     return;
475   }
476   msize = ntohs (msg->size);
477   me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize);
478   me->deadline = GNUNET_TIME_relative_to_absolute (timeout);
479   me->size = msize;
480   memcpy (&me[1], msg, msize);
481   GNUNET_CONTAINER_DLL_insert (n->message_head,
482                                n->message_tail,
483                                me);
484   process_queue (n);
485 }
486
487
488 /**
489  * Initialize neighbours subsystem.
490  */
491 int
492 GSC_NEIGHBOURS_init ()
493 {
494   neighbours = GNUNET_CONTAINER_multihashmap_create (128);
495   transport =
496       GNUNET_TRANSPORT_connect (GSC_cfg, 
497                                 &GSC_my_identity, NULL,
498                                 &handle_transport_receive,
499                                 &handle_transport_notify_connect,
500                                 &handle_transport_notify_disconnect);
501   if (NULL == transport)
502   {
503     GNUNET_CONTAINER_multihashmap_destroy (neighbours);
504     neighbours = NULL;
505     return GNUNET_SYSERR;
506   }
507   return GNUNET_OK;
508 }
509
510
511 /**
512  * Wrapper around 'free_neighbour'.
513  *
514  * @param cls unused
515  * @param key peer identity
516  * @param value the 'struct Neighbour' to free
517  * @return GNUNET_OK (continue to iterate)
518  */
519 static int
520 free_neighbour_helper (void *cls, const GNUNET_HashCode * key, void *value)
521 {
522   struct Neighbour *n = value;
523
524   free_neighbour (n);
525   return GNUNET_OK;
526 }
527
528
529 /**
530  * Shutdown neighbours subsystem.
531  */
532 void
533 GSC_NEIGHBOURS_done ()
534 {
535   if (NULL == transport)
536     return;
537   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
538                                          NULL);
539   GNUNET_TRANSPORT_disconnect (transport);
540   transport = NULL;
541   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
542   neighbours = NULL;
543 }
544
545 /* end of gnunet-service-core_neighbours.c */
546