12d002da805382c0e2a931b6872db1848c71a352
[oweals/gnunet.git] / src / core / gnunet-service-core_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file core/gnunet-service-core_neighbours.c
23  * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_transport_service.h"
29 #include "gnunet_service_core.h"
30 #include "gnunet_service_core-neighbours.h"
31 #include "gnunet_service_core-kx.h"
32
33
34 /**
35  * Message ready for transmission via transport service.  This struct
36  * is followed by the actual content of the message.
37  */
38 struct MessageEntry
39 {
40
41   /**
42    * We keep messages in a doubly linked list.
43    */
44   struct MessageEntry *next;
45
46   /**
47    * We keep messages in a doubly linked list.
48    */
49   struct MessageEntry *prev;
50
51   /**
52    * By when are we supposed to transmit this message?
53    */
54   struct GNUNET_TIME_Absolute deadline;
55
56   /**
57    * How long is the message? (number of bytes following the "struct
58    * MessageEntry", but not including the size of "struct
59    * MessageEntry" itself!)
60    */
61   size_t size;
62
63 };
64
65
66 /**
67  * Data kept per transport-connected peer.
68  */
69 struct Neighbour
70 {
71
72   /**
73    * Head of the batched message queue (already ordered, transmit
74    * starting with the head).
75    */
76   struct MessageEntry *message_head;
77
78   /**
79    * Tail of the batched message queue (already ordered, append new
80    * messages to tail).
81    */
82   struct MessageEntry *message_tail;
83
84   /**
85    * Handle for pending requests for transmission to this peer
86    * with the transport service.  NULL if no request is pending.
87    */
88   struct GNUNET_TRANSPORT_TransmitHandle *th;
89
90   /**
91    * Information about the key exchange with the other peer.
92    */
93   struct GSC_KeyExchangeInfo *kxinfo;
94
95   /**
96    * Identity of the other peer.
97    */
98   struct GNUNET_PeerIdentity peer;
99
100   /**
101    * ID of task used for re-trying plaintext scheduling.
102    */
103   GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
104
105   /**
106    * Tracking bandwidth for sending to this peer.
107    */
108   struct GNUNET_BANDWIDTH_Tracker available_send_window;
109
110   /**
111    * Tracking bandwidth for sending to this peer.
112    */
113   struct GNUNET_BANDWIDTH_Tracker available_recv_window;
114
115
116 };
117
118
119 /**
120  * Map of peer identities to 'struct Neighbour'.
121  */
122 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
123
124 /**
125  * Transport service.
126  */
127 static struct GNUNET_TRANSPORT_Handle *transport;
128
129
130
131 /**
132  * Find the entry for the given neighbour.
133  *
134  * @param peer identity of the neighbour
135  * @return NULL if we are not connected, otherwise the
136  *         neighbour's entry.
137  */
138 static struct Neighbour *
139 find_neighbour (const struct GNUNET_PeerIdentity *peer)
140 {
141   return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
142 }
143
144
145 /**
146  * Free the given entry for the neighbour.
147  *
148  * @param n neighbour to free
149  */
150 static void
151 free_neighbour (struct Neighbour *n)
152 {
153   struct MessageEntry *m;
154
155 #if DEBUG_CORE
156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
157               "Destroying neighbour entry for peer `%4s'\n",
158               GNUNET_i2s (&n->peer));
159 #endif
160   while (NULL != (m = n->message_head))
161   {
162     GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
163     GNUNET_free (m);
164   }
165   if (NULL != n->th)
166   {
167     GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
168     n->th = NULL;
169   }
170   if (NULL != n->kx)
171   {
172     GSC_KX_stop (n->kx);
173     n->kx = NULL;
174   }
175   if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
176   {
177     GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
178     n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
179   }
180   GNUNET_assert (GNUNET_OK ==
181                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
182                                                        &n->peer.hashPubKey, n));
183   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
184                          GNUNET_CONTAINER_multihashmap_size (neighbours),
185                          GNUNET_NO);
186   GNUNET_free (n);
187 }
188
189
190 /**
191  * Check if we have encrypted messages for the specified neighbour
192  * pending, and if so, check with the transport about sending them
193  * out.
194  *
195  * @param n neighbour to check.
196  */
197 static void
198 process_queue (struct Neighbour *n);
199
200
201 /**
202  * Function called when the transport service is ready to receive a
203  * message for the respective peer
204  *
205  * @param cls neighbour to use message from
206  * @param size number of bytes we can transmit
207  * @param buf where to copy the message
208  * @return number of bytes transmitted
209  */
210 static size_t
211 transmit_ready (void *cls, size_t size, void *buf)
212 {
213   struct Neighbour *n = cls;
214   struct MessageEntry *m;
215   size_t ret;
216   char *cbuf;
217
218   n->th = NULL;
219   m = n->message_head;
220   if (m == NULL)
221   {
222 #if DEBUG_CORE
223     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
224                 "Encrypted message queue empty, no messages added to buffer for `%4s'\n",
225                 GNUNET_i2s (&n->peer));
226 #endif
227     return 0;
228   }
229   GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
230   if (buf == NULL)
231   {
232 #if DEBUG_CORE
233     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
234                 "Transmission of message of type %u and size %u failed\n",
235                 (unsigned int)
236                 ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
237                 (unsigned int) m->size);
238 #endif
239     GNUNET_free (m);
240     process_queue (n);
241     return 0;
242   }
243   ret = 0;
244   cbuf = buf;
245   GNUNET_assert (size >= m->size);
246   memcpy (cbuf, &m[1], m->size);
247   ret = m->size;
248   GNUNET_BANDWIDTH_tracker_consume (&n->available_send_window, m->size);
249 #if DEBUG_CORE
250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
251               "Copied message of type %u and size %u into transport buffer for `%4s'\n",
252               (unsigned int)
253               ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
254               (unsigned int) ret, GNUNET_i2s (&n->peer));
255 #endif
256   GNUNET_free (m);
257   process_queue (n);
258   GNUNET_STATISTICS_update (GSC_stats,
259                             gettext_noop
260                             ("# encrypted bytes given to transport"), ret,
261                             GNUNET_NO);
262   return ret;
263 }
264
265
266 /**
267  * Check if we have messages for the specified neighbour pending, and
268  * if so, check with the transport about sending them out.
269  *
270  * @param n neighbour to check.
271  */
272 static void
273 process_queue (struct Neighbour *n)
274 {
275   struct MessageEntry *m;
276
277   if (n->th != NULL)
278     return;                     /* request already pending */
279   m = n->message_head;
280   if (m == NULL)
281     return;
282 #if DEBUG_CORE > 1
283   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
284               "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
285               (unsigned int) m->size, GNUNET_i2s (&n->peer),
286               (unsigned long long)
287               GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
288 #endif
289   n->th =
290        GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size,
291                                                m->priority,
292                                                GNUNET_TIME_absolute_get_remaining
293                                                (m->deadline),
294                                                &transmit_ready,
295                                                n);
296   if (n->th != NULL)
297     return;
298   /* message request too large or duplicate request */
299   GNUNET_break (0);
300   /* discard encrypted message */
301   GNUNET_CONTAINER_DLL_remove (n->encrypted_head, n->encrypted_tail, m);
302   GNUNET_free (m);
303   process_queue (n);
304 }
305
306
307
308 /**
309  * Function called by transport to notify us that
310  * a peer connected to us (on the network level).
311  *
312  * @param cls closure
313  * @param peer the peer that connected
314  * @param ats performance data
315  * @param ats_count number of entries in ats (excluding 0-termination)
316  */
317 static void
318 handle_transport_notify_connect (void *cls,
319                                  const struct GNUNET_PeerIdentity *peer,
320                                  const struct GNUNET_TRANSPORT_ATS_Information
321                                  *ats, uint32_t ats_count)
322 {
323   struct Neighbour *n;
324
325   if (0 == memcmp (peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
326   {
327     GNUNET_break (0);
328     return;
329   }
330   n = find_neighbour (peer);
331   if (n != NULL)
332   {
333     /* duplicate connect notification!? */
334     GNUNET_break (0);
335     return;
336   }
337 #if DEBUG_CORE
338   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received connection from `%4s'.\n",
339               GNUNET_i2s (peer));
340 #endif
341   n = GNUNET_malloc (sizeof (struct Neighbour));
342   n->peer = *pid;
343   GNUNET_BANDWIDTH_tracker_init (&n->available_send_window, 
344                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
345                                  MAX_WINDOW_TIME_S);
346   GNUNET_BANDWIDTH_tracker_init (&n->available_recv_window, 
347                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
348                                  MAX_WINDOW_TIME_S);
349   GNUNET_assert (GNUNET_OK ==
350                  GNUNET_CONTAINER_multihashmap_put (neighbours,
351                                                     &n->peer.hashPubKey, n,
352                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
353   GNUNET_STATISTICS_set (stats, gettext_noop ("# neighbour entries allocated"),
354                          GNUNET_CONTAINER_multihashmap_size (neighbours),
355                          GNUNET_NO);
356   GNUNET_TRANSPORT_set_quota (transport, peer, 
357                               GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, 
358                               GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT);
359   n->kx = GSC_KX_start (pid);
360 }
361
362
363 /**
364  * Function called by transport telling us that a peer
365  * disconnected.
366  *
367  * @param cls closure
368  * @param peer the peer that disconnected
369  */
370 static void
371 handle_transport_notify_disconnect (void *cls,
372                                     const struct GNUNET_PeerIdentity *peer)
373 {
374   struct Neighbour *n;
375
376 #if DEBUG_CORE
377   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
378               "Peer `%4s' disconnected from us; received notification from transport.\n",
379               GNUNET_i2s (peer));
380 #endif
381   n = find_neighbour (peer);
382   if (n == NULL)
383   {
384     GNUNET_break (0);
385     return;
386   }
387   free_neighbour (n);
388 }
389
390
391 /**
392  * Function called by the transport for each received message.
393  *
394  * @param cls closure
395  * @param peer (claimed) identity of the other peer
396  * @param message the message
397  * @param ats performance data
398  * @param ats_count number of entries in ats (excluding 0-termination)
399  */
400 static void
401 handle_transport_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
402                           const struct GNUNET_MessageHeader *message,
403                           const struct GNUNET_TRANSPORT_ATS_Information *ats,
404                           uint32_t ats_count)
405 {
406   struct Neighbour *n;
407   struct GNUNET_TIME_Absolute now;
408   int up;
409   uint16_t type;
410   uint16_t size;
411   int changed;
412
413 #if DEBUG_CORE > 1
414   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
415               "Received message of type %u from `%4s', demultiplexing.\n",
416               (unsigned int) ntohs (message->type), GNUNET_i2s (peer));
417 #endif
418   if (0 == memcmp (peer, &my_identity, sizeof (struct GNUNET_PeerIdentity)))
419   {
420     GNUNET_break (0);
421     return;
422   }
423   n = find_neighbour (peer);
424   if (n == NULL)
425   {
426     /* received message from peer that is not connected!? */
427     GNUNET_break (0);
428     return;
429   }
430
431
432   changed = GNUNET_NO;
433   up = (n->status == PEER_STATE_KEY_CONFIRMED);
434   type = ntohs (message->type);
435   size = ntohs (message->size);
436   switch (type)
437   {
438   case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
439     if (size != sizeof (struct SetKeyMessage))
440     {
441       GNUNET_break_op (0);
442       return;
443     }
444     GNUNET_STATISTICS_update (stats, gettext_noop ("# session keys received"),
445                               1, GNUNET_NO);
446     handle_set_key (n, (const struct SetKeyMessage *) message, ats, ats_count);
447     break;
448   case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
449     if (size <
450         sizeof (struct EncryptedMessage) + sizeof (struct GNUNET_MessageHeader))
451     {
452       GNUNET_break_op (0);
453       return;
454     }
455     if ((n->status != PEER_STATE_KEY_RECEIVED) &&
456         (n->status != PEER_STATE_KEY_CONFIRMED))
457     {
458       GNUNET_STATISTICS_update (stats,
459                                 gettext_noop
460                                 ("# failed to decrypt message (no session key)"),
461                                 1, GNUNET_NO);
462       send_key (n);
463       return;
464     }
465     handle_encrypted_message (n, (const struct EncryptedMessage *) message, ats,
466                               ats_count);
467     break;
468   case GNUNET_MESSAGE_TYPE_CORE_PING:
469     if (size != sizeof (struct PingMessage))
470     {
471       GNUNET_break_op (0);
472       return;
473     }
474     GNUNET_STATISTICS_update (stats, gettext_noop ("# PING messages received"),
475                               1, GNUNET_NO);
476     if ((n->status != PEER_STATE_KEY_RECEIVED) &&
477         (n->status != PEER_STATE_KEY_CONFIRMED))
478     {
479 #if DEBUG_CORE > 1
480       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
481                   "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
482                   "PING", GNUNET_i2s (&n->peer));
483 #endif
484       GNUNET_free_non_null (n->pending_ping);
485       n->pending_ping = GNUNET_malloc (sizeof (struct PingMessage));
486       memcpy (n->pending_ping, message, sizeof (struct PingMessage));
487       return;
488     }
489     handle_ping (n, (const struct PingMessage *) message, ats, ats_count);
490     break;
491   case GNUNET_MESSAGE_TYPE_CORE_PONG:
492     if (size != sizeof (struct PongMessage))
493     {
494       GNUNET_break_op (0);
495       return;
496     }
497     GNUNET_STATISTICS_update (stats, gettext_noop ("# PONG messages received"),
498                               1, GNUNET_NO);
499     if ((n->status != PEER_STATE_KEY_RECEIVED) &&
500         (n->status != PEER_STATE_KEY_CONFIRMED))
501     {
502 #if DEBUG_CORE > 1
503       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504                   "Core service receives `%s' request from `%4s' but have not processed key; marking as pending.\n",
505                   "PONG", GNUNET_i2s (&n->peer));
506 #endif
507       GNUNET_free_non_null (n->pending_pong);
508       n->pending_pong = GNUNET_malloc (sizeof (struct PongMessage));
509       memcpy (n->pending_pong, message, sizeof (struct PongMessage));
510       return;
511     }
512     handle_pong (n, (const struct PongMessage *) message, ats, ats_count);
513     break;
514   default:
515     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
516                 _("Unsupported message of type %u received.\n"),
517                 (unsigned int) type);
518     return;
519   }
520   if (n->status == PEER_STATE_KEY_CONFIRMED)
521   {
522     now = GNUNET_TIME_absolute_get ();
523     n->last_activity = now;
524     changed = GNUNET_YES;
525     if (!up)
526     {
527       GNUNET_STATISTICS_update (stats, gettext_noop ("# established sessions"),
528                                 1, GNUNET_NO);
529       n->time_established = now;
530     }
531     if (n->keep_alive_task != GNUNET_SCHEDULER_NO_TASK)
532       GNUNET_SCHEDULER_cancel (n->keep_alive_task);
533     n->keep_alive_task =
534         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide
535                                       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
536                                        2), &send_keep_alive, n);
537   }
538   if (changed)
539     handle_peer_status_change (n);
540 }
541
542
543 /**
544  * Transmit the given message to the given target.
545  * 
546  * @param target peer that should receive the message (must be connected)
547  * @param msg message to transmit
548  * @param timeout by when should the transmission be done?
549  */
550 void
551 GDS_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
552                          const struct GNUNET_MessageHeader *msg,
553                          struct GNUNET_TIME_Relative timeout)
554 {
555   
556 }
557
558
559 /**
560  * Initialize neighbours subsystem.
561  */
562 int
563 GSC_NEIGHBOURS_init ()
564 {
565   neighbours = GNUNET_CONTAINER_multihashmap_create (128);
566   transport =
567       GNUNET_TRANSPORT_connect (GSC_cfg, 
568                                 &GSC_my_identity, NULL,
569                                 &handle_transport_receive,
570                                 &handle_transport_notify_connect,
571                                 &handle_transport_notify_disconnect);
572   if (NULL == transport)
573   {
574     GNUNET_CONTAINER_multihashmap_destroy (neighbours);
575     neighbours = NULL;
576     return GNUNET_SYSERR;
577   }
578   return GNUNET_OK;
579 }
580
581
582 /**
583  * Wrapper around 'free_neighbour'.
584  *
585  * @param cls unused
586  * @param key peer identity
587  * @param value the 'struct Neighbour' to free
588  * @return GNUNET_OK (continue to iterate)
589  */
590 static int
591 free_neighbour_helper (void *cls, const GNUNET_HashCode * key, void *value)
592 {
593   struct Neighbour *n = value;
594
595   free_neighbour (n);
596   return GNUNET_OK;
597 }
598
599
600 /**
601  * Shutdown neighbours subsystem.
602  */
603 void
604 GSC_NEIGHBOURS_done ()
605 {
606   if (NULL == transport)
607     return;
608   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
609                                          NULL);
610   GNUNET_TRANSPORT_disconnect (transport);
611   transport = NULL;
612   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
613   neighbours = NULL;
614 }
615
616 /* end of gnunet-service-core_neighbours.c */
617