complete state reset functionality
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet_constants.h"
35 #include "transport.h"
36
37
38 /**
39  * Size of the neighbour hash map.
40  */
41 #define NEIGHBOUR_TABLE_SIZE 256
42
43 /**
44  * How often must a peer violate bandwidth quotas before we start
45  * to simply drop its messages?
46  */
47 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
48
49 /**
50  * How often do we send KEEPALIVE messages to each of our neighbours?
51  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
52  * send 3 keepalives in each interval, so 3 messages would need to be
53  * lost in a row for a disconnect).
54  */
55 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
56
57
58 /**
59  * Entry in neighbours.
60  */
61 struct NeighbourMapEntry;
62
63 /**
64  * Message a peer sends to another to indicate its
65  * preference for communicating via a particular
66  * session (and the desire to establish a real
67  * connection).
68  */
69 struct SessionConnectMessage
70 {
71   /**
72    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
73    */
74   struct GNUNET_MessageHeader header;
75
76   /**
77    * Always zero.
78    */
79   uint32_t reserved GNUNET_PACKED;
80
81   /**
82    * Absolute time at the sender.  Only the most recent connect
83    * message implies which session is preferred by the sender.
84    */
85   struct GNUNET_TIME_AbsoluteNBO timestamp;
86
87 };
88
89
90 struct SessionDisconnectMessage
91 {
92   /**
93    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
94    */
95   struct GNUNET_MessageHeader header;
96
97   /**
98    * Always zero.
99    */
100   uint32_t reserved GNUNET_PACKED;
101
102   /**
103    * Purpose of the signature.  Extends over the timestamp.
104    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
105    */
106   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
107
108   /**
109    * Absolute time at the sender.  Only the most recent connect
110    * message implies which session is preferred by the sender.
111    */
112   struct GNUNET_TIME_AbsoluteNBO timestamp;
113
114   /**
115    * Public key of the sender.
116    */
117   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
118   
119   /**
120    * Signature of the peer that sends us the disconnect.  Only
121    * valid if the timestamp is AFTER the timestamp from the
122    * corresponding 'CONNECT' message.
123    */
124   struct GNUNET_CRYPTO_RsaSignature signature;
125
126 };
127
128
129 /**
130  * For each neighbour we keep a list of messages
131  * that we still want to transmit to the neighbour.
132  */
133 struct MessageQueue
134 {
135
136   /**
137    * This is a doubly linked list.
138    */
139   struct MessageQueue *next;
140
141   /**
142    * This is a doubly linked list.
143    */
144   struct MessageQueue *prev;
145
146   /**
147    * Once this message is actively being transmitted, which
148    * neighbour is it associated with?
149    */
150   struct NeighbourMapEntry *n;
151
152   /**
153    * Function to call once we're done.
154    */
155   GST_NeighbourSendContinuation cont;
156
157   /**
158    * Closure for 'cont'
159    */
160   void *cont_cls;
161
162   /**
163    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
164    * stuck together in memory.  Allocated at the end of this struct.
165    */
166   const char *message_buf;
167
168   /**
169    * Size of the message buf
170    */
171   size_t message_buf_size;
172
173   /**
174    * At what time should we fail?
175    */
176   struct GNUNET_TIME_Absolute timeout;
177
178 };
179
180
181 /**
182  * Entry in neighbours.
183  */
184 struct NeighbourMapEntry
185 {
186
187   /**
188    * Head of list of messages we would like to send to this peer;
189    * must contain at most one message per client.
190    */
191   struct MessageQueue *messages_head;
192
193   /**
194    * Tail of list of messages we would like to send to this peer; must
195    * contain at most one message per client.
196    */
197   struct MessageQueue *messages_tail;
198
199   /**
200    * Performance data for the peer.
201    */
202   //struct GNUNET_ATS_Information *ats;
203
204   /**
205    * Are we currently trying to send a message? If so, which one?
206    */
207   struct MessageQueue *is_active;
208
209   /**
210    * Active session for communicating with the peer.
211    */
212   struct Session *session;
213
214   /**
215    * Name of the plugin we currently use.
216    */
217   char *plugin_name;
218
219   /**
220    * Address used for communicating with the peer, NULL for inbound connections.
221    */
222   void *addr;
223
224   /**
225    * Number of bytes in 'addr'.
226    */
227   size_t addrlen;
228
229   /**
230    * Identity of this neighbour.
231    */
232   struct GNUNET_PeerIdentity id;
233
234   /**
235    * ID of task scheduled to run when this peer is about to
236    * time out (will free resources associated with the peer).
237    */
238   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
239
240   /**
241    * ID of task scheduled to send keepalives.
242    */
243   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
244
245   /**
246    * ID of task scheduled to run when we should try transmitting
247    * the head of the message queue.
248    */
249   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
250
251   /**
252    * Tracker for inbound bandwidth.
253    */
254   struct GNUNET_BANDWIDTH_Tracker in_tracker;
255
256   /**
257    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
258    */
259   struct GNUNET_TIME_Absolute connect_ts;
260
261   /**
262    * How often has the other peer (recently) violated the inbound
263    * traffic limit?  Incremented by 10 per violation, decremented by 1
264    * per non-violation (for each time interval).
265    */
266   unsigned int quota_violation_count;
267
268   /**
269    * Number of values in 'ats' array.
270    */
271   //unsigned int ats_count;
272
273   /**
274    * Are we already in the process of disconnecting this neighbour?
275    */
276   int in_disconnect;
277
278   /**
279    * Do we currently consider this neighbour connected? (as far as
280    * the connect/disconnect callbacks are concerned)?
281    */
282   int is_connected;
283
284 };
285
286
287 /**
288  * All known neighbours and their HELLOs.
289  */
290 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
291
292 /**
293  * Closure for connect_notify_cb and disconnect_notify_cb
294  */
295 static void *callback_cls;
296
297 /**
298  * Function to call when we connected to a neighbour.
299  */
300 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
301
302 /**
303  * Function to call when we disconnected from a neighbour.
304  */
305 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
306
307 /**
308  * counter for connected neighbours
309  */
310 static int neighbours_connected;
311
312 /**
313  * Lookup a neighbour entry in the neighbours hash map.
314  *
315  * @param pid identity of the peer to look up
316  * @return the entry, NULL if there is no existing record
317  */
318 static struct NeighbourMapEntry *
319 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
320 {
321   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
322 }
323
324
325 /**
326  * Task invoked to start a transmission to another peer.
327  *
328  * @param cls the 'struct NeighbourMapEntry'
329  * @param tc scheduler context
330  */
331 static void
332 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
333
334
335 /**
336  * We're done with our transmission attempt, continue processing.
337  *
338  * @param cls the 'struct MessageQueue' of the message
339  * @param receiver intended receiver
340  * @param success whether it worked or not
341  */
342 static void
343 transmit_send_continuation (void *cls,
344                             const struct GNUNET_PeerIdentity *receiver,
345                             int success)
346 {
347   struct MessageQueue *mq;
348   struct NeighbourMapEntry *n;
349
350   mq = cls;
351   n = mq->n;
352   if (NULL != n)
353   {
354     GNUNET_assert (n->is_active == mq);
355     n->is_active = NULL;
356     if (success == GNUNET_YES)
357     {
358       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
359       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
360     }
361   }
362   if (NULL != mq->cont)
363     mq->cont (mq->cont_cls, success);
364   GNUNET_free (mq);
365 }
366
367
368 /**
369  * Check the ready list for the given neighbour and if a plugin is
370  * ready for transmission (and if we have a message), do so!
371  *
372  * @param n target peer for which to transmit
373  */
374 static void
375 try_transmission_to_peer (struct NeighbourMapEntry *n)
376 {
377   struct MessageQueue *mq;
378   struct GNUNET_TIME_Relative timeout;
379   ssize_t ret;
380   struct GNUNET_TRANSPORT_PluginFunctions *papi;
381
382   if (n->is_active != NULL)
383     return;                     /* transmission already pending */
384   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
385     return;                     /* currently waiting for bandwidth */
386   while (NULL != (mq = n->messages_head))
387   {
388     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
389     if (timeout.rel_value > 0)
390       break;
391     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
392     n->is_active = mq;
393     mq->n = n;
394     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
395   }
396   if (NULL == mq)
397     return;                     /* no more messages */
398
399   papi = GST_plugins_find (n->plugin_name);
400   if (papi == NULL)
401   {
402     GNUNET_break (0);
403     return;
404   }
405   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
406   n->is_active = mq;
407   mq->n = n;
408
409   if  ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
410   {
411     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
412                 GNUNET_i2s (&n->id));
413     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
414     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
415     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
416     return;
417   }
418
419   ret =
420       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
421                   0 /* priority -- remove from plugin API? */ ,
422                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
423                   &transmit_send_continuation, mq);
424   if (ret == -1)
425   {
426     /* failure, but 'send' would not call continuation in this case,
427      * so we need to do it here! */
428     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
429   }
430 }
431
432
433 /**
434  * Task invoked to start a transmission to another peer.
435  *
436  * @param cls the 'struct NeighbourMapEntry'
437  * @param tc scheduler context
438  */
439 static void
440 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
441 {
442   struct NeighbourMapEntry *n = cls;
443   GNUNET_assert (NULL != lookup_neighbour(&n->id));
444   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
445   try_transmission_to_peer (n);
446 }
447
448
449 /**
450  * Initialize the neighbours subsystem.
451  *
452  * @param cls closure for callbacks
453  * @param connect_cb function to call if we connect to a peer
454  * @param disconnect_cb function to call if we disconnect from a peer
455  */
456 void
457 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
458                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
459 {
460   callback_cls = cls;
461   connect_notify_cb = connect_cb;
462   disconnect_notify_cb = disconnect_cb;
463   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
464 }
465
466
467 /**
468  * Disconnect from the given neighbour, clean up the record.
469  *
470  * @param n neighbour to disconnect from
471  */
472 static void
473 disconnect_neighbour (struct NeighbourMapEntry *n)
474 {
475   struct MessageQueue *mq;
476
477   if (GNUNET_YES == n->in_disconnect)
478     return;
479   n->in_disconnect = GNUNET_YES;
480   while (NULL != (mq = n->messages_head))
481   {
482     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
483     if (NULL != mq->cont)
484       mq->cont (mq->cont_cls, GNUNET_SYSERR);
485     GNUNET_free (mq);
486   }
487   if (NULL != n->is_active)
488   {
489     n->is_active->n = NULL;
490     n->is_active = NULL;
491   }
492   if (GNUNET_YES == n->is_connected)
493   {
494     n->is_connected = GNUNET_NO;
495     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
496     GNUNET_SCHEDULER_cancel (n->keepalive_task);
497     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;  
498     GNUNET_assert (neighbours_connected > 0);
499     neighbours_connected--;
500     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
501                               GNUNET_NO);
502     disconnect_notify_cb (callback_cls, &n->id);
503   }
504   GNUNET_assert (GNUNET_YES ==
505                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
506                                                        &n->id.hashPubKey, n));
507   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
508   {
509     GNUNET_SCHEDULER_cancel (n->timeout_task);
510     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
511   }
512   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
513   {
514     GNUNET_SCHEDULER_cancel (n->transmission_task);
515     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
516   }
517   if (NULL != n->plugin_name)
518   {
519     GNUNET_free (n->plugin_name);
520     n->plugin_name = NULL;
521   }
522   if (NULL != n->addr)
523   {
524     GNUNET_free (n->addr);
525     n->addr = NULL;
526     n->addrlen = 0;
527   }
528   n->session = NULL;
529   GNUNET_free (n);
530 }
531
532
533 /**
534  * Peer has been idle for too long. Disconnect.
535  *
536  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
537  * @param tc scheduler context
538  */
539 static void
540 neighbour_timeout_task (void *cls,
541                         const struct GNUNET_SCHEDULER_TaskContext *tc)
542 {
543   struct NeighbourMapEntry *n = cls;
544
545   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
546   if (GNUNET_YES == n->is_connected)
547     GNUNET_STATISTICS_update (GST_stats,
548                             gettext_noop ("# peers disconnected due to timeout"), 1,
549                             GNUNET_NO);
550   disconnect_neighbour (n);
551 }
552
553
554 /**
555  * Send another keepalive message.
556  *
557  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
558  * @param tc scheduler context
559  */
560 static void
561 neighbour_keepalive_task (void *cls,
562                           const struct GNUNET_SCHEDULER_TaskContext *tc)
563 {
564   struct NeighbourMapEntry *n = cls;
565   struct GNUNET_MessageHeader m;
566   struct GNUNET_TRANSPORT_PluginFunctions *papi;
567
568   n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
569                                                     &neighbour_keepalive_task,
570                                                     n);
571   GNUNET_assert (GNUNET_YES == n->is_connected);
572   GNUNET_STATISTICS_update (GST_stats,
573                             gettext_noop ("# keepalives sent"), 1,
574                             GNUNET_NO);
575   m.size = htons (sizeof (struct GNUNET_MessageHeader));
576   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
577   papi = GST_plugins_find (n->plugin_name);
578   if (papi != NULL)
579     papi->send (papi->cls, 
580                 &n->id, (const void *) &m,
581                 sizeof (m),
582                 UINT32_MAX /* priority */ ,
583                 GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
584                 GNUNET_YES, NULL, NULL);
585 }
586
587
588 /**
589  * Disconnect from the given neighbour.
590  *
591  * @param cls unused
592  * @param key hash of neighbour's public key (not used)
593  * @param value the 'struct NeighbourMapEntry' of the neighbour
594  */
595 static int
596 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
597 {
598   struct NeighbourMapEntry *n = value;
599
600 #if DEBUG_TRANSPORT
601   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
602               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
603 #endif
604   if (GNUNET_YES == n->is_connected)
605     GNUNET_STATISTICS_update (GST_stats,
606                               gettext_noop ("# peers disconnected due to global disconnect"), 1,
607                               GNUNET_NO);
608   disconnect_neighbour (n);
609   return GNUNET_OK;
610 }
611
612
613 /**
614  * Cleanup the neighbours subsystem.
615  */
616 void
617 GST_neighbours_stop ()
618 {
619   GNUNET_assert (neighbours != NULL);
620
621   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
622                                          NULL);
623   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
624   GNUNET_assert (neighbours_connected == 0);
625   neighbours = NULL;
626   callback_cls = NULL;
627   connect_notify_cb = NULL;
628   disconnect_notify_cb = NULL;
629 }
630
631
632 /**
633  * We tried to send a SESSION_CONNECT message to another peer.  If this
634  * succeeded, we should mark the peer up.  If it failed, we should tell
635  * ATS to not use this address anymore (until it is re-validated).
636  *
637  * @param cls the 'struct NeighbourMapEntry'
638  * @param success GNUNET_OK on success
639  */
640 static void
641 send_connect_continuation (void *cls,
642                            int success)
643 {
644   struct NeighbourMapEntry *n = cls;
645
646   GNUNET_assert (n != NULL);
647   if (GNUNET_YES == n->in_disconnect)
648     return; /* neighbour is going away */
649   if (GNUNET_YES != success)
650   {
651     GNUNET_ATS_address_destroyed (GST_ats,
652                                   &n->id,
653                                   n->plugin_name, 
654                                   n->addr,
655                                   n->addrlen,
656                                   NULL);
657     disconnect_neighbour (n);
658     return;
659   }
660 }
661
662
663 /**
664  * For an existing neighbour record, set the active connection to
665  * the given address.
666  *
667  * @param peer identity of the peer to switch the address for
668  * @param plugin_name name of transport that delivered the PONG
669  * @param address address of the other peer, NULL if other peer
670  *                       connected to us
671  * @param address_len number of bytes in address
672  * @param session session to use (or NULL)
673  * @param ats performance data
674  * @param ats_count number of entries in ats (excluding 0-termination)
675  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
676  *         connection is not up (yet)
677  */
678 int
679 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
680                                   const char *plugin_name, const void *address,
681                                   size_t address_len, struct Session *session,
682                                   const struct GNUNET_ATS_Information
683                                   *ats, uint32_t ats_count)
684 {
685   struct NeighbourMapEntry *n;
686   struct SessionConnectMessage connect_msg;
687   int was_connected;
688
689   GNUNET_assert (neighbours != NULL);
690   n = lookup_neighbour (peer);
691   if (NULL == n)
692   {
693     if (NULL == session)
694       GNUNET_ATS_address_destroyed (GST_ats,
695                                     peer,
696                                     plugin_name, address,
697                                     address_len, NULL);    
698     return GNUNET_NO;
699   }
700   was_connected = n->is_connected;
701   n->is_connected = GNUNET_YES;
702   if (GNUNET_YES != was_connected)
703     n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
704                                                       &neighbour_keepalive_task,
705                                                       n);
706
707 #if DEBUG_TRANSPORT
708   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
710               GNUNET_i2s (peer), plugin_name,
711               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
712                                                                   address,
713                                                                   address_len),
714               session);
715 #endif
716   GNUNET_free_non_null (n->addr);
717   n->addr = GNUNET_malloc (address_len);
718   memcpy (n->addr, address, address_len);
719   n->addrlen = address_len;
720   n->session = session;
721   GNUNET_free_non_null (n->plugin_name);
722   n->plugin_name = GNUNET_strdup (plugin_name);
723   GNUNET_SCHEDULER_cancel (n->timeout_task);
724   n->timeout_task =
725       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
726                                     &neighbour_timeout_task, n);
727   connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
728   connect_msg.header.type =
729       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
730   connect_msg.reserved = htonl (0);
731   connect_msg.timestamp =
732       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
733   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
734                        GNUNET_TIME_UNIT_FOREVER_REL, 
735                        &send_connect_continuation, 
736                        n);
737   if (GNUNET_YES == was_connected)
738     return GNUNET_YES;
739   /* First tell clients about connected neighbours...*/
740   neighbours_connected++;
741   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
742                             GNUNET_NO);
743   connect_notify_cb (callback_cls, peer, ats, ats_count);
744   return GNUNET_YES;
745 }
746
747
748 /**
749  * Create an entry in the neighbour map for the given peer
750  * 
751  * @param peer peer to create an entry for
752  * @return new neighbour map entry
753  */
754 static struct NeighbourMapEntry *
755 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
756 {
757   struct NeighbourMapEntry *n;
758
759 #if DEBUG_TRANSPORT
760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
761               "Unknown peer `%s', creating new neighbour\n",
762               GNUNET_i2s (peer));
763 #endif
764   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
765   n->id = *peer;
766   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
767                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
768                                  MAX_BANDWIDTH_CARRY_S);
769   n->timeout_task =
770     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
771                                   &neighbour_timeout_task, n);
772   GNUNET_assert (GNUNET_OK ==
773                  GNUNET_CONTAINER_multihashmap_put (neighbours,
774                                                     &n->id.hashPubKey, n,
775                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
776   return n;
777 }
778
779
780 /**
781  * Try to create a connection to the given target (eventually).
782  *
783  * @param target peer to try to connect to
784  */
785 void
786 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
787 {
788   struct NeighbourMapEntry *n;
789
790   GNUNET_assert (neighbours != NULL);
791 #if DEBUG_TRANSPORT
792   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
793               GNUNET_i2s (target));
794 #endif
795   GNUNET_assert (0 !=
796                  memcmp (target, &GST_my_identity,
797                          sizeof (struct GNUNET_PeerIdentity)));
798   n = lookup_neighbour (target);
799   if ((NULL != n) && (GNUNET_YES == n->is_connected))
800     return;                     /* already connected */
801   if (n == NULL)
802     n = setup_neighbour (target);
803 #if DEBUG_TRANSPORT
804   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
805               "Asking ATS for suggested address to connect to peer `%s'\n",
806               GNUNET_i2s (&n->id));
807 #endif
808    GNUNET_ATS_suggest_address (GST_ats, &n->id);
809 }
810
811
812 /**
813  * Test if we're connected to the given peer.
814  *
815  * @param target peer to test
816  * @return GNUNET_YES if we are connected, GNUNET_NO if not
817  */
818 int
819 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
820 {
821   struct NeighbourMapEntry *n;
822
823   GNUNET_assert (neighbours != NULL);
824
825   n = lookup_neighbour (target);
826   if ((NULL == n) || (n->is_connected != GNUNET_YES))
827     return GNUNET_NO;           /* not connected */
828   return GNUNET_YES;
829 }
830
831
832 /**
833  * A session was terminated. Take note.
834  *
835  * @param peer identity of the peer where the session died
836  * @param session session that is gone
837  */
838 void
839 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
840                                    struct Session *session)
841 {
842   struct NeighbourMapEntry *n;
843
844   GNUNET_assert (neighbours != NULL);
845
846 #if DEBUG_TRANSPORT
847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848               "Session %X to peer `%s' ended \n",
849               session, GNUNET_i2s (peer));
850 #endif
851   n = lookup_neighbour (peer);
852   if (NULL == n)
853     return;
854   if (session != n->session)
855     return;                     /* doesn't affect us */
856
857   n->session = NULL;
858   GNUNET_free (n->addr);
859   n->addr = NULL;
860   n->addrlen = 0;
861
862
863   if (GNUNET_YES != n->is_connected)
864     return;                     /* not connected anymore anyway, shouldn't matter */
865   /* fast disconnect unless ATS suggests a new address */
866   GNUNET_SCHEDULER_cancel (n->timeout_task);
867   n->timeout_task =
868       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
869                                     &neighbour_timeout_task, n);
870   /* try QUICKLY to re-establish a connection, reduce timeout! */
871   GNUNET_ATS_suggest_address (GST_ats, peer);
872 }
873
874
875 /**
876  * Transmit a message to the given target using the active connection.
877  *
878  * @param target destination
879  * @param msg message to send
880  * @param msg_size number of bytes in msg
881  * @param timeout when to fail with timeout
882  * @param cont function to call when done
883  * @param cont_cls closure for 'cont'
884  */
885 void
886 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
887                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
888                      GST_NeighbourSendContinuation cont, void *cont_cls)
889 {
890   struct NeighbourMapEntry *n;
891   struct MessageQueue *mq;
892
893   GNUNET_assert (neighbours != NULL);
894
895   n = lookup_neighbour (target);
896   if ((n == NULL) || (GNUNET_YES != n->is_connected))
897   {
898     GNUNET_STATISTICS_update (GST_stats,
899                               gettext_noop
900                               ("# messages not sent (no such peer or not connected)"),
901                               1, GNUNET_NO);
902 #if DEBUG_TRANSPORT
903     if (n == NULL)
904       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905                   "Could not send message to peer `%s': unknown neighbor",
906                   GNUNET_i2s (target));
907     else if (GNUNET_YES != n->is_connected)
908       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909                   "Could not send message to peer `%s': not connected\n",
910                   GNUNET_i2s (target));
911 #endif
912     if (NULL != cont)
913       cont (cont_cls, GNUNET_SYSERR);
914     return;
915   }
916
917   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
918   {
919     GNUNET_STATISTICS_update (GST_stats,
920                               gettext_noop
921                               ("# messages not sent (no such peer or not connected)"),
922                               1, GNUNET_NO);
923 #if DEBUG_TRANSPORT
924       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
925                   "Could not send message to peer `%s': no address available\n",
926                   GNUNET_i2s (target));
927 #endif
928
929     if (NULL != cont)
930       cont (cont_cls, GNUNET_SYSERR);
931     return;
932   }
933   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
934   GNUNET_STATISTICS_update (GST_stats,
935                             gettext_noop
936                             ("# bytes in message queue for other peers"),
937                             msg_size, GNUNET_NO);
938   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
939   mq->cont = cont;
940   mq->cont_cls = cont_cls;
941   /* FIXME: this memcpy can be up to 7% of our total runtime! */
942   memcpy (&mq[1], msg, msg_size);
943   mq->message_buf = (const char *) &mq[1];
944   mq->message_buf_size = msg_size;
945   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
946   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
947   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
948       (NULL == n->is_active))
949     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
950 }
951
952
953 /**
954  * We have received a message from the given sender.  How long should
955  * we delay before receiving more?  (Also used to keep the peer marked
956  * as live).
957  *
958  * @param sender sender of the message
959  * @param size size of the message
960  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
961  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
962  *                   GNUNET_SYSERR if the connection is not fully up yet
963  * @return how long to wait before reading more from this sender
964  */
965 struct GNUNET_TIME_Relative
966 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
967                                         *sender, ssize_t size, int *do_forward)
968 {
969   struct NeighbourMapEntry *n;
970   struct GNUNET_TIME_Relative ret;
971
972   GNUNET_assert (neighbours != NULL);
973
974   n = lookup_neighbour (sender);
975   if (n == NULL)
976   {
977     GST_neighbours_try_connect (sender);
978     n = lookup_neighbour (sender);
979     if (NULL == n)
980     {
981       GNUNET_STATISTICS_update (GST_stats,
982                                 gettext_noop
983                                 ("# messages discarded due to lack of neighbour record"),
984                                 1, GNUNET_NO);
985       *do_forward = GNUNET_NO;
986       return GNUNET_TIME_UNIT_ZERO;
987     }
988   }
989   if (GNUNET_YES != n->is_connected)
990   {
991     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
992                 _("Plugin gave us %d bytes of data but somehow the session is not marked as UP yet!\n"),
993                 (int) size);
994     *do_forward = GNUNET_SYSERR;
995     return GNUNET_TIME_UNIT_ZERO;
996   }
997   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
998   {
999     n->quota_violation_count++;
1000 #if DEBUG_TRANSPORT
1001     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1003                 n->in_tracker.available_bytes_per_s__,
1004                 n->quota_violation_count);
1005 #endif
1006     /* Discount 32k per violation */
1007     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1008   }
1009   else
1010   {
1011     if (n->quota_violation_count > 0)
1012     {
1013       /* try to add 32k back */
1014       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1015       n->quota_violation_count--;
1016     }
1017   }
1018   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1019   {
1020     GNUNET_STATISTICS_update (GST_stats,
1021                               gettext_noop
1022                               ("# bandwidth quota violations by other peers"),
1023                               1, GNUNET_NO);
1024     *do_forward = GNUNET_NO;
1025     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1026   }
1027   *do_forward = GNUNET_YES;
1028   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1029   if (ret.rel_value > 0)
1030   {
1031 #if DEBUG_TRANSPORT
1032     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1034                 (unsigned long long) n->in_tracker.
1035                 consumption_since_last_update__,
1036                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1037                 (unsigned long long) ret.rel_value);
1038 #endif
1039     GNUNET_STATISTICS_update (GST_stats,
1040                               gettext_noop ("# ms throttling suggested"),
1041                               (int64_t) ret.rel_value, GNUNET_NO);
1042   }
1043   return ret;
1044 }
1045
1046
1047 /**
1048  * Keep the connection to the given neighbour alive longer,
1049  * we received a KEEPALIVE (or equivalent).
1050  *
1051  * @param neighbour neighbour to keep alive
1052  */
1053 void
1054 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1055 {
1056   struct NeighbourMapEntry *n;
1057
1058   GNUNET_assert (neighbours != NULL);
1059
1060   n = lookup_neighbour (neighbour);
1061   if (NULL == n)
1062   {
1063     GNUNET_STATISTICS_update (GST_stats,
1064                               gettext_noop
1065                               ("# KEEPALIVE messages discarded (not connected)"),
1066                               1, GNUNET_NO);
1067     return;
1068   }
1069   GNUNET_SCHEDULER_cancel (n->timeout_task);
1070   n->timeout_task =
1071       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1072                                     &neighbour_timeout_task, n);
1073 }
1074
1075
1076 /**
1077  * Change the incoming quota for the given peer.
1078  *
1079  * @param neighbour identity of peer to change qutoa for
1080  * @param quota new quota
1081  */
1082 void
1083 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1084                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1085 {
1086   struct NeighbourMapEntry *n;
1087
1088   GNUNET_assert (neighbours != NULL);
1089
1090   n = lookup_neighbour (neighbour);
1091   if (n == NULL)
1092   {
1093     GNUNET_STATISTICS_update (GST_stats,
1094                               gettext_noop
1095                               ("# SET QUOTA messages ignored (no such peer)"),
1096                               1, GNUNET_NO);
1097     return;
1098   }
1099   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1100   if (0 != ntohl (quota.value__))
1101     return;
1102 #if DEBUG_TRANSPORT
1103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1104               GNUNET_i2s (&n->id), "SET_QUOTA");
1105 #endif
1106   if (GNUNET_YES == n->is_connected)
1107     GNUNET_STATISTICS_update (GST_stats,
1108                               gettext_noop ("# disconnects due to quota of 0"), 1,
1109                               GNUNET_NO);
1110   disconnect_neighbour (n);
1111 }
1112
1113
1114 /**
1115  * Closure for the neighbours_iterate function.
1116  */
1117 struct IteratorContext
1118 {
1119   /**
1120    * Function to call on each connected neighbour.
1121    */
1122   GST_NeighbourIterator cb;
1123
1124   /**
1125    * Closure for 'cb'.
1126    */
1127   void *cb_cls;
1128 };
1129
1130
1131 /**
1132  * Call the callback from the closure for each connected neighbour.
1133  *
1134  * @param cls the 'struct IteratorContext'
1135  * @param key the hash of the public key of the neighbour
1136  * @param value the 'struct NeighbourMapEntry'
1137  * @return GNUNET_OK (continue to iterate)
1138  */
1139 static int
1140 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1141 {
1142   struct IteratorContext *ic = cls;
1143   struct NeighbourMapEntry *n = value;
1144
1145   if (GNUNET_YES != n->is_connected)
1146     return GNUNET_OK;
1147
1148   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen);
1149   return GNUNET_OK;
1150 }
1151
1152
1153 /**
1154  * Iterate over all connected neighbours.
1155  *
1156  * @param cb function to call
1157  * @param cb_cls closure for cb
1158  */
1159 void
1160 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1161 {
1162   struct IteratorContext ic;
1163
1164   GNUNET_assert (neighbours != NULL);
1165
1166   ic.cb = cb;
1167   ic.cb_cls = cb_cls;
1168   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1169 }
1170
1171
1172 /**
1173  * If we have an active connection to the given target, it must be shutdown.
1174  *
1175  * @param target peer to disconnect from
1176  */
1177 void
1178 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1179 {
1180   struct NeighbourMapEntry *n;
1181   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1182   struct SessionDisconnectMessage disconnect_msg;
1183
1184   GNUNET_assert (neighbours != NULL);
1185
1186   n = lookup_neighbour (target);
1187   if (NULL == n)
1188     return;                     /* not active */
1189   if (GNUNET_YES == n->is_connected)
1190   {
1191     /* we're actually connected, send DISCONNECT message */
1192     disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
1193     disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1194     disconnect_msg.reserved = htonl (0);
1195     disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1196                                          sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1197                                          sizeof (struct GNUNET_TIME_AbsoluteNBO) );
1198     disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
1199     disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1200     disconnect_msg.public_key = GST_my_public_key;
1201     GNUNET_assert (GNUNET_OK ==
1202                    GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
1203                                            &disconnect_msg.purpose,
1204                                            &disconnect_msg.signature));
1205     papi = GST_plugins_find (n->plugin_name);
1206     if (papi != NULL)
1207       papi->send (papi->cls, target, (const void *) &disconnect_msg,
1208                   sizeof (disconnect_msg),
1209                   UINT32_MAX /* priority */ ,
1210                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
1211                   GNUNET_YES, NULL, NULL);
1212     GNUNET_STATISTICS_update (GST_stats,
1213                               gettext_noop ("# peers disconnected due to external request"), 1,
1214                               GNUNET_NO);
1215     n = lookup_neighbour (target);
1216     if (NULL == n)
1217       return;                     /* gone already */
1218   }
1219   disconnect_neighbour (n);
1220 }
1221
1222
1223 /**
1224  * We received a disconnect message from the given peer,
1225  * validate and process.
1226  * 
1227  * @param peer sender of the message
1228  * @param msg the disconnect message
1229  */
1230 void
1231 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer,
1232                                           const struct GNUNET_MessageHeader *msg)
1233 {
1234   struct NeighbourMapEntry *n;
1235   const struct SessionDisconnectMessage *sdm;
1236   GNUNET_HashCode hc;
1237
1238   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1239   {
1240     // GNUNET_break_op (0);
1241     GNUNET_STATISTICS_update (GST_stats,
1242                               gettext_noop ("# disconnect messages ignored (old format)"), 1,
1243                               GNUNET_NO);
1244     return;
1245   }
1246   sdm = (const struct SessionDisconnectMessage* ) msg;
1247   n = lookup_neighbour (peer);
1248   if (NULL == n)
1249     return;                     /* gone already */
1250   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1251       n->connect_ts.abs_value)
1252   {
1253     GNUNET_STATISTICS_update (GST_stats,
1254                               gettext_noop ("# disconnect messages ignored (timestamp)"), 1,
1255                               GNUNET_NO);
1256     return;
1257   }
1258   GNUNET_CRYPTO_hash (&sdm->public_key,
1259                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1260                       &hc);
1261   if (0 != memcmp (peer,
1262                    &hc,
1263                    sizeof (struct GNUNET_PeerIdentity)))
1264   {
1265     GNUNET_break_op (0);
1266     return;
1267   }
1268   if (ntohl (sdm->purpose.size) != 
1269       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1270       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1271       sizeof (struct GNUNET_TIME_AbsoluteNBO))
1272   {
1273     GNUNET_break_op (0);
1274     return;
1275   }
1276   if (GNUNET_OK !=
1277       GNUNET_CRYPTO_rsa_verify (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT,
1278                                 &sdm->purpose,
1279                                 &sdm->signature,
1280                                 &sdm->public_key))
1281   {
1282     GNUNET_break_op (0);
1283     return;
1284   }
1285   GST_neighbours_force_disconnect (peer);
1286 }
1287
1288
1289 /**
1290  * We received a 'SESSION_CONNECT' message from the other peer.
1291  * Consider switching to it.
1292  *
1293  * @param message possibly a 'struct SessionConnectMessage' (check format)
1294  * @param peer identity of the peer to switch the address for
1295  * @param plugin_name name of transport that delivered the PONG
1296  * @param address address of the other peer, NULL if other peer
1297  *                       connected to us
1298  * @param address_len number of bytes in address
1299  * @param session session to use (or NULL)
1300  * @param ats performance data
1301  * @param ats_count number of entries in ats (excluding 0-termination)
1302   */
1303 void
1304 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
1305                                const struct GNUNET_PeerIdentity *peer,
1306                                const char *plugin_name,
1307                                const char *sender_address, uint16_t sender_address_len,
1308                                struct Session *session,
1309                                const struct GNUNET_ATS_Information *ats,
1310                                uint32_t ats_count)
1311 {
1312   const struct SessionConnectMessage *scm;
1313   struct GNUNET_TIME_Absolute ts;
1314   struct NeighbourMapEntry *n;
1315
1316   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
1317   {
1318     GNUNET_break_op (0);
1319     return;
1320   }
1321   scm = (const struct SessionConnectMessage *) message;
1322   GNUNET_break_op (ntohl (scm->reserved) == 0);
1323   ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
1324   n = lookup_neighbour (peer);
1325   if (NULL == n) 
1326     n = setup_neighbour (peer);
1327   if (ts.abs_value > n->connect_ts.abs_value)
1328   {
1329     if (NULL != session)
1330       GNUNET_log_from (GNUNET_ERROR_TYPE_INFO | GNUNET_ERROR_TYPE_BULK,
1331                        "transport-ats",
1332                        "Giving ATS session %p of plugin %s for peer %s\n",
1333                        session,
1334                        plugin_name,
1335                        GNUNET_i2s (peer));
1336     GNUNET_ATS_address_update (GST_ats,
1337                                peer,
1338                                plugin_name, sender_address, sender_address_len,
1339                                session, ats, ats_count);
1340     n->connect_ts = ts;
1341   }
1342 }
1343
1344
1345 /* end of file gnunet-service-transport_neighbours.c */