fixing #2014
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
61
62 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
63
64 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
65
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
72 /**
73  * Message a peer sends to another to indicate its
74  * preference for communicating via a particular
75  * session (and the desire to establish a real
76  * connection).
77  */
78 struct SessionConnectMessage
79 {
80   /**
81    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
82    */
83   struct GNUNET_MessageHeader header;
84
85   /**
86    * Always zero.
87    */
88   uint32_t reserved GNUNET_PACKED;
89
90   /**
91    * Absolute time at the sender.  Only the most recent connect
92    * message implies which session is preferred by the sender.
93    */
94   struct GNUNET_TIME_AbsoluteNBO timestamp;
95
96 };
97
98
99 struct SessionDisconnectMessage
100 {
101   /**
102    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
103    */
104   struct GNUNET_MessageHeader header;
105
106   /**
107    * Always zero.
108    */
109   uint32_t reserved GNUNET_PACKED;
110
111   /**
112    * Purpose of the signature.  Extends over the timestamp.
113    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
114    */
115   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
116
117   /**
118    * Absolute time at the sender.  Only the most recent connect
119    * message implies which session is preferred by the sender.
120    */
121   struct GNUNET_TIME_AbsoluteNBO timestamp;
122
123   /**
124    * Public key of the sender.
125    */
126   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
127
128   /**
129    * Signature of the peer that sends us the disconnect.  Only
130    * valid if the timestamp is AFTER the timestamp from the
131    * corresponding 'CONNECT' message.
132    */
133   struct GNUNET_CRYPTO_RsaSignature signature;
134
135 };
136
137
138 /**
139  * For each neighbour we keep a list of messages
140  * that we still want to transmit to the neighbour.
141  */
142 struct MessageQueue
143 {
144
145   /**
146    * This is a doubly linked list.
147    */
148   struct MessageQueue *next;
149
150   /**
151    * This is a doubly linked list.
152    */
153   struct MessageQueue *prev;
154
155   /**
156    * Once this message is actively being transmitted, which
157    * neighbour is it associated with?
158    */
159   struct NeighbourMapEntry *n;
160
161   /**
162    * Function to call once we're done.
163    */
164   GST_NeighbourSendContinuation cont;
165
166   /**
167    * Closure for 'cont'
168    */
169   void *cont_cls;
170
171   /**
172    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
173    * stuck together in memory.  Allocated at the end of this struct.
174    */
175   const char *message_buf;
176
177   /**
178    * Size of the message buf
179    */
180   size_t message_buf_size;
181
182   /**
183    * At what time should we fail?
184    */
185   struct GNUNET_TIME_Absolute timeout;
186
187 };
188
189
190 enum State
191 {
192   /**
193    * fresh peer or completely disconnected
194    */
195   S_NOT_CONNECTED,
196
197   /**
198    * sent CONNECT message to other peer, waiting for CONNECT_ACK
199    */
200   S_CONNECT_SENT,
201
202   /**
203    * received CONNECT message to other peer, sending CONNECT_ACK
204    */
205   S_CONNECT_RECV,
206
207   /**
208    * received ACK or payload
209    */
210   S_CONNECTED,
211
212   /**
213    * connection ended, fast reconnect
214    */
215   S_FAST_RECONNECT,
216
217   /**
218    * Disconnect in progress
219    */
220   S_DISCONNECT
221 };
222
223 enum Address_State
224 {
225   USED,
226   UNUSED,
227   FRESH,
228 };
229
230
231 /**
232  * Entry in neighbours.
233  */
234 struct NeighbourMapEntry
235 {
236
237   /**
238    * Head of list of messages we would like to send to this peer;
239    * must contain at most one message per client.
240    */
241   struct MessageQueue *messages_head;
242
243   /**
244    * Tail of list of messages we would like to send to this peer; must
245    * contain at most one message per client.
246    */
247   struct MessageQueue *messages_tail;
248
249   /**
250    * Are we currently trying to send a message? If so, which one?
251    */
252   struct MessageQueue *is_active;
253
254   /**
255    * Active session for communicating with the peer.
256    */
257   struct Session *session;
258
259   /**
260    * Address we currently use.
261    */
262   struct GNUNET_HELLO_Address *address;
263
264   /**
265    * Identity of this neighbour.
266    */
267   struct GNUNET_PeerIdentity id;
268
269   /**
270    * ID of task scheduled to run when this peer is about to
271    * time out (will free resources associated with the peer).
272    */
273   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
274
275   /**
276    * ID of task scheduled to send keepalives.
277    */
278   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
279
280   /**
281    * ID of task scheduled to run when we should try transmitting
282    * the head of the message queue.
283    */
284   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
285
286   /**
287    * Tracker for inbound bandwidth.
288    */
289   struct GNUNET_BANDWIDTH_Tracker in_tracker;
290
291   /**
292    * Inbound bandwidth from ATS, activated when connection is up
293    */
294   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
295
296   /**
297    * Inbound bandwidth from ATS, activated when connection is up
298    */
299   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
300
301   /**
302    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
303    */
304   struct GNUNET_TIME_Absolute connect_ts;
305
306   /**
307    * When did we sent the last keep-alive message?
308    */
309   struct GNUNET_TIME_Absolute keep_alive_sent;
310
311   /**
312    * Latest calculated latency value
313    */
314   struct GNUNET_TIME_Relative latency;
315
316   /**
317    * Timeout for ATS
318    * We asked ATS for a new address for this peer
319    */
320   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
321
322   /**
323    * Task the resets the peer state after due to an pending
324    * unsuccessful connection setup
325    */
326   GNUNET_SCHEDULER_TaskIdentifier state_reset;
327
328
329   /**
330    * How often has the other peer (recently) violated the inbound
331    * traffic limit?  Incremented by 10 per violation, decremented by 1
332    * per non-violation (for each time interval).
333    */
334   unsigned int quota_violation_count;
335
336
337   /**
338    * The current state of the peer
339    * Element of enum State
340    */
341   int state;
342
343   /**
344    * Did we sent an KEEP_ALIVE message and are we expecting a response?
345    */
346   int expect_latency_response;
347   int address_state;
348 };
349
350
351 /**
352  * All known neighbours and their HELLOs.
353  */
354 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
355
356 /**
357  * Closure for connect_notify_cb and disconnect_notify_cb
358  */
359 static void *callback_cls;
360
361 /**
362  * Function to call when we connected to a neighbour.
363  */
364 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
365
366 /**
367  * Function to call when we disconnected from a neighbour.
368  */
369 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
370
371 /**
372  * counter for connected neighbours
373  */
374 static int neighbours_connected;
375
376 /**
377  * Lookup a neighbour entry in the neighbours hash map.
378  *
379  * @param pid identity of the peer to look up
380  * @return the entry, NULL if there is no existing record
381  */
382 static struct NeighbourMapEntry *
383 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
384 {
385   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
386 }
387
388 #define change_state(n, state, ...) change (n, state, __LINE__)
389
390 static int
391 is_connecting (struct NeighbourMapEntry *n)
392 {
393   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
394     return GNUNET_YES;
395   return GNUNET_NO;
396 }
397
398 static int
399 is_connected (struct NeighbourMapEntry *n)
400 {
401   if (n->state == S_CONNECTED)
402     return GNUNET_YES;
403   return GNUNET_NO;
404 }
405
406 static int
407 is_disconnecting (struct NeighbourMapEntry *n)
408 {
409   if (n->state == S_DISCONNECT)
410     return GNUNET_YES;
411   return GNUNET_NO;
412 }
413
414 static const char *
415 print_state (int state)
416 {
417   switch (state)
418   {
419   case S_CONNECTED:
420     return "S_CONNECTED";
421     break;
422   case S_CONNECT_RECV:
423     return "S_CONNECT_RECV";
424     break;
425   case S_CONNECT_SENT:
426     return "S_CONNECT_SENT";
427     break;
428   case S_DISCONNECT:
429     return "S_DISCONNECT";
430     break;
431   case S_NOT_CONNECTED:
432     return "S_NOT_CONNECTED";
433     break;
434   case S_FAST_RECONNECT:
435     return "S_FAST_RECONNECT";
436     break;
437   default:
438     GNUNET_break (0);
439     break;
440   }
441   return NULL;
442 }
443
444 static int
445 change (struct NeighbourMapEntry *n, int state, int line);
446
447 static void
448 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
449
450
451 static void
452 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
453 {
454   struct NeighbourMapEntry *n = cls;
455
456   if (n == NULL)
457     return;
458
459   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
460   if (n->state == S_CONNECTED)
461     return;
462
463 #if DEBUG_TRANSPORT
464   GNUNET_STATISTICS_update (GST_stats,
465                             gettext_noop
466                             ("# failed connection attempts due to timeout"), 1,
467                             GNUNET_NO);
468 #endif
469
470   /* resetting state */
471   n->state = S_NOT_CONNECTED;
472
473   /* destroying address */
474   if (n->address != NULL)
475   {
476     GNUNET_assert (strlen (n->address->transport_name) > 0);
477     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
478   }
479
480   /* request new address */
481   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
482     GNUNET_SCHEDULER_cancel (n->ats_suggest);
483   n->ats_suggest =
484       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
485                                     n);
486   GNUNET_ATS_suggest_address (GST_ats, &n->id);
487 }
488
489 static int
490 change (struct NeighbourMapEntry *n, int state, int line)
491 {
492   /* allowed transitions */
493   int allowed = GNUNET_NO;
494
495   switch (n->state)
496   {
497   case S_NOT_CONNECTED:
498     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
499         (state == S_DISCONNECT))
500       allowed = GNUNET_YES;
501     break;
502   case S_CONNECT_RECV:
503     allowed = GNUNET_YES;
504     break;
505   case S_CONNECT_SENT:
506     allowed = GNUNET_YES;
507     break;
508   case S_CONNECTED:
509     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
510       allowed = GNUNET_YES;
511     break;
512   case S_DISCONNECT:
513     break;
514   case S_FAST_RECONNECT:
515     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
516       allowed = GNUNET_YES;
517     break;
518   default:
519     GNUNET_break (0);
520     break;
521   }
522   if (allowed == GNUNET_NO)
523   {
524     char *old = GNUNET_strdup (print_state (n->state));
525     char *new = GNUNET_strdup (print_state (state));
526
527     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
528                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
529                 new, line);
530     GNUNET_break (0);
531     GNUNET_free (old);
532     GNUNET_free (new);
533     return GNUNET_SYSERR;
534   }
535 #if DEBUG_TRANSPORT
536   {
537     char *old = GNUNET_strdup (print_state (n->state));
538     char *new = GNUNET_strdup (print_state (state));
539
540     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
541                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
542                 GNUNET_i2s (&n->id), n, old, new, line);
543     GNUNET_free (old);
544     GNUNET_free (new);
545   }
546 #endif
547   n->state = state;
548
549   switch (n->state)
550   {
551   case S_FAST_RECONNECT:
552   case S_CONNECT_RECV:
553   case S_CONNECT_SENT:
554     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
555       GNUNET_SCHEDULER_cancel (n->state_reset);
556     n->state_reset =
557         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
558     break;
559   case S_CONNECTED:
560   case S_NOT_CONNECTED:
561   case S_DISCONNECT:
562     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
563     {
564 #if DEBUG_TRANSPORT
565       char *old = GNUNET_strdup (print_state (n->state));
566       char *new = GNUNET_strdup (print_state (state));
567
568       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
569                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
570                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
571       GNUNET_free (old);
572       GNUNET_free (new);
573 #endif
574       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
575       GNUNET_SCHEDULER_cancel (n->state_reset);
576       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
577     }
578     break;
579
580   default:
581     GNUNET_assert (0);
582   }
583
584
585
586   return GNUNET_OK;
587 }
588
589 static ssize_t
590 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
591                   size_t msgbuf_size, uint32_t priority,
592                   struct GNUNET_TIME_Relative timeout, struct Session *session,
593                   const struct GNUNET_HELLO_Address *address, int force_address,
594                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
595 {
596   struct GNUNET_TRANSPORT_PluginFunctions *papi;
597   size_t ret = GNUNET_SYSERR;
598
599   /* FIXME : ats returns an address with all values 0 */
600   if (address == NULL)
601   {
602     if (cont != NULL)
603       cont (cont_cls, target, GNUNET_SYSERR);
604     return GNUNET_SYSERR;
605   }
606
607   if ((session == NULL) && (address->address_length == 0))
608   {
609     if (cont != NULL)
610       cont (cont_cls, target, GNUNET_SYSERR);
611     return GNUNET_SYSERR;
612   }
613
614   papi = GST_plugins_find (address->transport_name);
615   if (papi == NULL)
616   {
617     if (cont != NULL)
618       cont (cont_cls, target, GNUNET_SYSERR);
619     return GNUNET_SYSERR;
620   }
621
622   ret =
623       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
624                   address->address, address->address_length, GNUNET_YES, cont,
625                   cont_cls);
626
627   if (ret == -1)
628   {
629     if (cont != NULL)
630       cont (cont_cls, target, GNUNET_SYSERR);
631   }
632   return ret;
633 }
634
635
636 /**
637  * Task invoked to start a transmission to another peer.
638  *
639  * @param cls the 'struct NeighbourMapEntry'
640  * @param tc scheduler context
641  */
642 static void
643 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
644
645
646 /**
647  * We're done with our transmission attempt, continue processing.
648  *
649  * @param cls the 'struct MessageQueue' of the message
650  * @param receiver intended receiver
651  * @param success whether it worked or not
652  */
653 static void
654 transmit_send_continuation (void *cls,
655                             const struct GNUNET_PeerIdentity *receiver,
656                             int success)
657 {
658   struct MessageQueue *mq = cls;
659   struct NeighbourMapEntry *n;
660   struct NeighbourMapEntry *tmp;
661
662   tmp = lookup_neighbour (receiver);
663   n = mq->n;
664   if ((NULL != n) && (tmp != NULL) && (tmp == n))
665   {
666     GNUNET_assert (n->is_active == mq);
667     n->is_active = NULL;
668     if (success == GNUNET_YES)
669     {
670       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
671       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
672     }
673   }
674 #if DEBUG_TRANSPORT
675   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
676               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
677               (success == GNUNET_OK) ? "successful" : "FAILED");
678 #endif
679   if (NULL != mq->cont)
680     mq->cont (mq->cont_cls, success);
681   GNUNET_free (mq);
682 }
683
684
685 /**
686  * Check the ready list for the given neighbour and if a plugin is
687  * ready for transmission (and if we have a message), do so!
688  *
689  * @param n target peer for which to transmit
690  */
691 static void
692 try_transmission_to_peer (struct NeighbourMapEntry *n)
693 {
694   struct MessageQueue *mq;
695   struct GNUNET_TIME_Relative timeout;
696   ssize_t ret;
697
698   if (n->is_active != NULL)
699   {
700     GNUNET_break (0);
701     return;                     /* transmission already pending */
702   }
703   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
704   {
705     GNUNET_break (0);
706     return;                     /* currently waiting for bandwidth */
707   }
708   while (NULL != (mq = n->messages_head))
709   {
710     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
711     if (timeout.rel_value > 0)
712       break;
713     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
714     n->is_active = mq;
715     mq->n = n;
716     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
717   }
718   if (NULL == mq)
719     return;                     /* no more messages */
720
721   if (n->address == NULL)
722   {
723 #if DEBUG_TRANSPORT
724     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
725                 GNUNET_i2s (&n->id));
726 #endif
727     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
728     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
729     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
730     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
731     return;
732   }
733
734   if (GST_plugins_find (n->address->transport_name) == NULL)
735   {
736     GNUNET_break (0);
737     return;
738   }
739   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
740   n->is_active = mq;
741   mq->n = n;
742
743   if ((n->address->address_length == 0) && (n->session == NULL))
744   {
745     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
746                 GNUNET_i2s (&n->id));
747     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
748     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
749     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
750     return;
751   }
752
753   ret =
754       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
755                         timeout, n->session, n->address, GNUNET_YES,
756                         &transmit_send_continuation, mq);
757   if (ret == -1)
758   {
759     /* failure, but 'send' would not call continuation in this case,
760      * so we need to do it here! */
761     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
762   }
763
764 }
765
766
767 /**
768  * Task invoked to start a transmission to another peer.
769  *
770  * @param cls the 'struct NeighbourMapEntry'
771  * @param tc scheduler context
772  */
773 static void
774 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
775 {
776   struct NeighbourMapEntry *n = cls;
777
778   GNUNET_assert (NULL != lookup_neighbour (&n->id));
779   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
780   try_transmission_to_peer (n);
781 }
782
783
784 /**
785  * Initialize the neighbours subsystem.
786  *
787  * @param cls closure for callbacks
788  * @param connect_cb function to call if we connect to a peer
789  * @param disconnect_cb function to call if we disconnect from a peer
790  */
791 void
792 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
793                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
794 {
795   callback_cls = cls;
796   connect_notify_cb = connect_cb;
797   disconnect_notify_cb = disconnect_cb;
798   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
799 }
800
801
802 static void
803 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
804                       int result)
805 {
806 #if DEBUG_TRANSPORT
807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808               "Sending DISCONNECT message to peer `%4s': %i\n",
809               GNUNET_i2s (target), result);
810 #endif
811 }
812
813
814 static int
815 send_disconnect (const struct GNUNET_PeerIdentity *target,
816                  const struct GNUNET_HELLO_Address *address,
817                  struct Session *session)
818 {
819   size_t ret;
820   struct SessionDisconnectMessage disconnect_msg;
821
822 #if DEBUG_TRANSPORT
823   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
824               "Sending DISCONNECT message to peer `%4s'\n",
825               GNUNET_i2s (target));
826 #endif
827
828   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
829   disconnect_msg.header.type =
830       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
831   disconnect_msg.reserved = htonl (0);
832   disconnect_msg.purpose.size =
833       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
834              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
835              sizeof (struct GNUNET_TIME_AbsoluteNBO));
836   disconnect_msg.purpose.purpose =
837       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
838   disconnect_msg.timestamp =
839       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
840   disconnect_msg.public_key = GST_my_public_key;
841   GNUNET_assert (GNUNET_OK ==
842                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
843                                          &disconnect_msg.purpose,
844                                          &disconnect_msg.signature));
845
846   ret =
847       send_with_plugin (target, (const char *) &disconnect_msg,
848                         sizeof (disconnect_msg), UINT32_MAX,
849                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
850                         GNUNET_YES, &send_disconnect_cont, NULL);
851
852   if (ret == GNUNET_SYSERR)
853     return GNUNET_SYSERR;
854
855   GNUNET_STATISTICS_update (GST_stats,
856                             gettext_noop
857                             ("# peers disconnected due to external request"), 1,
858                             GNUNET_NO);
859   return GNUNET_OK;
860 }
861
862
863 /**
864  * Disconnect from the given neighbour, clean up the record.
865  *
866  * @param n neighbour to disconnect from
867  */
868 static void
869 disconnect_neighbour (struct NeighbourMapEntry *n)
870 {
871   struct MessageQueue *mq;
872   int previous_state;
873
874   previous_state = n->state;
875
876   if (is_disconnecting (n))
877     return;
878
879   /* send DISCONNECT MESSAGE */
880   if (previous_state == S_CONNECTED)
881   {
882     if (GNUNET_OK == send_disconnect (&n->id, n->address, n->session))
883       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
884                   GNUNET_i2s (&n->id));
885     else
886       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
887                   "Could not send DISCONNECT_MSG to `%s'\n",
888                   GNUNET_i2s (&n->id));
889   }
890
891   change_state (n, S_DISCONNECT);
892
893   if (previous_state == S_CONNECTED)
894   {
895     GNUNET_assert (NULL != n->address);
896     if (n->address_state == USED)
897     {
898       GST_validation_set_address_use (n->address, n->session,
899                                       GNUNET_NO);
900
901       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
902       n->address_state = UNUSED;
903     }
904   }
905
906   if (n->address != NULL)
907   {
908     struct GNUNET_TRANSPORT_PluginFunctions *papi;
909
910     papi = GST_plugins_find (n->address->transport_name);
911     if (papi != NULL)
912       papi->disconnect (papi->cls, &n->id);
913   }
914   while (NULL != (mq = n->messages_head))
915   {
916     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
917     if (NULL != mq->cont)
918       mq->cont (mq->cont_cls, GNUNET_SYSERR);
919     GNUNET_free (mq);
920   }
921   if (NULL != n->is_active)
922   {
923     n->is_active->n = NULL;
924     n->is_active = NULL;
925   }
926
927   switch (previous_state)
928   {
929   case S_CONNECTED:
930     GNUNET_assert (neighbours_connected > 0);
931     neighbours_connected--;
932     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
933     GNUNET_SCHEDULER_cancel (n->keepalive_task);
934     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
935     n->expect_latency_response = GNUNET_NO;
936     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
937                               GNUNET_NO);
938     disconnect_notify_cb (callback_cls, &n->id);
939     break;
940   case S_FAST_RECONNECT:
941     GNUNET_STATISTICS_update (GST_stats,
942                               gettext_noop ("# fast reconnects failed"), 1,
943                               GNUNET_NO);
944     disconnect_notify_cb (callback_cls, &n->id);
945     break;
946   default:
947     break;
948   }
949
950   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
951
952   GNUNET_assert (GNUNET_YES ==
953                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
954                                                        &n->id.hashPubKey, n));
955   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
956   {
957     GNUNET_SCHEDULER_cancel (n->ats_suggest);
958     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
959   }
960   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
961   {
962     GNUNET_SCHEDULER_cancel (n->timeout_task);
963     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
964   }
965   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
966   {
967     GNUNET_SCHEDULER_cancel (n->transmission_task);
968     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
969   }
970   if (NULL != n->address)
971   {
972     GNUNET_HELLO_address_free (n->address);
973     n->address = NULL;
974   }
975   n->session = NULL;
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
977               GNUNET_i2s (&n->id), n);
978   GNUNET_free (n);
979 }
980
981
982 /**
983  * Peer has been idle for too long. Disconnect.
984  *
985  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
986  * @param tc scheduler context
987  */
988 static void
989 neighbour_timeout_task (void *cls,
990                         const struct GNUNET_SCHEDULER_TaskContext *tc)
991 {
992   struct NeighbourMapEntry *n = cls;
993
994   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
995
996   GNUNET_STATISTICS_update (GST_stats,
997                             gettext_noop
998                             ("# peers disconnected due to timeout"), 1,
999                             GNUNET_NO);
1000   disconnect_neighbour (n);
1001 }
1002
1003
1004 /**
1005  * Send another keepalive message.
1006  *
1007  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1008  * @param tc scheduler context
1009  */
1010 static void
1011 neighbour_keepalive_task (void *cls,
1012                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1013 {
1014   struct NeighbourMapEntry *n = cls;
1015   struct GNUNET_MessageHeader m;
1016   int ret;
1017
1018   GNUNET_assert (S_CONNECTED == n->state);
1019   n->keepalive_task =
1020       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1021                                     &neighbour_keepalive_task, n);
1022
1023   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1024                             GNUNET_NO);
1025   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1026   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1027
1028
1029   ret =
1030       send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1031                         UINT32_MAX /* priority */ ,
1032                         GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1033                         GNUNET_YES, NULL, NULL);
1034
1035   n->expect_latency_response = GNUNET_NO;
1036   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1037   if (ret != GNUNET_SYSERR)
1038   {
1039     n->expect_latency_response = GNUNET_YES;
1040     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1041   }
1042
1043 }
1044
1045
1046 /**
1047  * Disconnect from the given neighbour.
1048  *
1049  * @param cls unused
1050  * @param key hash of neighbour's public key (not used)
1051  * @param value the 'struct NeighbourMapEntry' of the neighbour
1052  */
1053 static int
1054 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1055 {
1056   struct NeighbourMapEntry *n = value;
1057
1058 #if DEBUG_TRANSPORT
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1060               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1061 #endif
1062   if (S_CONNECTED == n->state)
1063     GNUNET_STATISTICS_update (GST_stats,
1064                               gettext_noop
1065                               ("# peers disconnected due to global disconnect"),
1066                               1, GNUNET_NO);
1067   disconnect_neighbour (n);
1068   return GNUNET_OK;
1069 }
1070
1071
1072 static void
1073 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1074 {
1075   struct NeighbourMapEntry *n = cls;
1076
1077   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1078
1079   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1080               "ATS did not suggested address to connect to peer `%s'\n",
1081               GNUNET_i2s (&n->id));
1082
1083   disconnect_neighbour (n);
1084 }
1085
1086 /**
1087  * Cleanup the neighbours subsystem.
1088  */
1089 void
1090 GST_neighbours_stop ()
1091 {
1092   // This can happen during shutdown
1093   if (neighbours == NULL)
1094   {
1095     return;
1096   }
1097
1098   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1099                                          NULL);
1100   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1101 //  GNUNET_assert (neighbours_connected == 0);
1102   neighbours = NULL;
1103   callback_cls = NULL;
1104   connect_notify_cb = NULL;
1105   disconnect_notify_cb = NULL;
1106 }
1107
1108 struct ContinutionContext
1109 {
1110   struct GNUNET_HELLO_Address *address;
1111
1112   struct Session *session;
1113 };
1114
1115 static void
1116 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1117                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1118 {
1119   struct QuotaSetMessage q_msg;
1120
1121 #if DEBUG_TRANSPORT
1122   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1123               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1124               ntohl (quota.value__), GNUNET_i2s (target));
1125 #endif
1126   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1127   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1128   q_msg.quota = quota;
1129   q_msg.peer = (*target);
1130   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1131 }
1132
1133 /**
1134  * We tried to send a SESSION_CONNECT message to another peer.  If this
1135  * succeeded, we change the state.  If it failed, we should tell
1136  * ATS to not use this address anymore (until it is re-validated).
1137  *
1138  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1139  * @param target peer to send the message to
1140  * @param success GNUNET_OK on success
1141  */
1142 static void
1143 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1144                            int success)
1145 {
1146   struct ContinutionContext *cc = cls;
1147   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1148
1149   if (GNUNET_YES != success)
1150   {
1151     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1152     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1153   }
1154   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1155   {
1156     GNUNET_HELLO_address_free (cc->address);
1157     GNUNET_free (cc);
1158     return;
1159   }
1160
1161   if ((GNUNET_YES == success) &&
1162       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1163   {
1164     change_state (n, S_CONNECT_SENT);
1165     GNUNET_HELLO_address_free (cc->address);
1166     GNUNET_free (cc);
1167     return;
1168   }
1169
1170   if ((GNUNET_NO == success) &&
1171       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1172   {
1173 #if DEBUG_TRANSPORT
1174     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1176                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1177 #endif
1178     change_state (n, S_NOT_CONNECTED);
1179     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1180       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1181     n->ats_suggest =
1182         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1183                                       n);
1184     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1185   }
1186   GNUNET_HELLO_address_free (cc->address);
1187   GNUNET_free (cc);
1188 }
1189
1190
1191 /**
1192  * We tried to switch addresses with an peer already connected. If it failed,
1193  * we should tell ATS to not use this address anymore (until it is re-validated).
1194  *
1195  * @param cls the 'struct NeighbourMapEntry'
1196  * @param target peer to send the message to
1197  * @param success GNUNET_OK on success
1198  */
1199 static void
1200 send_switch_address_continuation (void *cls,
1201                                   const struct GNUNET_PeerIdentity *target,
1202                                   int success)
1203 {
1204   struct ContinutionContext *cc = cls;
1205   struct NeighbourMapEntry *n;
1206
1207   if (neighbours == NULL)
1208   {
1209     GNUNET_HELLO_address_free (cc->address);
1210     GNUNET_free (cc);
1211     return;                     /* neighbour is going away */
1212   }
1213
1214   n = lookup_neighbour (&cc->address->peer);
1215   if ((n == NULL) || (is_disconnecting (n)))
1216   {
1217     GNUNET_HELLO_address_free (cc->address);
1218     GNUNET_free (cc);
1219     return;                     /* neighbour is going away */
1220   }
1221
1222   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1223   if (GNUNET_YES != success)
1224   {
1225 #if DEBUG_TRANSPORT
1226     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1228                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1229 #endif
1230     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1231     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1232
1233     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1234       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1235     n->ats_suggest =
1236         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1237                                       n);
1238     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1239     GNUNET_HELLO_address_free (cc->address);
1240     GNUNET_free (cc);
1241     return;
1242   }
1243   /* Tell ATS that switching addresses was successful */
1244   switch (n->state)
1245   {
1246   case S_CONNECTED:
1247     if (n->address_state == FRESH)
1248     {
1249       GST_validation_set_address_use (cc->address, cc->session,
1250                                       GNUNET_YES);
1251       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1252       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1253       n->address_state = USED;
1254     }
1255     break;
1256   case S_FAST_RECONNECT:
1257 #if DEBUG_TRANSPORT
1258     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259                 "Successful fast reconnect to peer `%s'\n",
1260                 GNUNET_i2s (&n->id));
1261 #endif
1262     change_state (n, S_CONNECTED);
1263     neighbours_connected++;
1264     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1265                               GNUNET_NO);
1266
1267     if (n->address_state == FRESH)
1268     {
1269       GST_validation_set_address_use (cc->address, cc->session,
1270                                       GNUNET_YES);
1271       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1272       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1273       n->address_state = USED;
1274     }
1275
1276     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1277       n->keepalive_task =
1278           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1279
1280     /* Updating quotas */
1281     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1282     send_outbound_quota (target, n->bandwidth_out);
1283
1284   default:
1285     break;
1286   }
1287   GNUNET_HELLO_address_free (cc->address);
1288   GNUNET_free (cc);
1289 }
1290
1291
1292 /**
1293  * We tried to send a SESSION_CONNECT message to another peer.  If this
1294  * succeeded, we change the state.  If it failed, we should tell
1295  * ATS to not use this address anymore (until it is re-validated).
1296  *
1297  * @param cls the 'struct NeighbourMapEntry'
1298  * @param target peer to send the message to
1299  * @param success GNUNET_OK on success
1300  */
1301 static void
1302 send_connect_ack_continuation (void *cls,
1303                                const struct GNUNET_PeerIdentity *target,
1304                                int success)
1305 {
1306   struct ContinutionContext *cc = cls;
1307   struct NeighbourMapEntry *n;
1308
1309   if (neighbours == NULL)
1310   {
1311     GNUNET_HELLO_address_free (cc->address);
1312     GNUNET_free (cc);
1313     return;                     /* neighbour is going away */
1314   }
1315
1316   n = lookup_neighbour (&cc->address->peer);
1317   if ((n == NULL) || (is_disconnecting (n)))
1318   {
1319     GNUNET_HELLO_address_free (cc->address);
1320     GNUNET_free (cc);
1321     return;                     /* neighbour is going away */
1322   }
1323
1324   if (GNUNET_YES == success)
1325   {
1326     GNUNET_HELLO_address_free (cc->address);
1327     GNUNET_free (cc);
1328     return;                     /* sending successful */
1329   }
1330
1331   /* sending failed, ask for next address  */
1332 #if DEBUG_TRANSPORT
1333   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1334               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1335               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1336 #endif
1337   change_state (n, S_NOT_CONNECTED);
1338   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1339   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1340
1341   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1342     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1343   n->ats_suggest =
1344       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1345                                     n);
1346   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1347   GNUNET_HELLO_address_free (cc->address);
1348   GNUNET_free (cc);
1349 }
1350
1351
1352 /**
1353  * For an existing neighbour record, set the active connection to
1354  * the given address.
1355  *
1356  * @param peer identity of the peer to switch the address for
1357  * @param address address of the other peer, NULL if other peer
1358  *                       connected to us
1359  * @param session session to use (or NULL)
1360  * @param ats performance data
1361  * @param ats_count number of entries in ats
1362  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1363  *         connection is not up (yet)
1364  */
1365 int
1366 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1367                                        const struct GNUNET_HELLO_Address
1368                                        *address, struct Session *session,
1369                                        const struct GNUNET_ATS_Information *ats,
1370                                        uint32_t ats_count,
1371                                        struct GNUNET_BANDWIDTH_Value32NBO
1372                                        bandwidth_in,
1373                                        struct GNUNET_BANDWIDTH_Value32NBO
1374                                        bandwidth_out)
1375 {
1376   struct NeighbourMapEntry *n;
1377   struct SessionConnectMessage connect_msg;
1378   struct ContinutionContext *cc;
1379   size_t msg_len;
1380   size_t ret;
1381
1382   if (neighbours == NULL)
1383   {
1384     /* This can happen during shutdown */
1385     return GNUNET_NO;
1386   }
1387   n = lookup_neighbour (peer);
1388   if (NULL == n)
1389     return GNUNET_NO;
1390   if (n->state == S_DISCONNECT)
1391   {
1392     /* We are disconnecting, nothing to do here */
1393     return GNUNET_NO;
1394   }
1395   GNUNET_assert (address->transport_name != NULL);
1396   if ((session == NULL) && (0 == address->address_length))
1397   {
1398     GNUNET_break_op (0);
1399     /* FIXME: is this actually possible? When does this happen? */
1400     if (strlen (address->transport_name) > 0)
1401       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1402     GNUNET_ATS_suggest_address (GST_ats, peer);
1403     return GNUNET_NO;
1404   }
1405
1406   /* checks successful and neighbour != NULL */
1407 #if DEBUG_TRANSPORT
1408   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1409               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1410               GST_plugins_a2s (address), session, GNUNET_i2s (peer),
1411               print_state (n->state));
1412 #endif
1413   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1414   {
1415     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1416     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1417   }
1418   /* do not switch addresses just update quotas */
1419   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1420       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1421       (n->session == session))
1422   {
1423     n->bandwidth_in = bandwidth_in;
1424     n->bandwidth_out = bandwidth_out;
1425     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1426     send_outbound_quota (peer, n->bandwidth_out);
1427     return GNUNET_NO;
1428   }
1429   if (n->state == S_CONNECTED)
1430   {
1431     /* mark old address as no longer used */
1432     GNUNET_assert (NULL != n->address);
1433     if (n->address_state == USED)
1434     {
1435       GST_validation_set_address_use (n->address, n->session,
1436                                       GNUNET_NO);
1437       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1438       n->address_state = UNUSED;
1439     }
1440
1441   }
1442
1443   /* set new address */
1444   if (NULL != n->address)
1445     GNUNET_HELLO_address_free (n->address);
1446   n->address = GNUNET_HELLO_address_copy (address);
1447   n->address_state = FRESH;
1448   n->session = session;
1449   n->bandwidth_in = bandwidth_in;
1450   n->bandwidth_out = bandwidth_out;
1451   GNUNET_SCHEDULER_cancel (n->timeout_task);
1452   n->timeout_task =
1453       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1454                                     &neighbour_timeout_task, n);
1455   switch (n->state)
1456   {
1457   case S_NOT_CONNECTED:
1458   case S_CONNECT_SENT:
1459     msg_len = sizeof (struct SessionConnectMessage);
1460     connect_msg.header.size = htons (msg_len);
1461     connect_msg.header.type =
1462         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1463     connect_msg.reserved = htonl (0);
1464     connect_msg.timestamp =
1465         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1466
1467     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1468     cc->session = session;
1469     cc->address = GNUNET_HELLO_address_copy (address);
1470     ret =
1471         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1472                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1473                           address, GNUNET_YES, &send_connect_continuation, cc);
1474     return GNUNET_NO;
1475   case S_CONNECT_RECV:
1476     /* We received a CONNECT message and asked ATS for an address */
1477     msg_len = sizeof (struct SessionConnectMessage);
1478     connect_msg.header.size = htons (msg_len);
1479     connect_msg.header.type =
1480         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1481     connect_msg.reserved = htonl (0);
1482     connect_msg.timestamp =
1483         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1484     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1485     cc->session = session;
1486     cc->address = GNUNET_HELLO_address_copy (address);
1487     ret =
1488         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1489                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1490                           address, GNUNET_YES, &send_connect_ack_continuation,
1491                           cc);
1492     return GNUNET_NO;
1493   case S_CONNECTED:
1494   case S_FAST_RECONNECT:
1495     /* connected peer is switching addresses or tries fast reconnect */
1496     msg_len = sizeof (struct SessionConnectMessage);
1497     connect_msg.header.size = htons (msg_len);
1498     connect_msg.header.type =
1499         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1500     connect_msg.reserved = htonl (0);
1501     connect_msg.timestamp =
1502         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1503     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1504     cc->session = session;
1505     cc->address = GNUNET_HELLO_address_copy (address);
1506     ret =
1507         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1508                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1509                           address, GNUNET_YES,
1510                           &send_switch_address_continuation, cc);
1511     if (ret == GNUNET_SYSERR)
1512     {
1513       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1514                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1515                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1516     }
1517     return GNUNET_NO;
1518   default:
1519     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1520                 "Invalid connection state to switch addresses %u \n", n->state);
1521     GNUNET_break_op (0);
1522     return GNUNET_NO;
1523   }
1524 }
1525
1526
1527 /**
1528  * Obtain current latency information for the given neighbour.
1529  *
1530  * @param peer
1531  * @return observed latency of the address, FOREVER if the address was
1532  *         never successfully validated
1533  */
1534 struct GNUNET_TIME_Relative
1535 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1536 {
1537   struct NeighbourMapEntry *n;
1538
1539   n = lookup_neighbour (peer);
1540   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1541     return GNUNET_TIME_UNIT_FOREVER_REL;
1542
1543   return n->latency;
1544 }
1545
1546 /**
1547  * Obtain current address information for the given neighbour.
1548  *
1549  * @param peer
1550  * @return address currently used
1551  */
1552 struct GNUNET_HELLO_Address *
1553 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1554 {
1555   struct NeighbourMapEntry *n;
1556
1557   n = lookup_neighbour (peer);
1558   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1559     return NULL;
1560
1561   return n->address;
1562 }
1563
1564
1565
1566 /**
1567  * Create an entry in the neighbour map for the given peer
1568  *
1569  * @param peer peer to create an entry for
1570  * @return new neighbour map entry
1571  */
1572 static struct NeighbourMapEntry *
1573 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1574 {
1575   struct NeighbourMapEntry *n;
1576
1577 #if DEBUG_TRANSPORT
1578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1579               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1580 #endif
1581   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1582   n->id = *peer;
1583   n->state = S_NOT_CONNECTED;
1584   n->latency = GNUNET_TIME_relative_get_forever ();
1585   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1586                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1587                                  MAX_BANDWIDTH_CARRY_S);
1588   n->timeout_task =
1589       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1590                                     &neighbour_timeout_task, n);
1591   GNUNET_assert (GNUNET_OK ==
1592                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1593                                                     &n->id.hashPubKey, n,
1594                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1595   return n;
1596 }
1597
1598
1599 /**
1600  * Try to create a connection to the given target (eventually).
1601  *
1602  * @param target peer to try to connect to
1603  */
1604 void
1605 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1606 {
1607   struct NeighbourMapEntry *n;
1608
1609   // This can happen during shutdown
1610   if (neighbours == NULL)
1611   {
1612     return;
1613   }
1614 #if DEBUG_TRANSPORT
1615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1616               GNUNET_i2s (target));
1617 #endif
1618   if (0 ==
1619       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1620   {
1621     /* my own hello */
1622     return;
1623   }
1624   n = lookup_neighbour (target);
1625
1626   if (NULL != n)
1627   {
1628     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1629       return;                   /* already connecting or connected */
1630     if (is_disconnecting (n))
1631       change_state (n, S_NOT_CONNECTED);
1632   }
1633
1634
1635   if (n == NULL)
1636     n = setup_neighbour (target);
1637 #if DEBUG_TRANSPORT
1638   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1639               "Asking ATS for suggested address to connect to peer `%s'\n",
1640               GNUNET_i2s (&n->id));
1641 #endif
1642
1643   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1644 }
1645
1646 /**
1647  * Test if we're connected to the given peer.
1648  *
1649  * @param target peer to test
1650  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1651  */
1652 int
1653 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1654 {
1655   struct NeighbourMapEntry *n;
1656
1657   // This can happen during shutdown
1658   if (neighbours == NULL)
1659   {
1660     return GNUNET_NO;
1661   }
1662
1663   n = lookup_neighbour (target);
1664
1665   if ((NULL == n) || (S_CONNECTED != n->state))
1666     return GNUNET_NO;           /* not connected */
1667   return GNUNET_YES;
1668 }
1669
1670 /**
1671  * A session was terminated. Take note.
1672  *
1673  * @param peer identity of the peer where the session died
1674  * @param session session that is gone
1675  */
1676 void
1677 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1678                                    struct Session *session)
1679 {
1680   struct NeighbourMapEntry *n;
1681
1682   if (neighbours == NULL)
1683   {
1684     /* This can happen during shutdown */
1685     return;
1686   }
1687
1688 #if DEBUG_TRANSPORT
1689   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Session %X to peer `%s' ended \n",
1690               session, GNUNET_i2s (peer));
1691 #endif
1692
1693   n = lookup_neighbour (peer);
1694   if (NULL == n)
1695     return;
1696   if (session != n->session)
1697     return;                     /* doesn't affect us */
1698   if (n->state == S_CONNECTED)
1699   {
1700     if (n->address_state == USED)
1701     {
1702       GST_validation_set_address_use (n->address, n->session,
1703                                       GNUNET_NO);
1704       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1705       n->address_state = UNUSED;
1706     }
1707   }
1708
1709   if (NULL != n->address)
1710   {
1711     GNUNET_HELLO_address_free (n->address);
1712     n->address = NULL;
1713   }
1714   n->session = NULL;
1715
1716   /* not connected anymore anyway, shouldn't matter */
1717   if (S_CONNECTED != n->state)
1718     return;
1719
1720   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1721   {
1722     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1723     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1724     n->expect_latency_response = GNUNET_NO;
1725   }
1726
1727   /* connected, try fast reconnect */
1728   /* statistics "transport" : "# peers connected" -= 1
1729    * neighbours_connected -= 1
1730    * BUT: no disconnect_cb to notify clients about disconnect
1731    */
1732 #if DEBUG_TRANSPORT
1733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1734               GNUNET_i2s (peer));
1735 #endif
1736   GNUNET_assert (neighbours_connected > 0);
1737   change_state (n, S_FAST_RECONNECT);
1738   neighbours_connected--;
1739   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
1740                             GNUNET_NO);
1741
1742
1743   /* We are connected, so ask ATS to switch addresses */
1744   GNUNET_SCHEDULER_cancel (n->timeout_task);
1745   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1746                                     &neighbour_timeout_task, n);
1747   /* try QUICKLY to re-establish a connection, reduce timeout! */
1748   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1749     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1750   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1751                                     n);
1752   GNUNET_ATS_suggest_address (GST_ats, peer);
1753 }
1754
1755
1756 /**
1757  * Transmit a message to the given target using the active connection.
1758  *
1759  * @param target destination
1760  * @param msg message to send
1761  * @param msg_size number of bytes in msg
1762  * @param timeout when to fail with timeout
1763  * @param cont function to call when done
1764  * @param cont_cls closure for 'cont'
1765  */
1766 void
1767 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1768                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1769                      GST_NeighbourSendContinuation cont, void *cont_cls)
1770 {
1771   struct NeighbourMapEntry *n;
1772   struct MessageQueue *mq;
1773
1774   // This can happen during shutdown
1775   if (neighbours == NULL)
1776   {
1777     return;
1778   }
1779
1780   n = lookup_neighbour (target);
1781   if ((n == NULL) || (!is_connected (n)))
1782   {
1783     GNUNET_STATISTICS_update (GST_stats,
1784                               gettext_noop
1785                               ("# messages not sent (no such peer or not connected)"),
1786                               1, GNUNET_NO);
1787 #if DEBUG_TRANSPORT
1788     if (n == NULL)
1789       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790                   "Could not send message to peer `%s': unknown neighbour",
1791                   GNUNET_i2s (target));
1792     else if (!is_connected (n))
1793       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794                   "Could not send message to peer `%s': not connected\n",
1795                   GNUNET_i2s (target));
1796 #endif
1797     if (NULL != cont)
1798       cont (cont_cls, GNUNET_SYSERR);
1799     return;
1800   }
1801
1802   if ((n->session == NULL) && (n->address == NULL))
1803   {
1804     GNUNET_STATISTICS_update (GST_stats,
1805                               gettext_noop
1806                               ("# messages not sent (no such peer or not connected)"),
1807                               1, GNUNET_NO);
1808 #if DEBUG_TRANSPORT
1809     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1810                 "Could not send message to peer `%s': no address available\n",
1811                 GNUNET_i2s (target));
1812 #endif
1813
1814     if (NULL != cont)
1815       cont (cont_cls, GNUNET_SYSERR);
1816     return;
1817   }
1818
1819   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1820   GNUNET_STATISTICS_update (GST_stats,
1821                             gettext_noop
1822                             ("# bytes in message queue for other peers"),
1823                             msg_size, GNUNET_NO);
1824   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1825   mq->cont = cont;
1826   mq->cont_cls = cont_cls;
1827   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1828   memcpy (&mq[1], msg, msg_size);
1829   mq->message_buf = (const char *) &mq[1];
1830   mq->message_buf_size = msg_size;
1831   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1832   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1833
1834   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1835       (NULL == n->is_active))
1836     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1837 }
1838
1839
1840 /**
1841  * We have received a message from the given sender.  How long should
1842  * we delay before receiving more?  (Also used to keep the peer marked
1843  * as live).
1844  *
1845  * @param sender sender of the message
1846  * @param size size of the message
1847  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1848  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1849  *                   GNUNET_SYSERR if the connection is not fully up yet
1850  * @return how long to wait before reading more from this sender
1851  */
1852 struct GNUNET_TIME_Relative
1853 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1854                                         *sender, ssize_t size, int *do_forward)
1855 {
1856   struct NeighbourMapEntry *n;
1857   struct GNUNET_TIME_Relative ret;
1858
1859   // This can happen during shutdown
1860   if (neighbours == NULL)
1861   {
1862     return GNUNET_TIME_UNIT_FOREVER_REL;
1863   }
1864
1865   n = lookup_neighbour (sender);
1866   if (n == NULL)
1867   {
1868     GST_neighbours_try_connect (sender);
1869     n = lookup_neighbour (sender);
1870     if (NULL == n)
1871     {
1872       GNUNET_STATISTICS_update (GST_stats,
1873                                 gettext_noop
1874                                 ("# messages discarded due to lack of neighbour record"),
1875                                 1, GNUNET_NO);
1876       *do_forward = GNUNET_NO;
1877       return GNUNET_TIME_UNIT_ZERO;
1878     }
1879   }
1880   if (!is_connected (n))
1881   {
1882     *do_forward = GNUNET_SYSERR;
1883     return GNUNET_TIME_UNIT_ZERO;
1884   }
1885   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1886   {
1887     n->quota_violation_count++;
1888 #if DEBUG_TRANSPORT
1889     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1890                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1891                 n->in_tracker.available_bytes_per_s__,
1892                 n->quota_violation_count);
1893 #endif
1894     /* Discount 32k per violation */
1895     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1896   }
1897   else
1898   {
1899     if (n->quota_violation_count > 0)
1900     {
1901       /* try to add 32k back */
1902       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1903       n->quota_violation_count--;
1904     }
1905   }
1906   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1907   {
1908     GNUNET_STATISTICS_update (GST_stats,
1909                               gettext_noop
1910                               ("# bandwidth quota violations by other peers"),
1911                               1, GNUNET_NO);
1912     *do_forward = GNUNET_NO;
1913     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1914   }
1915   *do_forward = GNUNET_YES;
1916   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1917   if (ret.rel_value > 0)
1918   {
1919 #if DEBUG_TRANSPORT
1920     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1921                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1922                 (unsigned long long) n->in_tracker.
1923                 consumption_since_last_update__,
1924                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1925                 (unsigned long long) ret.rel_value);
1926 #endif
1927     GNUNET_STATISTICS_update (GST_stats,
1928                               gettext_noop ("# ms throttling suggested"),
1929                               (int64_t) ret.rel_value, GNUNET_NO);
1930   }
1931   return ret;
1932 }
1933
1934
1935 /**
1936  * Keep the connection to the given neighbour alive longer,
1937  * we received a KEEPALIVE (or equivalent).
1938  *
1939  * @param neighbour neighbour to keep alive
1940  */
1941 void
1942 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1943 {
1944   struct NeighbourMapEntry *n;
1945
1946   // This can happen during shutdown
1947   if (neighbours == NULL)
1948   {
1949     return;
1950   }
1951
1952   n = lookup_neighbour (neighbour);
1953   if (NULL == n)
1954   {
1955     GNUNET_STATISTICS_update (GST_stats,
1956                               gettext_noop
1957                               ("# KEEPALIVE messages discarded (not connected)"),
1958                               1, GNUNET_NO);
1959     return;
1960   }
1961   GNUNET_SCHEDULER_cancel (n->timeout_task);
1962   n->timeout_task =
1963       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1964                                     &neighbour_timeout_task, n);
1965
1966   /* send reply to measure latency */
1967   if (S_CONNECTED != n->state)
1968     return;
1969
1970   struct GNUNET_MessageHeader m;
1971
1972   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1973   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1974
1975   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1976                     UINT32_MAX /* priority */ ,
1977                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1978                     GNUNET_YES, NULL, NULL);
1979 }
1980
1981 /**
1982  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
1983  * to this peer
1984  *
1985  * @param neighbour neighbour to keep alive
1986  * @param ats performance data
1987  * @param ats_count number of entries in ats
1988  */
1989 void
1990 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1991                                    const struct GNUNET_ATS_Information *ats,
1992                                    uint32_t ats_count)
1993 {
1994   struct NeighbourMapEntry *n;
1995   struct GNUNET_ATS_Information *ats_new;
1996   uint32_t latency;
1997
1998   if (neighbours == NULL)
1999   {
2000     // This can happen during shutdown
2001     return;
2002   }
2003
2004   n = lookup_neighbour (neighbour);
2005   if ((NULL == n) || (n->state != S_CONNECTED))
2006   {
2007     GNUNET_STATISTICS_update (GST_stats,
2008                               gettext_noop
2009                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2010                               1, GNUNET_NO);
2011     return;
2012   }
2013   if (n->expect_latency_response != GNUNET_YES)
2014   {
2015     GNUNET_STATISTICS_update (GST_stats,
2016                               gettext_noop
2017                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2018                               1, GNUNET_NO);
2019     return;
2020   }
2021   n->expect_latency_response = GNUNET_NO;
2022
2023   GNUNET_assert (n->keep_alive_sent.abs_value !=
2024                  GNUNET_TIME_absolute_get_zero ().abs_value);
2025   n->latency =
2026       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2027                                            GNUNET_TIME_absolute_get ());
2028 #if DEBUG_TRANSPORT
2029   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2030               GNUNET_i2s (&n->id), n->latency.rel_value);
2031 #endif
2032
2033
2034   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2035   {
2036     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2037   }
2038   else
2039   {
2040     ats_new =
2041         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2042                        (ats_count + 1));
2043     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2044
2045     /* add latency */
2046     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2047     if (n->latency.rel_value > UINT32_MAX)
2048       latency = UINT32_MAX;
2049     else
2050       latency = n->latency.rel_value;
2051     ats_new[ats_count].value = htonl (latency);
2052
2053     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2054                                ats_count + 1);
2055     GNUNET_free (ats_new);
2056   }
2057 }
2058
2059
2060 /**
2061  * Change the incoming quota for the given peer.
2062  *
2063  * @param neighbour identity of peer to change qutoa for
2064  * @param quota new quota
2065  */
2066 void
2067 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2068                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2069 {
2070   struct NeighbourMapEntry *n;
2071
2072   // This can happen during shutdown
2073   if (neighbours == NULL)
2074   {
2075     return;
2076   }
2077
2078   n = lookup_neighbour (neighbour);
2079   if (n == NULL)
2080   {
2081     GNUNET_STATISTICS_update (GST_stats,
2082                               gettext_noop
2083                               ("# SET QUOTA messages ignored (no such peer)"),
2084                               1, GNUNET_NO);
2085     return;
2086   }
2087 #if DEBUG_TRANSPORT
2088   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2090               ntohl (quota.value__), GNUNET_i2s (&n->id));
2091 #endif
2092   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2093   if (0 != ntohl (quota.value__))
2094     return;
2095 #if DEBUG_TRANSPORT
2096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2097               GNUNET_i2s (&n->id), "SET_QUOTA");
2098 #endif
2099   if (is_connected (n))
2100     GNUNET_STATISTICS_update (GST_stats,
2101                               gettext_noop ("# disconnects due to quota of 0"),
2102                               1, GNUNET_NO);
2103   disconnect_neighbour (n);
2104 }
2105
2106
2107 /**
2108  * Closure for the neighbours_iterate function.
2109  */
2110 struct IteratorContext
2111 {
2112   /**
2113    * Function to call on each connected neighbour.
2114    */
2115   GST_NeighbourIterator cb;
2116
2117   /**
2118    * Closure for 'cb'.
2119    */
2120   void *cb_cls;
2121 };
2122
2123
2124 /**
2125  * Call the callback from the closure for each connected neighbour.
2126  *
2127  * @param cls the 'struct IteratorContext'
2128  * @param key the hash of the public key of the neighbour
2129  * @param value the 'struct NeighbourMapEntry'
2130  * @return GNUNET_OK (continue to iterate)
2131  */
2132 static int
2133 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2134 {
2135   struct IteratorContext *ic = cls;
2136   struct NeighbourMapEntry *n = value;
2137
2138   if (!is_connected (n))
2139     return GNUNET_OK;
2140
2141   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2142   return GNUNET_OK;
2143 }
2144
2145
2146 /**
2147  * Iterate over all connected neighbours.
2148  *
2149  * @param cb function to call
2150  * @param cb_cls closure for cb
2151  */
2152 void
2153 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2154 {
2155   struct IteratorContext ic;
2156
2157   // This can happen during shutdown
2158   if (neighbours == NULL)
2159   {
2160     return;
2161   }
2162
2163   ic.cb = cb;
2164   ic.cb_cls = cb_cls;
2165   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2166 }
2167
2168 /**
2169  * If we have an active connection to the given target, it must be shutdown.
2170  *
2171  * @param target peer to disconnect from
2172  */
2173 void
2174 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2175 {
2176   struct NeighbourMapEntry *n;
2177
2178   // This can happen during shutdown
2179   if (neighbours == NULL)
2180   {
2181     return;
2182   }
2183
2184   n = lookup_neighbour (target);
2185   if (NULL == n)
2186     return;                     /* not active */
2187   disconnect_neighbour (n);
2188 }
2189
2190
2191 /**
2192  * We received a disconnect message from the given peer,
2193  * validate and process.
2194  *
2195  * @param peer sender of the message
2196  * @param msg the disconnect message
2197  */
2198 void
2199 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2200                                           *peer,
2201                                           const struct GNUNET_MessageHeader
2202                                           *msg)
2203 {
2204   struct NeighbourMapEntry *n;
2205   const struct SessionDisconnectMessage *sdm;
2206   GNUNET_HashCode hc;
2207
2208 #if DEBUG_TRANSPORT
2209   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2210               "Received DISCONNECT message from peer `%s'\n",
2211               GNUNET_i2s (peer));
2212 #endif
2213
2214   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2215   {
2216     // GNUNET_break_op (0);
2217     GNUNET_STATISTICS_update (GST_stats,
2218                               gettext_noop
2219                               ("# disconnect messages ignored (old format)"), 1,
2220                               GNUNET_NO);
2221     return;
2222   }
2223   sdm = (const struct SessionDisconnectMessage *) msg;
2224   n = lookup_neighbour (peer);
2225   if (NULL == n)
2226     return;                     /* gone already */
2227   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2228       n->connect_ts.abs_value)
2229   {
2230     GNUNET_STATISTICS_update (GST_stats,
2231                               gettext_noop
2232                               ("# disconnect messages ignored (timestamp)"), 1,
2233                               GNUNET_NO);
2234     return;
2235   }
2236   GNUNET_CRYPTO_hash (&sdm->public_key,
2237                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2238                       &hc);
2239   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2240   {
2241     GNUNET_break_op (0);
2242     return;
2243   }
2244   if (ntohl (sdm->purpose.size) !=
2245       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2246       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2247       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2248   {
2249     GNUNET_break_op (0);
2250     return;
2251   }
2252   if (GNUNET_OK !=
2253       GNUNET_CRYPTO_rsa_verify
2254       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2255        &sdm->signature, &sdm->public_key))
2256   {
2257     GNUNET_break_op (0);
2258     return;
2259   }
2260   GST_neighbours_force_disconnect (peer);
2261 }
2262
2263
2264 /**
2265  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2266  * Consider switching to it.
2267  *
2268  * @param message possibly a 'struct SessionConnectMessage' (check format)
2269  * @param peer identity of the peer to switch the address for
2270  * @param address address of the other peer, NULL if other peer
2271  *                       connected to us
2272  * @param session session to use (or NULL)
2273  * @param ats performance data
2274  * @param ats_count number of entries in ats
2275  */
2276 void
2277 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2278                                    const struct GNUNET_PeerIdentity *peer,
2279                                    const struct GNUNET_HELLO_Address *address,
2280                                    struct Session *session,
2281                                    const struct GNUNET_ATS_Information *ats,
2282                                    uint32_t ats_count)
2283 {
2284   const struct SessionConnectMessage *scm;
2285   struct GNUNET_MessageHeader msg;
2286   struct NeighbourMapEntry *n;
2287   size_t msg_len;
2288   size_t ret;
2289
2290 #if DEBUG_TRANSPORT
2291   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2292               "Received CONNECT_ACK message from peer `%s'\n",
2293               GNUNET_i2s (peer));
2294 #endif
2295
2296   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2297   {
2298     GNUNET_break_op (0);
2299     return;
2300   }
2301   scm = (const struct SessionConnectMessage *) message;
2302   GNUNET_break_op (ntohl (scm->reserved) == 0);
2303   n = lookup_neighbour (peer);
2304   if (NULL == n)
2305   {
2306     /* we did not send 'CONNECT' -- at least not recently */
2307     GNUNET_STATISTICS_update (GST_stats,
2308                               gettext_noop
2309                               ("# unexpected CONNECT_ACK messages (no peer)"),
2310                               1, GNUNET_NO);
2311     return;
2312   }
2313
2314   /* Additional check
2315    *
2316    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2317    *
2318    * We also received an CONNECT message, switched from SENDT to RECV and
2319    * ATS already suggested us an address after a successful blacklist check
2320    */
2321   if ((n->state != S_CONNECT_SENT) &&
2322       ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2323   {
2324     GNUNET_STATISTICS_update (GST_stats,
2325                               gettext_noop
2326                               ("# unexpected CONNECT_ACK messages"), 1,
2327                               GNUNET_NO);
2328     return;
2329   }
2330
2331   change_state (n, S_CONNECTED);
2332
2333   if (NULL != session)
2334   {
2335     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2336                      "transport-ats",
2337                      "Giving ATS session %p of plugin %s for peer %s\n",
2338                      session, address->transport_name, GNUNET_i2s (peer));
2339   }
2340   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2341   GNUNET_assert (NULL != n->address);
2342   if (n->address_state == FRESH)
2343   {
2344     GST_validation_set_address_use (n->address, n->session, GNUNET_YES);
2345     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2346     n->address_state = USED;
2347   }
2348
2349   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2350
2351   /* send ACK (ACK) */
2352   msg_len = sizeof (msg);
2353   msg.size = htons (msg_len);
2354   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2355
2356   ret =
2357       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2358                         GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
2359                         GNUNET_YES, NULL, NULL);
2360
2361   if (ret == GNUNET_SYSERR)
2362     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2364                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2365
2366
2367   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2368     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2369
2370   neighbours_connected++;
2371   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2372                             GNUNET_NO);
2373 #if DEBUG_TRANSPORT
2374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2375               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2376               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2377               __LINE__);
2378 #endif
2379   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2380   send_outbound_quota (peer, n->bandwidth_out);
2381
2382 }
2383
2384
2385 void
2386 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2387                            const struct GNUNET_PeerIdentity *peer,
2388                            const struct GNUNET_HELLO_Address *address,
2389                            struct Session *session,
2390                            const struct GNUNET_ATS_Information *ats,
2391                            uint32_t ats_count)
2392 {
2393   struct NeighbourMapEntry *n;
2394
2395 #if DEBUG_TRANSPORT
2396   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2397               GNUNET_i2s (peer));
2398 #endif
2399
2400   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2401   {
2402     GNUNET_break_op (0);
2403     return;
2404   }
2405   n = lookup_neighbour (peer);
2406   if (NULL == n)
2407   {
2408     send_disconnect (peer, address, session);
2409     GNUNET_break (0);
2410     return;
2411   }
2412   if (S_CONNECTED == n->state)
2413     return;
2414   if (!is_connecting (n))
2415   {
2416     GNUNET_STATISTICS_update (GST_stats,
2417                               gettext_noop ("# unexpected ACK messages"), 1,
2418                               GNUNET_NO);
2419     return;
2420   }
2421   change_state (n, S_CONNECTED);
2422   if (NULL != session)
2423     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2424                      "transport-ats",
2425                      "Giving ATS session %p of plugin %s for peer %s\n",
2426                      session, address->transport_name, GNUNET_i2s (peer));
2427   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2428   GNUNET_assert (n->address != NULL);
2429   if (n->address_state == FRESH)
2430   {
2431     GST_validation_set_address_use (n->address, n->session, GNUNET_YES);
2432     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2433     n->address_state = USED;
2434   }
2435
2436   neighbours_connected++;
2437   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2438                             GNUNET_NO);
2439
2440   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2441   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2442     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2443 #if DEBUG_TRANSPORT
2444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2445               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2446               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2447               __LINE__);
2448 #endif
2449   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2450   send_outbound_quota (peer, n->bandwidth_out);
2451 }
2452
2453 struct BlackListCheckContext
2454 {
2455   struct GNUNET_ATS_Information *ats;
2456
2457   uint32_t ats_count;
2458
2459   struct Session *session;
2460
2461   struct GNUNET_HELLO_Address *address;
2462
2463   struct GNUNET_TIME_Absolute ts;
2464 };
2465
2466
2467 static void
2468 handle_connect_blacklist_cont (void *cls,
2469                                const struct GNUNET_PeerIdentity *peer,
2470                                int result)
2471 {
2472   struct NeighbourMapEntry *n;
2473   struct BlackListCheckContext *bcc = cls;
2474
2475 #if DEBUG_TRANSPORT
2476   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2477               "Blacklist check due to CONNECT message: `%s'\n",
2478               GNUNET_i2s (peer),
2479               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2480 #endif
2481
2482   /* not allowed */
2483   if (GNUNET_OK != result)
2484   {
2485     GNUNET_HELLO_address_free (bcc->address);
2486     GNUNET_free (bcc);
2487     return;
2488   }
2489
2490   n = lookup_neighbour (peer);
2491   if (NULL == n)
2492     n = setup_neighbour (peer);
2493
2494   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2495   {
2496     if (NULL != bcc->session)
2497       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2498                        "transport-ats",
2499                        "Giving ATS session %p of address `%s' for peer %s\n",
2500                        bcc->session, GST_plugins_a2s (bcc->address),
2501                        GNUNET_i2s (peer));
2502     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2503
2504     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2505                                bcc->ats_count);
2506     n->connect_ts = bcc->ts;
2507   }
2508
2509   GNUNET_HELLO_address_free (bcc->address);
2510   GNUNET_free (bcc);
2511
2512   if (n->state != S_CONNECT_RECV)
2513     change_state (n, S_CONNECT_RECV);
2514
2515
2516   /* Ask ATS for an address to connect via that address */
2517   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2518     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2519   n->ats_suggest =
2520       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2521                                     n);
2522   GNUNET_ATS_suggest_address (GST_ats, peer);
2523 }
2524
2525 /**
2526  * We received a 'SESSION_CONNECT' message from the other peer.
2527  * Consider switching to it.
2528  *
2529  * @param message possibly a 'struct SessionConnectMessage' (check format)
2530  * @param peer identity of the peer to switch the address for
2531  * @param address address of the other peer, NULL if other peer
2532  *                       connected to us
2533  * @param session session to use (or NULL)
2534  * @param ats performance data
2535  * @param ats_count number of entries in ats (excluding 0-termination)
2536  */
2537 void
2538 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2539                                const struct GNUNET_PeerIdentity *peer,
2540                                const struct GNUNET_HELLO_Address *address,
2541                                struct Session *session,
2542                                const struct GNUNET_ATS_Information *ats,
2543                                uint32_t ats_count)
2544 {
2545   const struct SessionConnectMessage *scm;
2546   struct BlackListCheckContext *bcc = NULL;
2547   struct NeighbourMapEntry *n;
2548
2549 #if DEBUG_TRANSPORT
2550   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2551               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2552 #endif
2553
2554   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2555   {
2556     GNUNET_break_op (0);
2557     return;
2558   }
2559
2560   scm = (const struct SessionConnectMessage *) message;
2561   GNUNET_break_op (ntohl (scm->reserved) == 0);
2562
2563   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2564
2565   n = lookup_neighbour (peer);
2566   if ((n != NULL) && (S_CONNECTED == n->state))
2567   {
2568     /* connected peer switches addresses */
2569     return;
2570   }
2571
2572
2573   /* we are not connected to this peer */
2574   /* do blacklist check */
2575   bcc =
2576       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2577                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2578   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2579   bcc->ats_count = ats_count + 1;
2580   bcc->address = GNUNET_HELLO_address_copy (address);
2581   bcc->session = session;
2582   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2583   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2584   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2585   bcc->ats[ats_count].value =
2586       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2587   GST_blacklist_test_allowed (peer, address->transport_name,
2588                               handle_connect_blacklist_cont, bcc);
2589 }
2590
2591
2592 /* end of file gnunet-service-transport_neighbours.c */