hxing
[oweals/gnunet.git] / src / core / gnunet-service-core_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_neighbours.c
23  * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet_transport_service.h"
30 #include "gnunet-service-core.h"
31 #include "gnunet-service-core_neighbours.h"
32 #include "gnunet-service-core_kx.h"
33 #include "gnunet-service-core_sessions.h"
34 #include "gnunet_constants.h"
35
36
37 /**
38  * Receive and send buffer windows grow over time.  For
39  * how long can 'unused' bandwidth accumulate before we
40  * need to cap it?  (specified in seconds).
41  */
42 #define MAX_WINDOW_TIME_S (5 * 60)
43
44
45 /**
46  * Message ready for transmission via transport service.  This struct
47  * is followed by the actual content of the message.
48  */
49 struct NeighbourMessageEntry
50 {
51
52   /**
53    * We keep messages in a doubly linked list.
54    */
55   struct NeighbourMessageEntry *next;
56
57   /**
58    * We keep messages in a doubly linked list.
59    */
60   struct NeighbourMessageEntry *prev;
61
62   /**
63    * By when are we supposed to transmit this message?
64    */
65   struct GNUNET_TIME_Absolute deadline;
66
67   /**
68    * How long is the message? (number of bytes following the "struct
69    * MessageEntry", but not including the size of "struct
70    * MessageEntry" itself!)
71    */
72   size_t size;
73
74 };
75
76
77 /**
78  * Data kept per transport-connected peer.
79  */
80 struct Neighbour
81 {
82
83   /**
84    * Head of the batched message queue (already ordered, transmit
85    * starting with the head).
86    */
87   struct NeighbourMessageEntry *message_head;
88
89   /**
90    * Tail of the batched message queue (already ordered, append new
91    * messages to tail).
92    */
93   struct NeighbourMessageEntry *message_tail;
94
95   /**
96    * Handle for pending requests for transmission to this peer
97    * with the transport service.  NULL if no request is pending.
98    */
99   struct GNUNET_TRANSPORT_TransmitHandle *th;
100
101   /**
102    * Information about the key exchange with the other peer.
103    */
104   struct GSC_KeyExchangeInfo *kxinfo;
105
106   /**
107    * Identity of the other peer.
108    */
109   struct GNUNET_PeerIdentity peer;
110
111   /**
112    * ID of task used for re-trying plaintext scheduling.
113    */
114   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
115
116   /**
117    * Tracking bandwidth for sending to this peer.
118    */
119   struct GNUNET_BANDWIDTH_Tracker available_send_window;
120
121   /**
122    * Tracking bandwidth for sending to this peer.
123    */
124   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
125
126 };
127
128
129 /**
130  * Map of peer identities to 'struct Neighbour'.
131  */
132 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
133
134 /**
135  * Transport service.
136  */
137 static struct GNUNET_TRANSPORT_Handle *transport;
138
139
140 /**
141  * Find the entry for the given neighbour.
142  *
143  * @param peer identity of the neighbour
144  * @return NULL if we are not connected, otherwise the
145  *         neighbour's entry.
146  */
147 static struct Neighbour *
148 find_neighbour (const struct GNUNET_PeerIdentity *peer)
149 {
150   return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
151 }
152
153
154 /**
155  * Free the given entry for the neighbour.
156  *
157  * @param n neighbour to free
158  */
159 static void
160 free_neighbour (struct Neighbour *n)
161 {
162   struct NeighbourMessageEntry *m;
163
164 #if DEBUG_CORE
165   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
166               "Destroying neighbour entry for peer `%4s'\n",
167               GNUNET_i2s (&n->peer));
168 #endif
169   while (NULL != (m = n->message_head))
170   {
171     GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
172     GNUNET_free (m);
173   }
174   if (NULL != n->th)
175   {
176     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
177     n->th = NULL;
178   }
179   GSC_SESSIONS_end (&n->peer);
180   if (NULL != n->kxinfo)
181   {
182     GSC_KX_stop (n->kxinfo);
183     n->kxinfo = NULL;
184   }
185   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
186   {
187     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
188     n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
189   }
190   GNUNET_assert (GNUNET_OK ==
191                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
192                                                        &n->peer.hashPubKey, n));
193   GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# neighbour entries allocated"),
194                          GNUNET_CONTAINER_multihashmap_size (neighbours),
195                          GNUNET_NO);
196   GNUNET_free (n);
197 }
198
199
200 /**
201  * Check if we have encrypted messages for the specified neighbour
202  * pending, and if so, check with the transport about sending them
203  * out.
204  *
205  * @param n neighbour to check.
206  */
207 static void
208 process_queue (struct Neighbour *n);
209
210
211 /**
212  * Function called when the transport service is ready to receive a
213  * message for the respective peer
214  *
215  * @param cls neighbour to use message from
216  * @param size number of bytes we can transmit
217  * @param buf where to copy the message
218  * @return number of bytes transmitted
219  */
220 static size_t
221 transmit_ready (void *cls, size_t size, void *buf)
222 {
223   struct Neighbour *n = cls;
224   struct NeighbourMessageEntry *m;
225   size_t ret;
226   char *cbuf;
227
228   n->th = NULL;
229   m = n->message_head;
230   if (m == NULL)
231   {
232     GNUNET_break (0);
233     return 0;
234   }
235   GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
236   if (buf == NULL)
237   {
238 #if DEBUG_CORE
239     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
240                 "Transmission of message of type %u and size %u failed\n",
241                 (unsigned int)
242                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
243                 (unsigned int) m->size);
244 #endif
245     GNUNET_free (m);
246     process_queue (n);
247     return 0;
248   }
249   ret = 0;
250   cbuf = buf;
251   GNUNET_assert (size >= m->size);
252   memcpy (cbuf, &m[1], m->size);
253   ret = m->size;
254   GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size);
255 #if DEBUG_CORE
256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
257               "Copied message of type %u and size %u into transport buffer for `%4s'\n",
258               (unsigned int)
259               ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
260               (unsigned int) ret, GNUNET_i2s (&n->peer));
261 #endif
262   GNUNET_free (m);
263   process_queue (n);
264   GNUNET_STATISTICS_update (GSC_stats,
265                             gettext_noop
266                             ("# encrypted bytes given to transport"), ret,
267                             GNUNET_NO);
268   return ret;
269 }
270
271
272 /**
273  * Check if we have messages for the specified neighbour pending, and
274  * if so, check with the transport about sending them out.
275  *
276  * @param n neighbour to check.
277  */
278 static void
279 process_queue (struct Neighbour *n)
280 {
281   struct NeighbourMessageEntry *m;
282
283   if (n->th != NULL)
284     return;                     /* request already pending */
285   m = n->message_head;
286   if (m == NULL)
287   {
288     /* notify sessions that the queue is empty and more messages
289        could thus be queued now */
290     GSC_SESSIONS_solicit (&n->peer);
291     return;
292   }
293 #if DEBUG_CORE > 1
294   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
296               (unsigned int) m->size, GNUNET_i2s (&n->peer),
297               (unsigned long long)
298               GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
299 #endif
300   n->th =
301        GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size,
302                                                0,
303                                                GNUNET_TIME_absolute_get_remaining
304                                                (m->deadline),
305                                                &transmit_ready,
306                                                n);
307   if (n->th != NULL)
308     return;
309   /* message request too large or duplicate request */
310   GNUNET_break (0);
311   /* discard encrypted message */
312   GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
313   GNUNET_free (m);
314   process_queue (n);
315 }
316
317
318
319 /**
320  * Function called by transport to notify us that
321  * a peer connected to us (on the network level).
322  *
323  * @param cls closure
324  * @param peer the peer that connected
325  * @param ats performance data
326  * @param ats_count number of entries in ats (excluding 0-termination)
327  */
328 static void
329 handle_transport_notify_connect (void *cls,
330                                  const struct GNUNET_PeerIdentity *peer,
331                                  const struct GNUNET_TRANSPORT_ATS_Information
332                                  *ats, uint32_t ats_count)
333 {
334   struct Neighbour *n;
335
336   if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
337   {
338     GNUNET_break (0);
339     return;
340   }
341   n = find_neighbour (peer);
342   if (n != NULL)
343   {
344     /* duplicate connect notification!? */
345     GNUNET_break (0);
346     return;
347   }
348 #if DEBUG_CORE
349   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received connection from `%4s'.\n",
350               GNUNET_i2s (peer));
351 #endif
352   n = GNUNET_malloc (sizeof (struct Neighbour));
353   n->peer = *peer;
354   GNUNET_BANDWIDTH_tracker_init (&n->available_send_window, 
355                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
356                                  MAX_WINDOW_TIME_S);
357   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window, 
358                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
359                                  MAX_WINDOW_TIME_S);
360   GNUNET_assert (GNUNET_OK ==
361                  GNUNET_CONTAINER_multihashmap_put (neighbours,
362                                                     &n->peer.hashPubKey, n,
363                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
364   GNUNET_STATISTICS_set (GSC_stats, gettext_noop ("# neighbour entries allocated"),
365                          GNUNET_CONTAINER_multihashmap_size (neighbours),
366                          GNUNET_NO);
367   GNUNET_TRANSPORT_set_quota (transport, peer, 
368                               GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, 
369                               GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT);
370   n->kxinfo = GSC_KX_start (peer);
371 }
372
373
374 /**
375  * Function called by transport telling us that a peer
376  * disconnected.
377  *
378  * @param cls closure
379  * @param peer the peer that disconnected
380  */
381 static void
382 handle_transport_notify_disconnect (void *cls,
383                                     const struct GNUNET_PeerIdentity *peer)
384 {
385   struct Neighbour *n;
386
387 #if DEBUG_CORE
388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
389               "Peer `%4s' disconnected from us; received notification from transport.\n",
390               GNUNET_i2s (peer));
391 #endif
392   n = find_neighbour (peer);
393   if (n == NULL)
394   {
395     GNUNET_break (0);
396     return;
397   }
398   free_neighbour (n);
399 }
400
401
402 /**
403  * Function called by the transport for each received message.
404  *
405  * @param cls closure
406  * @param peer (claimed) identity of the other peer
407  * @param message the message
408  * @param ats performance data
409  * @param ats_count number of entries in ats (excluding 0-termination)
410  */
411 static void
412 handle_transport_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
413                           const struct GNUNET_MessageHeader *message,
414                           const struct GNUNET_TRANSPORT_ATS_Information *ats,
415                           uint32_t ats_count)
416 {
417   struct Neighbour *n;
418   uint16_t type;
419
420 #if DEBUG_CORE > 1
421   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
422               "Received message of type %u from `%4s', demultiplexing.\n",
423               (unsigned int) ntohs (message->type), GNUNET_i2s (peer));
424 #endif
425   if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
426   {
427     GNUNET_break (0);
428     return;
429   }
430   n = find_neighbour (peer);
431   if (n == NULL)
432   {
433     /* received message from peer that is not connected!? */
434     GNUNET_break (0);
435     return;
436   }
437   type = ntohs (message->type);
438   switch (type)
439   {
440   case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
441     GSC_KX_handle_set_key (n->kxinfo, message);
442     break;
443   case GNUNET_MESSAGE_TYPE_CORE_PING:
444     GSC_KX_handle_ping (n->kxinfo, message);
445     break;
446   case GNUNET_MESSAGE_TYPE_CORE_PONG:
447     GSC_KX_handle_pong (n->kxinfo, message);
448     break;
449   case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
450     GSC_KX_handle_encrypted_message (n->kxinfo,
451                                      message, ats,
452                                      ats_count);
453     break;
454   default:
455     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
456                 _("Unsupported message of type %u received.\n"),
457                 (unsigned int) type);
458     return;
459   }
460 }
461
462
463 /**
464  * Transmit the given message to the given target.
465  * 
466  * @param target peer that should receive the message (must be connected)
467  * @param msg message to transmit
468  * @param timeout by when should the transmission be done?
469  */
470 void
471 GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
472                          const struct GNUNET_MessageHeader *msg,
473                          struct GNUNET_TIME_Relative timeout)
474 {
475   struct NeighbourMessageEntry *me;
476   struct Neighbour *n;
477   size_t msize;
478
479   n = find_neighbour (target);
480   if (NULL == n)
481   {
482     GNUNET_break (0);
483     return;
484   }
485   msize = ntohs (msg->size);
486   me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize);
487   me->deadline = GNUNET_TIME_relative_to_absolute (timeout);
488   me->size = msize;
489   memcpy (&me[1], msg, msize);
490   GNUNET_CONTAINER_DLL_insert (n->message_head,
491                                n->message_tail,
492                                me);
493   process_queue (n);
494 }
495
496
497 /**
498  * Initialize neighbours subsystem.
499  */
500 int
501 GSC_NEIGHBOURS_init ()
502 {
503   neighbours = GNUNET_CONTAINER_multihashmap_create (128);
504   transport =
505       GNUNET_TRANSPORT_connect (GSC_cfg, 
506                                 &GSC_my_identity, NULL,
507                                 &handle_transport_receive,
508                                 &handle_transport_notify_connect,
509                                 &handle_transport_notify_disconnect);
510   if (NULL == transport)
511   {
512     GNUNET_CONTAINER_multihashmap_destroy (neighbours);
513     neighbours = NULL;
514     return GNUNET_SYSERR;
515   }
516   return GNUNET_OK;
517 }
518
519
520 /**
521  * Wrapper around 'free_neighbour'.
522  *
523  * @param cls unused
524  * @param key peer identity
525  * @param value the 'struct Neighbour' to free
526  * @return GNUNET_OK (continue to iterate)
527  */
528 static int
529 free_neighbour_helper (void *cls, const GNUNET_HashCode * key, void *value)
530 {
531   struct Neighbour *n = value;
532
533   free_neighbour (n);
534   return GNUNET_OK;
535 }
536
537
538 /**
539  * Shutdown neighbours subsystem.
540  */
541 void
542 GSC_NEIGHBOURS_done ()
543 {
544   if (NULL == transport)
545     return;
546   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
547                                          NULL);
548   GNUNET_TRANSPORT_disconnect (transport);
549   transport = NULL;
550   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
551   neighbours = NULL;
552 }
553
554 /* end of gnunet-service-core_neighbours.c */
555