removing ats functions from plugins, instead provide callback function
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
61
62 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
63
64 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
65
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
72 /**
73  * Message a peer sends to another to indicate its
74  * preference for communicating via a particular
75  * session (and the desire to establish a real
76  * connection).
77  */
78 struct SessionConnectMessage
79 {
80   /**
81    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
82    */
83   struct GNUNET_MessageHeader header;
84
85   /**
86    * Always zero.
87    */
88   uint32_t reserved GNUNET_PACKED;
89
90   /**
91    * Absolute time at the sender.  Only the most recent connect
92    * message implies which session is preferred by the sender.
93    */
94   struct GNUNET_TIME_AbsoluteNBO timestamp;
95
96 };
97
98
99 struct SessionDisconnectMessage
100 {
101   /**
102    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
103    */
104   struct GNUNET_MessageHeader header;
105
106   /**
107    * Always zero.
108    */
109   uint32_t reserved GNUNET_PACKED;
110
111   /**
112    * Purpose of the signature.  Extends over the timestamp.
113    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
114    */
115   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
116
117   /**
118    * Absolute time at the sender.  Only the most recent connect
119    * message implies which session is preferred by the sender.
120    */
121   struct GNUNET_TIME_AbsoluteNBO timestamp;
122
123   /**
124    * Public key of the sender.
125    */
126   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
127
128   /**
129    * Signature of the peer that sends us the disconnect.  Only
130    * valid if the timestamp is AFTER the timestamp from the
131    * corresponding 'CONNECT' message.
132    */
133   struct GNUNET_CRYPTO_RsaSignature signature;
134
135 };
136
137
138 /**
139  * For each neighbour we keep a list of messages
140  * that we still want to transmit to the neighbour.
141  */
142 struct MessageQueue
143 {
144
145   /**
146    * This is a doubly linked list.
147    */
148   struct MessageQueue *next;
149
150   /**
151    * This is a doubly linked list.
152    */
153   struct MessageQueue *prev;
154
155   /**
156    * Once this message is actively being transmitted, which
157    * neighbour is it associated with?
158    */
159   struct NeighbourMapEntry *n;
160
161   /**
162    * Function to call once we're done.
163    */
164   GST_NeighbourSendContinuation cont;
165
166   /**
167    * Closure for 'cont'
168    */
169   void *cont_cls;
170
171   /**
172    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
173    * stuck together in memory.  Allocated at the end of this struct.
174    */
175   const char *message_buf;
176
177   /**
178    * Size of the message buf
179    */
180   size_t message_buf_size;
181
182   /**
183    * At what time should we fail?
184    */
185   struct GNUNET_TIME_Absolute timeout;
186
187 };
188
189
190 enum State
191 {
192   /**
193    * fresh peer or completely disconnected
194    */
195   S_NOT_CONNECTED,
196
197   /**
198    * sent CONNECT message to other peer, waiting for CONNECT_ACK
199    */
200   S_CONNECT_SENT,
201
202   /**
203    * received CONNECT message to other peer, sending CONNECT_ACK
204    */
205   S_CONNECT_RECV,
206
207   /**
208    * received ACK or payload
209    */
210   S_CONNECTED,
211
212   /**
213    * connection ended, fast reconnect
214    */
215   S_FAST_RECONNECT,
216
217   /**
218    * Disconnect in progress
219    */
220   S_DISCONNECT
221 };
222
223 enum Address_State
224 {
225   USED,
226   UNUSED,
227   FRESH,
228 };
229
230
231 /**
232  * Entry in neighbours.
233  */
234 struct NeighbourMapEntry
235 {
236
237   /**
238    * Head of list of messages we would like to send to this peer;
239    * must contain at most one message per client.
240    */
241   struct MessageQueue *messages_head;
242
243   /**
244    * Tail of list of messages we would like to send to this peer; must
245    * contain at most one message per client.
246    */
247   struct MessageQueue *messages_tail;
248
249   /**
250    * Are we currently trying to send a message? If so, which one?
251    */
252   struct MessageQueue *is_active;
253
254   /**
255    * Active session for communicating with the peer.
256    */
257   struct Session *session;
258
259   /**
260    * Address we currently use.
261    */
262   struct GNUNET_HELLO_Address *address;
263
264   /**
265    * Identity of this neighbour.
266    */
267   struct GNUNET_PeerIdentity id;
268
269   /**
270    * ID of task scheduled to run when this peer is about to
271    * time out (will free resources associated with the peer).
272    */
273   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
274
275   /**
276    * ID of task scheduled to send keepalives.
277    */
278   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
279
280   /**
281    * ID of task scheduled to run when we should try transmitting
282    * the head of the message queue.
283    */
284   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
285
286   /**
287    * Tracker for inbound bandwidth.
288    */
289   struct GNUNET_BANDWIDTH_Tracker in_tracker;
290
291   /**
292    * Inbound bandwidth from ATS, activated when connection is up
293    */
294   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
295
296   /**
297    * Inbound bandwidth from ATS, activated when connection is up
298    */
299   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
300
301   /**
302    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
303    */
304   struct GNUNET_TIME_Absolute connect_ts;
305
306   /**
307    * When did we sent the last keep-alive message?
308    */
309   struct GNUNET_TIME_Absolute keep_alive_sent;
310
311   /**
312    * Latest calculated latency value
313    */
314   struct GNUNET_TIME_Relative latency;
315
316   /**
317    * Timeout for ATS
318    * We asked ATS for a new address for this peer
319    */
320   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
321
322   /**
323    * Task the resets the peer state after due to an pending
324    * unsuccessful connection setup
325    */
326   GNUNET_SCHEDULER_TaskIdentifier state_reset;
327
328
329   /**
330    * How often has the other peer (recently) violated the inbound
331    * traffic limit?  Incremented by 10 per violation, decremented by 1
332    * per non-violation (for each time interval).
333    */
334   unsigned int quota_violation_count;
335
336
337   /**
338    * The current state of the peer
339    * Element of enum State
340    */
341   int state;
342
343   /**
344    * Did we sent an KEEP_ALIVE message and are we expecting a response?
345    */
346   int expect_latency_response;
347   int address_state;
348 };
349
350
351 /**
352  * All known neighbours and their HELLOs.
353  */
354 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
355
356 /**
357  * Closure for connect_notify_cb and disconnect_notify_cb
358  */
359 static void *callback_cls;
360
361 /**
362  * Function to call when we connected to a neighbour.
363  */
364 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
365
366 /**
367  * Function to call when we disconnected from a neighbour.
368  */
369 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
370
371 /**
372  * counter for connected neighbours
373  */
374 static int neighbours_connected;
375
376 /**
377  * Lookup a neighbour entry in the neighbours hash map.
378  *
379  * @param pid identity of the peer to look up
380  * @return the entry, NULL if there is no existing record
381  */
382 static struct NeighbourMapEntry *
383 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
384 {
385   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
386 }
387
388 #define change_state(n, state, ...) change (n, state, __LINE__)
389
390 static int
391 is_connecting (struct NeighbourMapEntry *n)
392 {
393   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
394     return GNUNET_YES;
395   return GNUNET_NO;
396 }
397
398 static int
399 is_connected (struct NeighbourMapEntry *n)
400 {
401   if (n->state == S_CONNECTED)
402     return GNUNET_YES;
403   return GNUNET_NO;
404 }
405
406 static int
407 is_disconnecting (struct NeighbourMapEntry *n)
408 {
409   if (n->state == S_DISCONNECT)
410     return GNUNET_YES;
411   return GNUNET_NO;
412 }
413
414 static const char *
415 print_state (int state)
416 {
417   switch (state)
418   {
419   case S_CONNECTED:
420     return "S_CONNECTED";
421     break;
422   case S_CONNECT_RECV:
423     return "S_CONNECT_RECV";
424     break;
425   case S_CONNECT_SENT:
426     return "S_CONNECT_SENT";
427     break;
428   case S_DISCONNECT:
429     return "S_DISCONNECT";
430     break;
431   case S_NOT_CONNECTED:
432     return "S_NOT_CONNECTED";
433     break;
434   case S_FAST_RECONNECT:
435     return "S_FAST_RECONNECT";
436     break;
437   default:
438     GNUNET_break (0);
439     break;
440   }
441   return NULL;
442 }
443
444 static int
445 change (struct NeighbourMapEntry *n, int state, int line);
446
447 static void
448 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
449
450
451 static void
452 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
453 {
454   struct NeighbourMapEntry *n = cls;
455
456   if (n == NULL)
457     return;
458
459   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
460   if (n->state == S_CONNECTED)
461     return;
462
463 #if DEBUG_TRANSPORT
464   GNUNET_STATISTICS_update (GST_stats,
465                             gettext_noop
466                             ("# failed connection attempts due to timeout"), 1,
467                             GNUNET_NO);
468 #endif
469
470   /* resetting state */
471   n->state = S_NOT_CONNECTED;
472
473   /* destroying address */
474   if (n->address != NULL)
475   {
476     GNUNET_assert (strlen (n->address->transport_name) > 0);
477     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
478   }
479
480   /* request new address */
481   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
482     GNUNET_SCHEDULER_cancel (n->ats_suggest);
483   n->ats_suggest =
484       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
485                                     n);
486   GNUNET_ATS_suggest_address (GST_ats, &n->id);
487 }
488
489 static int
490 change (struct NeighbourMapEntry *n, int state, int line)
491 {
492   /* allowed transitions */
493   int allowed = GNUNET_NO;
494
495   switch (n->state)
496   {
497   case S_NOT_CONNECTED:
498     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
499         (state == S_DISCONNECT))
500       allowed = GNUNET_YES;
501     break;
502   case S_CONNECT_RECV:
503     allowed = GNUNET_YES;
504     break;
505   case S_CONNECT_SENT:
506     allowed = GNUNET_YES;
507     break;
508   case S_CONNECTED:
509     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
510       allowed = GNUNET_YES;
511     break;
512   case S_DISCONNECT:
513     break;
514   case S_FAST_RECONNECT:
515     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
516       allowed = GNUNET_YES;
517     break;
518   default:
519     GNUNET_break (0);
520     break;
521   }
522   if (allowed == GNUNET_NO)
523   {
524     char *old = GNUNET_strdup (print_state (n->state));
525     char *new = GNUNET_strdup (print_state (state));
526
527     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
528                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
529                 new, line);
530     GNUNET_break (0);
531     GNUNET_free (old);
532     GNUNET_free (new);
533     return GNUNET_SYSERR;
534   }
535 #if DEBUG_TRANSPORT
536   {
537     char *old = GNUNET_strdup (print_state (n->state));
538     char *new = GNUNET_strdup (print_state (state));
539
540     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
541                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
542                 GNUNET_i2s (&n->id), n, old, new, line);
543     GNUNET_free (old);
544     GNUNET_free (new);
545   }
546 #endif
547   n->state = state;
548
549   switch (n->state)
550   {
551   case S_FAST_RECONNECT:
552   case S_CONNECT_RECV:
553   case S_CONNECT_SENT:
554     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
555       GNUNET_SCHEDULER_cancel (n->state_reset);
556     n->state_reset =
557         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
558     break;
559   case S_CONNECTED:
560   case S_NOT_CONNECTED:
561   case S_DISCONNECT:
562     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
563     {
564 #if DEBUG_TRANSPORT
565       char *old = GNUNET_strdup (print_state (n->state));
566       char *new = GNUNET_strdup (print_state (state));
567
568       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
569                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
570                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
571       GNUNET_free (old);
572       GNUNET_free (new);
573 #endif
574       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
575       GNUNET_SCHEDULER_cancel (n->state_reset);
576       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
577     }
578     break;
579
580   default:
581     GNUNET_assert (0);
582   }
583
584
585
586   return GNUNET_OK;
587 }
588
589 static ssize_t
590 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
591                   size_t msgbuf_size, uint32_t priority,
592                   struct GNUNET_TIME_Relative timeout, struct Session *session,
593                   const struct GNUNET_HELLO_Address *address, int force_address,
594                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
595 {
596   struct GNUNET_TRANSPORT_PluginFunctions *papi;
597   size_t ret = GNUNET_SYSERR;
598
599   /* FIXME : ats returns an address with all values 0 */
600   if (address == NULL)
601   {
602     if (cont != NULL)
603       cont (cont_cls, target, GNUNET_SYSERR);
604     return GNUNET_SYSERR;
605   }
606
607   if ((session == NULL) && (address->address_length == 0))
608   {
609     if (cont != NULL)
610       cont (cont_cls, target, GNUNET_SYSERR);
611     return GNUNET_SYSERR;
612   }
613
614   papi = GST_plugins_find (address->transport_name);
615   if (papi == NULL)
616   {
617     if (cont != NULL)
618       cont (cont_cls, target, GNUNET_SYSERR);
619     return GNUNET_SYSERR;
620   }
621
622   ret =
623       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
624                   address->address, address->address_length, GNUNET_YES, cont,
625                   cont_cls);
626
627   if (ret == -1)
628   {
629     if (cont != NULL)
630       cont (cont_cls, target, GNUNET_SYSERR);
631   }
632   return ret;
633 }
634
635 /**
636  * Task invoked to start a transmission to another peer.
637  *
638  * @param cls the 'struct NeighbourMapEntry'
639  * @param tc scheduler context
640  */
641 static void
642 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
643
644
645 /**
646  * We're done with our transmission attempt, continue processing.
647  *
648  * @param cls the 'struct MessageQueue' of the message
649  * @param receiver intended receiver
650  * @param success whether it worked or not
651  */
652 static void
653 transmit_send_continuation (void *cls,
654                             const struct GNUNET_PeerIdentity *receiver,
655                             int success)
656 {
657   struct MessageQueue *mq;
658   struct NeighbourMapEntry *n;
659   struct NeighbourMapEntry *tmp;
660
661   tmp = lookup_neighbour (receiver);
662
663   mq = cls;
664   n = mq->n;
665   if ((NULL != n) && (tmp != NULL) && (tmp == n))
666   {
667     GNUNET_assert (n->is_active == mq);
668     n->is_active = NULL;
669     if (success == GNUNET_YES)
670     {
671       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
672       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
673     }
674   }
675 #if DEBUG_TRANSPORT
676   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
677               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
678               (success == GNUNET_OK) ? "successful" : "FAILED");
679 #endif
680   if (NULL != mq->cont)
681     mq->cont (mq->cont_cls, success);
682   GNUNET_free (mq);
683 }
684
685
686 /**
687  * Check the ready list for the given neighbour and if a plugin is
688  * ready for transmission (and if we have a message), do so!
689  *
690  * @param n target peer for which to transmit
691  */
692 static void
693 try_transmission_to_peer (struct NeighbourMapEntry *n)
694 {
695   struct MessageQueue *mq;
696   struct GNUNET_TIME_Relative timeout;
697   ssize_t ret;
698
699   if (n->is_active != NULL)
700   {
701     GNUNET_break (0);
702     return;                     /* transmission already pending */
703   }
704   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
705   {
706     GNUNET_break (0);
707     return;                     /* currently waiting for bandwidth */
708   }
709   while (NULL != (mq = n->messages_head))
710   {
711     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
712     if (timeout.rel_value > 0)
713       break;
714     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
715     n->is_active = mq;
716     mq->n = n;
717     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
718   }
719   if (NULL == mq)
720     return;                     /* no more messages */
721
722   if (n->address == NULL)
723   {
724     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
725                 GNUNET_i2s (&n->id));
726     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
727     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
728     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
729     return;
730   }
731
732   if (GST_plugins_find (n->address->transport_name) == NULL)
733   {
734     GNUNET_break (0);
735     return;
736   }
737   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
738   n->is_active = mq;
739   mq->n = n;
740
741   if ((n->address->address_length == 0) && (n->session == NULL))
742   {
743     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
744                 GNUNET_i2s (&n->id));
745     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
746     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
747     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
748     return;
749   }
750
751   ret =
752       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
753                         timeout, n->session, n->address, GNUNET_YES,
754                         &transmit_send_continuation, mq);
755   if (ret == -1)
756   {
757     /* failure, but 'send' would not call continuation in this case,
758      * so we need to do it here! */
759     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
760   }
761
762 }
763
764
765 /**
766  * Task invoked to start a transmission to another peer.
767  *
768  * @param cls the 'struct NeighbourMapEntry'
769  * @param tc scheduler context
770  */
771 static void
772 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
773 {
774   struct NeighbourMapEntry *n = cls;
775
776   GNUNET_assert (NULL != lookup_neighbour (&n->id));
777   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
778   try_transmission_to_peer (n);
779 }
780
781
782 /**
783  * Initialize the neighbours subsystem.
784  *
785  * @param cls closure for callbacks
786  * @param connect_cb function to call if we connect to a peer
787  * @param disconnect_cb function to call if we disconnect from a peer
788  */
789 void
790 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
791                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
792 {
793   callback_cls = cls;
794   connect_notify_cb = connect_cb;
795   disconnect_notify_cb = disconnect_cb;
796   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
797 }
798
799
800 static void
801 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
802                       int result)
803 {
804 #if DEBUG_TRANSPORT
805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806               "Sending DISCONNECT message to peer `%4s': %i\n",
807               GNUNET_i2s (target), result);
808 #endif
809 }
810
811
812 static int
813 send_disconnect (const struct GNUNET_PeerIdentity *target,
814                  const struct GNUNET_HELLO_Address *address,
815                  struct Session *session)
816 {
817   size_t ret;
818   struct SessionDisconnectMessage disconnect_msg;
819
820 #if DEBUG_TRANSPORT
821   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
822               "Sending DISCONNECT message to peer `%4s'\n",
823               GNUNET_i2s (target));
824 #endif
825
826   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
827   disconnect_msg.header.type =
828       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
829   disconnect_msg.reserved = htonl (0);
830   disconnect_msg.purpose.size =
831       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
832              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
833              sizeof (struct GNUNET_TIME_AbsoluteNBO));
834   disconnect_msg.purpose.purpose =
835       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
836   disconnect_msg.timestamp =
837       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
838   disconnect_msg.public_key = GST_my_public_key;
839   GNUNET_assert (GNUNET_OK ==
840                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
841                                          &disconnect_msg.purpose,
842                                          &disconnect_msg.signature));
843
844   ret =
845       send_with_plugin (target, (const char *) &disconnect_msg,
846                         sizeof (disconnect_msg), UINT32_MAX,
847                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
848                         GNUNET_YES, &send_disconnect_cont, NULL);
849
850   if (ret == GNUNET_SYSERR)
851     return GNUNET_SYSERR;
852
853   GNUNET_STATISTICS_update (GST_stats,
854                             gettext_noop
855                             ("# peers disconnected due to external request"), 1,
856                             GNUNET_NO);
857   return GNUNET_OK;
858 }
859
860
861 /**
862  * Disconnect from the given neighbour, clean up the record.
863  *
864  * @param n neighbour to disconnect from
865  */
866 static void
867 disconnect_neighbour (struct NeighbourMapEntry *n)
868 {
869   struct MessageQueue *mq;
870   int previous_state;
871
872   previous_state = n->state;
873
874   if (is_disconnecting (n))
875     return;
876
877
878   /* send DISCONNECT MESSAGE */
879   if ((previous_state == S_CONNECTED) || is_connecting (n))
880   {
881     if (GNUNET_OK == send_disconnect (&n->id, n->address, n->session))
882       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
883                   GNUNET_i2s (&n->id));
884     else
885       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886                   "Could not send DISCONNECT_MSG to `%s'\n",
887                   GNUNET_i2s (&n->id));
888   }
889
890   change_state (n, S_DISCONNECT);
891
892   if (previous_state == S_CONNECTED)
893   {
894     GNUNET_assert (NULL != n->address);
895     if (n->address_state == USED)
896     {
897       GST_validation_set_address_use (n->address, n->session,
898                                       GNUNET_NO);
899
900       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
901       n->address_state = UNUSED;
902     }
903   }
904
905   if (n->address != NULL)
906   {
907     struct GNUNET_TRANSPORT_PluginFunctions *papi;
908
909     papi = GST_plugins_find (n->address->transport_name);
910     if (papi != NULL)
911       papi->disconnect (papi->cls, &n->id);
912   }
913   while (NULL != (mq = n->messages_head))
914   {
915     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
916     if (NULL != mq->cont)
917       mq->cont (mq->cont_cls, GNUNET_SYSERR);
918     GNUNET_free (mq);
919   }
920   if (NULL != n->is_active)
921   {
922     n->is_active->n = NULL;
923     n->is_active = NULL;
924   }
925
926   switch (previous_state)
927   {
928   case S_CONNECTED:
929 //      GNUNET_assert (neighbours_connected > 0);
930     neighbours_connected--;
931     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
932     GNUNET_SCHEDULER_cancel (n->keepalive_task);
933     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
934     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
935                               GNUNET_NO);
936     disconnect_notify_cb (callback_cls, &n->id);
937     break;
938   case S_FAST_RECONNECT:
939     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
940                               GNUNET_NO);
941     GNUNET_STATISTICS_update (GST_stats,
942                               gettext_noop ("# fast reconnects failed"), 1,
943                               GNUNET_NO);
944     disconnect_notify_cb (callback_cls, &n->id);
945   default:
946     break;
947   }
948
949   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
950
951   GNUNET_assert (GNUNET_YES ==
952                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
953                                                        &n->id.hashPubKey, n));
954   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
955   {
956     GNUNET_SCHEDULER_cancel (n->ats_suggest);
957     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
958   }
959   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
960   {
961     GNUNET_SCHEDULER_cancel (n->timeout_task);
962     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
963   }
964   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
965   {
966     GNUNET_SCHEDULER_cancel (n->transmission_task);
967     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
968   }
969   if (NULL != n->address)
970   {
971     GNUNET_HELLO_address_free (n->address);
972     n->address = NULL;
973   }
974   n->session = NULL;
975   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
976               GNUNET_i2s (&n->id), n);
977   GNUNET_free (n);
978 }
979
980
981 /**
982  * Peer has been idle for too long. Disconnect.
983  *
984  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
985  * @param tc scheduler context
986  */
987 static void
988 neighbour_timeout_task (void *cls,
989                         const struct GNUNET_SCHEDULER_TaskContext *tc)
990 {
991   struct NeighbourMapEntry *n = cls;
992
993   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
994
995   GNUNET_STATISTICS_update (GST_stats,
996                             gettext_noop
997                             ("# peers disconnected due to timeout"), 1,
998                             GNUNET_NO);
999   disconnect_neighbour (n);
1000 }
1001
1002
1003 /**
1004  * Send another keepalive message.
1005  *
1006  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1007  * @param tc scheduler context
1008  */
1009 static void
1010 neighbour_keepalive_task (void *cls,
1011                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1012 {
1013   struct NeighbourMapEntry *n = cls;
1014   struct GNUNET_MessageHeader m;
1015   int ret;
1016
1017   n->keepalive_task =
1018       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1019                                     &neighbour_keepalive_task, n);
1020
1021   GNUNET_assert (S_CONNECTED == n->state);
1022   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1023                             GNUNET_NO);
1024   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1025   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1026
1027
1028   ret =
1029       send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1030                         UINT32_MAX /* priority */ ,
1031                         GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1032                         GNUNET_YES, NULL, NULL);
1033
1034   n->expect_latency_response = GNUNET_NO;
1035   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1036   if (ret != GNUNET_SYSERR)
1037   {
1038     n->expect_latency_response = GNUNET_YES;
1039     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1040   }
1041
1042 }
1043
1044
1045 /**
1046  * Disconnect from the given neighbour.
1047  *
1048  * @param cls unused
1049  * @param key hash of neighbour's public key (not used)
1050  * @param value the 'struct NeighbourMapEntry' of the neighbour
1051  */
1052 static int
1053 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1054 {
1055   struct NeighbourMapEntry *n = value;
1056
1057 #if DEBUG_TRANSPORT
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1059               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1060 #endif
1061   if (S_CONNECTED == n->state)
1062     GNUNET_STATISTICS_update (GST_stats,
1063                               gettext_noop
1064                               ("# peers disconnected due to global disconnect"),
1065                               1, GNUNET_NO);
1066   disconnect_neighbour (n);
1067   return GNUNET_OK;
1068 }
1069
1070
1071 static void
1072 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1073 {
1074   struct NeighbourMapEntry *n = cls;
1075
1076   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1077
1078   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079               "ATS did not suggested address to connect to peer `%s'\n",
1080               GNUNET_i2s (&n->id));
1081
1082   disconnect_neighbour (n);
1083 }
1084
1085 /**
1086  * Cleanup the neighbours subsystem.
1087  */
1088 void
1089 GST_neighbours_stop ()
1090 {
1091   // This can happen during shutdown
1092   if (neighbours == NULL)
1093   {
1094     return;
1095   }
1096
1097   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1098                                          NULL);
1099   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1100 //  GNUNET_assert (neighbours_connected == 0);
1101   neighbours = NULL;
1102   callback_cls = NULL;
1103   connect_notify_cb = NULL;
1104   disconnect_notify_cb = NULL;
1105 }
1106
1107 struct ContinutionContext
1108 {
1109   struct GNUNET_HELLO_Address *address;
1110
1111   struct Session *session;
1112 };
1113
1114 static void
1115 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1116                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1117 {
1118   struct QuotaSetMessage q_msg;
1119
1120 #if DEBUG_TRANSPORT
1121   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1122               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1123               ntohl (quota.value__), GNUNET_i2s (target));
1124 #endif
1125   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1126   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1127   q_msg.quota = quota;
1128   q_msg.peer = (*target);
1129   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1130 }
1131
1132 /**
1133  * We tried to send a SESSION_CONNECT message to another peer.  If this
1134  * succeeded, we change the state.  If it failed, we should tell
1135  * ATS to not use this address anymore (until it is re-validated).
1136  *
1137  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1138  * @param target peer to send the message to
1139  * @param success GNUNET_OK on success
1140  */
1141 static void
1142 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1143                            int success)
1144 {
1145   struct ContinutionContext *cc = cls;
1146   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1147
1148   if (GNUNET_YES != success)
1149   {
1150     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1151     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1152   }
1153   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1154   {
1155     GNUNET_HELLO_address_free (cc->address);
1156     GNUNET_free (cc);
1157     return;
1158   }
1159
1160   if ((GNUNET_YES == success) &&
1161       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1162   {
1163     change_state (n, S_CONNECT_SENT);
1164     GNUNET_HELLO_address_free (cc->address);
1165     GNUNET_free (cc);
1166     return;
1167   }
1168
1169   if ((GNUNET_NO == success) &&
1170       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1171   {
1172 #if DEBUG_TRANSPORT
1173     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1174                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1175                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1176 #endif
1177     change_state (n, S_NOT_CONNECTED);
1178     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1179       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1180     n->ats_suggest =
1181         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1182                                       n);
1183     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1184   }
1185   GNUNET_HELLO_address_free (cc->address);
1186   GNUNET_free (cc);
1187 }
1188
1189
1190 /**
1191  * We tried to switch addresses with an peer already connected. If it failed,
1192  * we should tell ATS to not use this address anymore (until it is re-validated).
1193  *
1194  * @param cls the 'struct NeighbourMapEntry'
1195  * @param target peer to send the message to
1196  * @param success GNUNET_OK on success
1197  */
1198 static void
1199 send_switch_address_continuation (void *cls,
1200                                   const struct GNUNET_PeerIdentity *target,
1201                                   int success)
1202 {
1203   struct ContinutionContext *cc = cls;
1204   struct NeighbourMapEntry *n;
1205
1206   if (neighbours == NULL)
1207   {
1208     GNUNET_HELLO_address_free (cc->address);
1209     GNUNET_free (cc);
1210     return;                     /* neighbour is going away */
1211   }
1212
1213   n = lookup_neighbour (&cc->address->peer);
1214   if ((n == NULL) || (is_disconnecting (n)))
1215   {
1216     GNUNET_HELLO_address_free (cc->address);
1217     GNUNET_free (cc);
1218     return;                     /* neighbour is going away */
1219   }
1220
1221   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1222   if (GNUNET_YES != success)
1223   {
1224 #if DEBUG_TRANSPORT
1225     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1226                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1227                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1228 #endif
1229     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1230     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1231
1232     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1233       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1234     n->ats_suggest =
1235         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1236                                       n);
1237     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1238     GNUNET_HELLO_address_free (cc->address);
1239     GNUNET_free (cc);
1240     return;
1241   }
1242   /* Tell ATS that switching addresses was successful */
1243   switch (n->state)
1244   {
1245   case S_CONNECTED:
1246     if (n->address_state == FRESH)
1247     {
1248       GST_validation_set_address_use (cc->address, cc->session,
1249                                       GNUNET_YES);
1250       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1251       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1252       n->address_state = USED;
1253     }
1254     break;
1255   case S_FAST_RECONNECT:
1256 #if DEBUG_TRANSPORT
1257     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258                 "Successful fast reconnect to peer `%s'\n",
1259                 GNUNET_i2s (&n->id));
1260 #endif
1261     change_state (n, S_CONNECTED);
1262     neighbours_connected++;
1263     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1264                               GNUNET_NO);
1265
1266     if (n->address_state == FRESH)
1267     {
1268       GST_validation_set_address_use (cc->address, cc->session,
1269                                       GNUNET_YES);
1270       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1271       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1272       n->address_state = USED;
1273     }
1274
1275     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1276       n->keepalive_task =
1277           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1278
1279     /* Updating quotas */
1280     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1281     send_outbound_quota (target, n->bandwidth_out);
1282
1283   default:
1284     break;
1285   }
1286   GNUNET_HELLO_address_free (cc->address);
1287   GNUNET_free (cc);
1288 }
1289
1290
1291 /**
1292  * We tried to send a SESSION_CONNECT message to another peer.  If this
1293  * succeeded, we change the state.  If it failed, we should tell
1294  * ATS to not use this address anymore (until it is re-validated).
1295  *
1296  * @param cls the 'struct NeighbourMapEntry'
1297  * @param target peer to send the message to
1298  * @param success GNUNET_OK on success
1299  */
1300 static void
1301 send_connect_ack_continuation (void *cls,
1302                                const struct GNUNET_PeerIdentity *target,
1303                                int success)
1304 {
1305   struct ContinutionContext *cc = cls;
1306   struct NeighbourMapEntry *n;
1307
1308   if (neighbours == NULL)
1309   {
1310     GNUNET_HELLO_address_free (cc->address);
1311     GNUNET_free (cc);
1312     return;                     /* neighbour is going away */
1313   }
1314
1315   n = lookup_neighbour (&cc->address->peer);
1316   if ((n == NULL) || (is_disconnecting (n)))
1317   {
1318     GNUNET_HELLO_address_free (cc->address);
1319     GNUNET_free (cc);
1320     return;                     /* neighbour is going away */
1321   }
1322
1323   if (GNUNET_YES == success)
1324   {
1325     GNUNET_HELLO_address_free (cc->address);
1326     GNUNET_free (cc);
1327     return;                     /* sending successful */
1328   }
1329
1330   /* sending failed, ask for next address  */
1331 #if DEBUG_TRANSPORT
1332   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1333               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1334               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1335 #endif
1336   change_state (n, S_NOT_CONNECTED);
1337   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1338   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1339
1340   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1341     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1342   n->ats_suggest =
1343       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1344                                     n);
1345   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1346   GNUNET_HELLO_address_free (cc->address);
1347   GNUNET_free (cc);
1348 }
1349
1350
1351 /**
1352  * For an existing neighbour record, set the active connection to
1353  * the given address.
1354  *
1355  * @param peer identity of the peer to switch the address for
1356  * @param address address of the other peer, NULL if other peer
1357  *                       connected to us
1358  * @param session session to use (or NULL)
1359  * @param ats performance data
1360  * @param ats_count number of entries in ats
1361  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1362  *         connection is not up (yet)
1363  */
1364 int
1365 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1366                                        const struct GNUNET_HELLO_Address
1367                                        *address, struct Session *session,
1368                                        const struct GNUNET_ATS_Information *ats,
1369                                        uint32_t ats_count,
1370                                        struct GNUNET_BANDWIDTH_Value32NBO
1371                                        bandwidth_in,
1372                                        struct GNUNET_BANDWIDTH_Value32NBO
1373                                        bandwidth_out)
1374 {
1375   struct NeighbourMapEntry *n;
1376   struct SessionConnectMessage connect_msg;
1377   struct ContinutionContext *cc;
1378   size_t msg_len;
1379   size_t ret;
1380
1381   if (neighbours == NULL)
1382   {
1383     /* This can happen during shutdown */
1384     return GNUNET_NO;
1385   }
1386   n = lookup_neighbour (peer);
1387   if (NULL == n)
1388     return GNUNET_NO;
1389   if (n->state == S_DISCONNECT)
1390   {
1391     /* We are disconnecting, nothing to do here */
1392     return GNUNET_NO;
1393   }
1394   GNUNET_assert (address->transport_name != NULL);
1395   if ((session == NULL) && (0 == address->address_length))
1396   {
1397     GNUNET_break_op (0);
1398     /* FIXME: is this actually possible? When does this happen? */
1399     if (strlen (address->transport_name) > 0)
1400       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1401     GNUNET_ATS_suggest_address (GST_ats, peer);
1402     return GNUNET_NO;
1403   }
1404
1405   /* checks successful and neighbour != NULL */
1406 #if DEBUG_TRANSPORT
1407   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1408               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1409               GST_plugins_a2s (address), session, GNUNET_i2s (peer),
1410               print_state (n->state));
1411 #endif
1412   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1413   {
1414     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1415     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1416   }
1417   /* do not switch addresses just update quotas */
1418   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1419       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1420       (n->session == session))
1421   {
1422     n->bandwidth_in = bandwidth_in;
1423     n->bandwidth_out = bandwidth_out;
1424     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1425     send_outbound_quota (peer, n->bandwidth_out);
1426     return GNUNET_NO;
1427   }
1428   if (n->state == S_CONNECTED)
1429   {
1430     /* mark old address as no longer used */
1431     GNUNET_assert (NULL != n->address);
1432     if (n->address_state == USED)
1433     {
1434       GST_validation_set_address_use (n->address, n->session,
1435                                       GNUNET_NO);
1436       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1437       n->address_state = UNUSED;
1438     }
1439
1440   }
1441
1442   /* set new address */
1443   if (NULL != n->address)
1444     GNUNET_HELLO_address_free (n->address);
1445   n->address = GNUNET_HELLO_address_copy (address);
1446   n->address_state = FRESH;
1447   n->session = session;
1448   n->bandwidth_in = bandwidth_in;
1449   n->bandwidth_out = bandwidth_out;
1450   GNUNET_SCHEDULER_cancel (n->timeout_task);
1451   n->timeout_task =
1452       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1453                                     &neighbour_timeout_task, n);
1454   switch (n->state)
1455   {
1456   case S_NOT_CONNECTED:
1457   case S_CONNECT_SENT:
1458     msg_len = sizeof (struct SessionConnectMessage);
1459     connect_msg.header.size = htons (msg_len);
1460     connect_msg.header.type =
1461         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1462     connect_msg.reserved = htonl (0);
1463     connect_msg.timestamp =
1464         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1465
1466     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1467     cc->session = session;
1468     cc->address = GNUNET_HELLO_address_copy (address);
1469     ret =
1470         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1471                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1472                           address, GNUNET_YES, &send_connect_continuation, cc);
1473     return GNUNET_NO;
1474   case S_CONNECT_RECV:
1475     /* We received a CONNECT message and asked ATS for an address */
1476     msg_len = sizeof (struct SessionConnectMessage);
1477     connect_msg.header.size = htons (msg_len);
1478     connect_msg.header.type =
1479         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1480     connect_msg.reserved = htonl (0);
1481     connect_msg.timestamp =
1482         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1483     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1484     cc->session = session;
1485     cc->address = GNUNET_HELLO_address_copy (address);
1486     ret =
1487         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1488                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1489                           address, GNUNET_YES, &send_connect_ack_continuation,
1490                           cc);
1491     return GNUNET_NO;
1492   case S_CONNECTED:
1493   case S_FAST_RECONNECT:
1494     /* connected peer is switching addresses or tries fast reconnect */
1495     msg_len = sizeof (struct SessionConnectMessage);
1496     connect_msg.header.size = htons (msg_len);
1497     connect_msg.header.type =
1498         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1499     connect_msg.reserved = htonl (0);
1500     connect_msg.timestamp =
1501         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1502     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1503     cc->session = session;
1504     cc->address = GNUNET_HELLO_address_copy (address);
1505     ret =
1506         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1507                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1508                           address, GNUNET_YES,
1509                           &send_switch_address_continuation, cc);
1510     if (ret == GNUNET_SYSERR)
1511     {
1512       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1513                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1514                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1515     }
1516     return GNUNET_NO;
1517   default:
1518     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1519                 "Invalid connection state to switch addresses %u \n", n->state);
1520     GNUNET_break_op (0);
1521     return GNUNET_NO;
1522   }
1523 }
1524
1525
1526 /**
1527  * Obtain current latency information for the given neighbour.
1528  *
1529  * @param peer
1530  * @return observed latency of the address, FOREVER if the address was
1531  *         never successfully validated
1532  */
1533 struct GNUNET_TIME_Relative
1534 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1535 {
1536   struct NeighbourMapEntry *n;
1537
1538   n = lookup_neighbour (peer);
1539   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1540     return GNUNET_TIME_UNIT_FOREVER_REL;
1541
1542   return n->latency;
1543 }
1544
1545 /**
1546  * Obtain current address information for the given neighbour.
1547  *
1548  * @param peer
1549  * @return address currently used
1550  */
1551 struct GNUNET_HELLO_Address *
1552 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1553 {
1554   struct NeighbourMapEntry *n;
1555
1556   n = lookup_neighbour (peer);
1557   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1558     return NULL;
1559
1560   return n->address;
1561 }
1562
1563
1564
1565 /**
1566  * Create an entry in the neighbour map for the given peer
1567  *
1568  * @param peer peer to create an entry for
1569  * @return new neighbour map entry
1570  */
1571 static struct NeighbourMapEntry *
1572 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1573 {
1574   struct NeighbourMapEntry *n;
1575
1576 #if DEBUG_TRANSPORT
1577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1579 #endif
1580   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1581   n->id = *peer;
1582   n->state = S_NOT_CONNECTED;
1583   n->latency = GNUNET_TIME_relative_get_forever ();
1584   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1585                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1586                                  MAX_BANDWIDTH_CARRY_S);
1587   n->timeout_task =
1588       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1589                                     &neighbour_timeout_task, n);
1590   GNUNET_assert (GNUNET_OK ==
1591                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1592                                                     &n->id.hashPubKey, n,
1593                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1594   return n;
1595 }
1596
1597
1598 /**
1599  * Try to create a connection to the given target (eventually).
1600  *
1601  * @param target peer to try to connect to
1602  */
1603 void
1604 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1605 {
1606   struct NeighbourMapEntry *n;
1607
1608   // This can happen during shutdown
1609   if (neighbours == NULL)
1610   {
1611     return;
1612   }
1613 #if DEBUG_TRANSPORT
1614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1615               GNUNET_i2s (target));
1616 #endif
1617   if (0 ==
1618       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1619   {
1620     /* my own hello */
1621     return;
1622   }
1623   n = lookup_neighbour (target);
1624
1625   if (NULL != n)
1626   {
1627     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1628       return;                   /* already connecting or connected */
1629     if (is_disconnecting (n))
1630       change_state (n, S_NOT_CONNECTED);
1631   }
1632
1633
1634   if (n == NULL)
1635     n = setup_neighbour (target);
1636 #if DEBUG_TRANSPORT
1637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1638               "Asking ATS for suggested address to connect to peer `%s'\n",
1639               GNUNET_i2s (&n->id));
1640 #endif
1641
1642   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1643 }
1644
1645 /**
1646  * Test if we're connected to the given peer.
1647  *
1648  * @param target peer to test
1649  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1650  */
1651 int
1652 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1653 {
1654   struct NeighbourMapEntry *n;
1655
1656   // This can happen during shutdown
1657   if (neighbours == NULL)
1658   {
1659     return GNUNET_NO;
1660   }
1661
1662   n = lookup_neighbour (target);
1663
1664   if ((NULL == n) || (S_CONNECTED != n->state))
1665     return GNUNET_NO;           /* not connected */
1666   return GNUNET_YES;
1667 }
1668
1669 /**
1670  * A session was terminated. Take note.
1671  *
1672  * @param peer identity of the peer where the session died
1673  * @param session session that is gone
1674  */
1675 void
1676 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1677                                    struct Session *session)
1678 {
1679   struct NeighbourMapEntry *n;
1680
1681   if (neighbours == NULL)
1682   {
1683     /* This can happen during shutdown */
1684     return;
1685   }
1686
1687 #if DEBUG_TRANSPORT
1688   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Session %X to peer `%s' ended \n",
1689               session, GNUNET_i2s (peer));
1690 #endif
1691
1692   n = lookup_neighbour (peer);
1693   if (NULL == n)
1694     return;
1695   if (session != n->session)
1696     return;                     /* doesn't affect us */
1697   if (n->state == S_CONNECTED)
1698   {
1699     if (n->address_state == USED)
1700     {
1701       GST_validation_set_address_use (n->address, n->session,
1702                                       GNUNET_NO);
1703       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1704       n->address_state = UNUSED;
1705     }
1706   }
1707
1708   if (NULL != n->address)
1709   {
1710     GNUNET_HELLO_address_free (n->address);
1711     n->address = NULL;
1712   }
1713   n->session = NULL;
1714
1715   /* not connected anymore anyway, shouldn't matter */
1716   if (S_CONNECTED != n->state)
1717     return;
1718
1719   /* connected, try fast reconnect */
1720 #if DEBUG_TRANSPORT
1721   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1722               GNUNET_i2s (peer));
1723 #endif
1724   change_state (n, S_FAST_RECONNECT);
1725   GNUNET_assert (neighbours_connected > 0);
1726   neighbours_connected--;
1727
1728   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1729   {
1730     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1731     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1732   }
1733
1734   /* We are connected, so ask ATS to switch addresses */
1735   GNUNET_SCHEDULER_cancel (n->timeout_task);
1736   n->timeout_task =
1737       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1738                                     &neighbour_timeout_task, n);
1739   /* try QUICKLY to re-establish a connection, reduce timeout! */
1740   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1741     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1742   n->ats_suggest =
1743       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1744                                     n);
1745   GNUNET_ATS_suggest_address (GST_ats, peer);
1746 }
1747
1748
1749 /**
1750  * Transmit a message to the given target using the active connection.
1751  *
1752  * @param target destination
1753  * @param msg message to send
1754  * @param msg_size number of bytes in msg
1755  * @param timeout when to fail with timeout
1756  * @param cont function to call when done
1757  * @param cont_cls closure for 'cont'
1758  */
1759 void
1760 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1761                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1762                      GST_NeighbourSendContinuation cont, void *cont_cls)
1763 {
1764   struct NeighbourMapEntry *n;
1765   struct MessageQueue *mq;
1766
1767   // This can happen during shutdown
1768   if (neighbours == NULL)
1769   {
1770     return;
1771   }
1772
1773   n = lookup_neighbour (target);
1774   if ((n == NULL) || (!is_connected (n)))
1775   {
1776     GNUNET_STATISTICS_update (GST_stats,
1777                               gettext_noop
1778                               ("# messages not sent (no such peer or not connected)"),
1779                               1, GNUNET_NO);
1780 #if DEBUG_TRANSPORT
1781     if (n == NULL)
1782       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783                   "Could not send message to peer `%s': unknown neighbour",
1784                   GNUNET_i2s (target));
1785     else if (!is_connected (n))
1786       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1787                   "Could not send message to peer `%s': not connected\n",
1788                   GNUNET_i2s (target));
1789 #endif
1790     if (NULL != cont)
1791       cont (cont_cls, GNUNET_SYSERR);
1792     return;
1793   }
1794
1795   if ((n->session == NULL) && (n->address == NULL))
1796   {
1797     GNUNET_STATISTICS_update (GST_stats,
1798                               gettext_noop
1799                               ("# messages not sent (no such peer or not connected)"),
1800                               1, GNUNET_NO);
1801 #if DEBUG_TRANSPORT
1802     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1803                 "Could not send message to peer `%s': no address available\n",
1804                 GNUNET_i2s (target));
1805 #endif
1806
1807     if (NULL != cont)
1808       cont (cont_cls, GNUNET_SYSERR);
1809     return;
1810   }
1811
1812   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1813   GNUNET_STATISTICS_update (GST_stats,
1814                             gettext_noop
1815                             ("# bytes in message queue for other peers"),
1816                             msg_size, GNUNET_NO);
1817   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1818   mq->cont = cont;
1819   mq->cont_cls = cont_cls;
1820   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1821   memcpy (&mq[1], msg, msg_size);
1822   mq->message_buf = (const char *) &mq[1];
1823   mq->message_buf_size = msg_size;
1824   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1825   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1826
1827   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1828       (NULL == n->is_active))
1829     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1830 }
1831
1832
1833 /**
1834  * We have received a message from the given sender.  How long should
1835  * we delay before receiving more?  (Also used to keep the peer marked
1836  * as live).
1837  *
1838  * @param sender sender of the message
1839  * @param size size of the message
1840  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1841  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1842  *                   GNUNET_SYSERR if the connection is not fully up yet
1843  * @return how long to wait before reading more from this sender
1844  */
1845 struct GNUNET_TIME_Relative
1846 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1847                                         *sender, ssize_t size, int *do_forward)
1848 {
1849   struct NeighbourMapEntry *n;
1850   struct GNUNET_TIME_Relative ret;
1851
1852   // This can happen during shutdown
1853   if (neighbours == NULL)
1854   {
1855     return GNUNET_TIME_UNIT_FOREVER_REL;
1856   }
1857
1858   n = lookup_neighbour (sender);
1859   if (n == NULL)
1860   {
1861     GST_neighbours_try_connect (sender);
1862     n = lookup_neighbour (sender);
1863     if (NULL == n)
1864     {
1865       GNUNET_STATISTICS_update (GST_stats,
1866                                 gettext_noop
1867                                 ("# messages discarded due to lack of neighbour record"),
1868                                 1, GNUNET_NO);
1869       *do_forward = GNUNET_NO;
1870       return GNUNET_TIME_UNIT_ZERO;
1871     }
1872   }
1873   if (!is_connected (n))
1874   {
1875     *do_forward = GNUNET_SYSERR;
1876     return GNUNET_TIME_UNIT_ZERO;
1877   }
1878   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1879   {
1880     n->quota_violation_count++;
1881 #if DEBUG_TRANSPORT
1882     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1883                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1884                 n->in_tracker.available_bytes_per_s__,
1885                 n->quota_violation_count);
1886 #endif
1887     /* Discount 32k per violation */
1888     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1889   }
1890   else
1891   {
1892     if (n->quota_violation_count > 0)
1893     {
1894       /* try to add 32k back */
1895       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1896       n->quota_violation_count--;
1897     }
1898   }
1899   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1900   {
1901     GNUNET_STATISTICS_update (GST_stats,
1902                               gettext_noop
1903                               ("# bandwidth quota violations by other peers"),
1904                               1, GNUNET_NO);
1905     *do_forward = GNUNET_NO;
1906     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1907   }
1908   *do_forward = GNUNET_YES;
1909   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1910   if (ret.rel_value > 0)
1911   {
1912 #if DEBUG_TRANSPORT
1913     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1914                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1915                 (unsigned long long) n->in_tracker.
1916                 consumption_since_last_update__,
1917                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1918                 (unsigned long long) ret.rel_value);
1919 #endif
1920     GNUNET_STATISTICS_update (GST_stats,
1921                               gettext_noop ("# ms throttling suggested"),
1922                               (int64_t) ret.rel_value, GNUNET_NO);
1923   }
1924   return ret;
1925 }
1926
1927
1928 /**
1929  * Keep the connection to the given neighbour alive longer,
1930  * we received a KEEPALIVE (or equivalent).
1931  *
1932  * @param neighbour neighbour to keep alive
1933  */
1934 void
1935 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1936 {
1937   struct NeighbourMapEntry *n;
1938
1939   // This can happen during shutdown
1940   if (neighbours == NULL)
1941   {
1942     return;
1943   }
1944
1945   n = lookup_neighbour (neighbour);
1946   if (NULL == n)
1947   {
1948     GNUNET_STATISTICS_update (GST_stats,
1949                               gettext_noop
1950                               ("# KEEPALIVE messages discarded (not connected)"),
1951                               1, GNUNET_NO);
1952     return;
1953   }
1954   GNUNET_SCHEDULER_cancel (n->timeout_task);
1955   n->timeout_task =
1956       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1957                                     &neighbour_timeout_task, n);
1958
1959   /* send reply to measure latency */
1960   if (S_CONNECTED != n->state)
1961     return;
1962
1963   struct GNUNET_MessageHeader m;
1964
1965   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1966   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1967
1968   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1969                     UINT32_MAX /* priority */ ,
1970                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1971                     GNUNET_YES, NULL, NULL);
1972 }
1973
1974 /**
1975  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
1976  * to this peer
1977  *
1978  * @param neighbour neighbour to keep alive
1979  * @param ats performance data
1980  * @param ats_count number of entries in ats
1981  */
1982 void
1983 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1984                                    const struct GNUNET_ATS_Information *ats,
1985                                    uint32_t ats_count)
1986 {
1987   struct NeighbourMapEntry *n;
1988   struct GNUNET_ATS_Information *ats_new;
1989   uint32_t latency;
1990
1991   if (neighbours == NULL)
1992   {
1993     // This can happen during shutdown
1994     return;
1995   }
1996
1997   n = lookup_neighbour (neighbour);
1998   if (NULL == n)
1999   {
2000     GNUNET_STATISTICS_update (GST_stats,
2001                               gettext_noop
2002                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2003                               1, GNUNET_NO);
2004     return;
2005   }
2006   if (n->expect_latency_response != GNUNET_YES)
2007   {
2008     GNUNET_STATISTICS_update (GST_stats,
2009                               gettext_noop
2010                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2011                               1, GNUNET_NO);
2012     return;
2013   }
2014   n->expect_latency_response = GNUNET_NO;
2015
2016   GNUNET_assert (n->keep_alive_sent.abs_value !=
2017                  GNUNET_TIME_absolute_get_zero ().abs_value);
2018   n->latency =
2019       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2020                                            GNUNET_TIME_absolute_get ());
2021 #if DEBUG_TRANSPORT
2022   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2023               GNUNET_i2s (&n->id), n->latency.rel_value);
2024 #endif
2025
2026
2027   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2028   {
2029     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2030   }
2031   else
2032   {
2033     ats_new =
2034         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2035                        (ats_count + 1));
2036     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2037
2038     /* add latency */
2039     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2040     if (n->latency.rel_value > UINT32_MAX)
2041       latency = UINT32_MAX;
2042     else
2043       latency = n->latency.rel_value;
2044     ats_new[ats_count].value = htonl (latency);
2045
2046     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2047                                ats_count + 1);
2048     GNUNET_free (ats_new);
2049   }
2050 }
2051
2052
2053 /**
2054  * Change the incoming quota for the given peer.
2055  *
2056  * @param neighbour identity of peer to change qutoa for
2057  * @param quota new quota
2058  */
2059 void
2060 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2061                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2062 {
2063   struct NeighbourMapEntry *n;
2064
2065   // This can happen during shutdown
2066   if (neighbours == NULL)
2067   {
2068     return;
2069   }
2070
2071   n = lookup_neighbour (neighbour);
2072   if (n == NULL)
2073   {
2074     GNUNET_STATISTICS_update (GST_stats,
2075                               gettext_noop
2076                               ("# SET QUOTA messages ignored (no such peer)"),
2077                               1, GNUNET_NO);
2078     return;
2079   }
2080 #if DEBUG_TRANSPORT
2081   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2082               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2083               ntohl (quota.value__), GNUNET_i2s (&n->id));
2084 #endif
2085   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2086   if (0 != ntohl (quota.value__))
2087     return;
2088 #if DEBUG_TRANSPORT
2089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2090               GNUNET_i2s (&n->id), "SET_QUOTA");
2091 #endif
2092   if (is_connected (n))
2093     GNUNET_STATISTICS_update (GST_stats,
2094                               gettext_noop ("# disconnects due to quota of 0"),
2095                               1, GNUNET_NO);
2096   disconnect_neighbour (n);
2097 }
2098
2099
2100 /**
2101  * Closure for the neighbours_iterate function.
2102  */
2103 struct IteratorContext
2104 {
2105   /**
2106    * Function to call on each connected neighbour.
2107    */
2108   GST_NeighbourIterator cb;
2109
2110   /**
2111    * Closure for 'cb'.
2112    */
2113   void *cb_cls;
2114 };
2115
2116
2117 /**
2118  * Call the callback from the closure for each connected neighbour.
2119  *
2120  * @param cls the 'struct IteratorContext'
2121  * @param key the hash of the public key of the neighbour
2122  * @param value the 'struct NeighbourMapEntry'
2123  * @return GNUNET_OK (continue to iterate)
2124  */
2125 static int
2126 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2127 {
2128   struct IteratorContext *ic = cls;
2129   struct NeighbourMapEntry *n = value;
2130
2131   if (!is_connected (n))
2132     return GNUNET_OK;
2133
2134   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2135   return GNUNET_OK;
2136 }
2137
2138
2139 /**
2140  * Iterate over all connected neighbours.
2141  *
2142  * @param cb function to call
2143  * @param cb_cls closure for cb
2144  */
2145 void
2146 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2147 {
2148   struct IteratorContext ic;
2149
2150   // This can happen during shutdown
2151   if (neighbours == NULL)
2152   {
2153     return;
2154   }
2155
2156   ic.cb = cb;
2157   ic.cb_cls = cb_cls;
2158   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2159 }
2160
2161 /**
2162  * If we have an active connection to the given target, it must be shutdown.
2163  *
2164  * @param target peer to disconnect from
2165  */
2166 void
2167 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2168 {
2169   struct NeighbourMapEntry *n;
2170
2171   // This can happen during shutdown
2172   if (neighbours == NULL)
2173   {
2174     return;
2175   }
2176
2177   n = lookup_neighbour (target);
2178   if (NULL == n)
2179     return;                     /* not active */
2180   disconnect_neighbour (n);
2181 }
2182
2183
2184 /**
2185  * We received a disconnect message from the given peer,
2186  * validate and process.
2187  *
2188  * @param peer sender of the message
2189  * @param msg the disconnect message
2190  */
2191 void
2192 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2193                                           *peer,
2194                                           const struct GNUNET_MessageHeader
2195                                           *msg)
2196 {
2197   struct NeighbourMapEntry *n;
2198   const struct SessionDisconnectMessage *sdm;
2199   GNUNET_HashCode hc;
2200
2201 #if DEBUG_TRANSPORT
2202   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2203               "Received DISCONNECT message from peer `%s'\n",
2204               GNUNET_i2s (peer));
2205 #endif
2206
2207   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2208   {
2209     // GNUNET_break_op (0);
2210     GNUNET_STATISTICS_update (GST_stats,
2211                               gettext_noop
2212                               ("# disconnect messages ignored (old format)"), 1,
2213                               GNUNET_NO);
2214     return;
2215   }
2216   sdm = (const struct SessionDisconnectMessage *) msg;
2217   n = lookup_neighbour (peer);
2218   if (NULL == n)
2219     return;                     /* gone already */
2220   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2221       n->connect_ts.abs_value)
2222   {
2223     GNUNET_STATISTICS_update (GST_stats,
2224                               gettext_noop
2225                               ("# disconnect messages ignored (timestamp)"), 1,
2226                               GNUNET_NO);
2227     return;
2228   }
2229   GNUNET_CRYPTO_hash (&sdm->public_key,
2230                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2231                       &hc);
2232   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2233   {
2234     GNUNET_break_op (0);
2235     return;
2236   }
2237   if (ntohl (sdm->purpose.size) !=
2238       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2239       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2240       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2241   {
2242     GNUNET_break_op (0);
2243     return;
2244   }
2245   if (GNUNET_OK !=
2246       GNUNET_CRYPTO_rsa_verify
2247       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2248        &sdm->signature, &sdm->public_key))
2249   {
2250     GNUNET_break_op (0);
2251     return;
2252   }
2253   GST_neighbours_force_disconnect (peer);
2254 }
2255
2256
2257 /**
2258  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2259  * Consider switching to it.
2260  *
2261  * @param message possibly a 'struct SessionConnectMessage' (check format)
2262  * @param peer identity of the peer to switch the address for
2263  * @param address address of the other peer, NULL if other peer
2264  *                       connected to us
2265  * @param session session to use (or NULL)
2266  * @param ats performance data
2267  * @param ats_count number of entries in ats
2268  */
2269 void
2270 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2271                                    const struct GNUNET_PeerIdentity *peer,
2272                                    const struct GNUNET_HELLO_Address *address,
2273                                    struct Session *session,
2274                                    const struct GNUNET_ATS_Information *ats,
2275                                    uint32_t ats_count)
2276 {
2277   const struct SessionConnectMessage *scm;
2278   struct GNUNET_MessageHeader msg;
2279   struct NeighbourMapEntry *n;
2280   size_t msg_len;
2281   size_t ret;
2282
2283 #if DEBUG_TRANSPORT
2284   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2285               "Received CONNECT_ACK message from peer `%s'\n",
2286               GNUNET_i2s (peer));
2287 #endif
2288
2289   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2290   {
2291     GNUNET_break_op (0);
2292     return;
2293   }
2294   scm = (const struct SessionConnectMessage *) message;
2295   GNUNET_break_op (ntohl (scm->reserved) == 0);
2296   n = lookup_neighbour (peer);
2297   if (NULL == n)
2298   {
2299     /* we did not send 'CONNECT' -- at least not recently */
2300     GNUNET_STATISTICS_update (GST_stats,
2301                               gettext_noop
2302                               ("# unexpected CONNECT_ACK messages (no peer)"),
2303                               1, GNUNET_NO);
2304     return;
2305   }
2306
2307   /* Additional check
2308    *
2309    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2310    *
2311    * We also received an CONNECT message, switched from SENDT to RECV and
2312    * ATS already suggested us an address after a successful blacklist check
2313    */
2314   if ((n->state != S_CONNECT_SENT) &&
2315       ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2316   {
2317     GNUNET_STATISTICS_update (GST_stats,
2318                               gettext_noop
2319                               ("# unexpected CONNECT_ACK messages"), 1,
2320                               GNUNET_NO);
2321     return;
2322   }
2323
2324   change_state (n, S_CONNECTED);
2325
2326   if (NULL != session)
2327   {
2328     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2329                      "transport-ats",
2330                      "Giving ATS session %p of plugin %s for peer %s\n",
2331                      session, address->transport_name, GNUNET_i2s (peer));
2332   }
2333   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2334   GNUNET_assert (NULL != n->address);
2335   if (n->address_state == FRESH)
2336   {
2337     GST_validation_set_address_use (n->address, n->session, GNUNET_YES);
2338     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2339     n->address_state = USED;
2340   }
2341
2342   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2343
2344   /* send ACK (ACK) */
2345   msg_len = sizeof (msg);
2346   msg.size = htons (msg_len);
2347   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2348
2349   ret =
2350       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2351                         GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
2352                         GNUNET_YES, NULL, NULL);
2353
2354   if (ret == GNUNET_SYSERR)
2355     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2356                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2357                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2358
2359
2360   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2361     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2362
2363   neighbours_connected++;
2364   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2365                             GNUNET_NO);
2366 #if DEBUG_TRANSPORT
2367   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2368               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2369               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2370               __LINE__);
2371 #endif
2372   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2373   send_outbound_quota (peer, n->bandwidth_out);
2374
2375 }
2376
2377
2378 void
2379 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2380                            const struct GNUNET_PeerIdentity *peer,
2381                            const struct GNUNET_HELLO_Address *address,
2382                            struct Session *session,
2383                            const struct GNUNET_ATS_Information *ats,
2384                            uint32_t ats_count)
2385 {
2386   struct NeighbourMapEntry *n;
2387
2388 #if DEBUG_TRANSPORT
2389   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2390               GNUNET_i2s (peer));
2391 #endif
2392
2393   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2394   {
2395     GNUNET_break_op (0);
2396     return;
2397   }
2398   n = lookup_neighbour (peer);
2399   if (NULL == n)
2400   {
2401     send_disconnect (peer, address, session);
2402     GNUNET_break (0);
2403     return;
2404   }
2405   if (S_CONNECTED == n->state)
2406     return;
2407   if (!is_connecting (n))
2408   {
2409     GNUNET_STATISTICS_update (GST_stats,
2410                               gettext_noop ("# unexpected ACK messages"), 1,
2411                               GNUNET_NO);
2412     return;
2413   }
2414   change_state (n, S_CONNECTED);
2415   if (NULL != session)
2416     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2417                      "transport-ats",
2418                      "Giving ATS session %p of plugin %s for peer %s\n",
2419                      session, address->transport_name, GNUNET_i2s (peer));
2420   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2421   GNUNET_assert (n->address != NULL);
2422   if (n->address_state == FRESH)
2423   {
2424     GST_validation_set_address_use (n->address, n->session, GNUNET_YES);
2425     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2426     n->address_state = USED;
2427   }
2428
2429   neighbours_connected++;
2430   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2431                             GNUNET_NO);
2432
2433   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2434   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2435     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2436 #if DEBUG_TRANSPORT
2437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2438               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2439               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2440               __LINE__);
2441 #endif
2442   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2443   send_outbound_quota (peer, n->bandwidth_out);
2444 }
2445
2446 struct BlackListCheckContext
2447 {
2448   struct GNUNET_ATS_Information *ats;
2449
2450   uint32_t ats_count;
2451
2452   struct Session *session;
2453
2454   struct GNUNET_HELLO_Address *address;
2455
2456   struct GNUNET_TIME_Absolute ts;
2457 };
2458
2459
2460 static void
2461 handle_connect_blacklist_cont (void *cls,
2462                                const struct GNUNET_PeerIdentity *peer,
2463                                int result)
2464 {
2465   struct NeighbourMapEntry *n;
2466   struct BlackListCheckContext *bcc = cls;
2467
2468 #if DEBUG_TRANSPORT
2469   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2470               "Blacklist check due to CONNECT message: `%s'\n",
2471               GNUNET_i2s (peer),
2472               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2473 #endif
2474
2475   /* not allowed */
2476   if (GNUNET_OK != result)
2477   {
2478     GNUNET_HELLO_address_free (bcc->address);
2479     GNUNET_free (bcc);
2480     return;
2481   }
2482
2483   n = lookup_neighbour (peer);
2484   if (NULL == n)
2485     n = setup_neighbour (peer);
2486
2487   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2488   {
2489     if (NULL != bcc->session)
2490       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2491                        "transport-ats",
2492                        "Giving ATS session %p of address `%s' for peer %s\n",
2493                        bcc->session, GST_plugins_a2s (bcc->address),
2494                        GNUNET_i2s (peer));
2495     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2496
2497     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2498                                bcc->ats_count);
2499     n->connect_ts = bcc->ts;
2500   }
2501
2502   GNUNET_HELLO_address_free (bcc->address);
2503   GNUNET_free (bcc);
2504
2505   if (n->state != S_CONNECT_RECV)
2506     change_state (n, S_CONNECT_RECV);
2507
2508
2509   /* Ask ATS for an address to connect via that address */
2510   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2511     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2512   n->ats_suggest =
2513       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2514                                     n);
2515   GNUNET_ATS_suggest_address (GST_ats, peer);
2516 }
2517
2518 /**
2519  * We received a 'SESSION_CONNECT' message from the other peer.
2520  * Consider switching to it.
2521  *
2522  * @param message possibly a 'struct SessionConnectMessage' (check format)
2523  * @param peer identity of the peer to switch the address for
2524  * @param address address of the other peer, NULL if other peer
2525  *                       connected to us
2526  * @param session session to use (or NULL)
2527  * @param ats performance data
2528  * @param ats_count number of entries in ats (excluding 0-termination)
2529  */
2530 void
2531 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2532                                const struct GNUNET_PeerIdentity *peer,
2533                                const struct GNUNET_HELLO_Address *address,
2534                                struct Session *session,
2535                                const struct GNUNET_ATS_Information *ats,
2536                                uint32_t ats_count)
2537 {
2538   const struct SessionConnectMessage *scm;
2539   struct BlackListCheckContext *bcc = NULL;
2540   struct NeighbourMapEntry *n;
2541
2542 #if DEBUG_TRANSPORT
2543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2544               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2545 #endif
2546
2547   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2548   {
2549     GNUNET_break_op (0);
2550     return;
2551   }
2552
2553   scm = (const struct SessionConnectMessage *) message;
2554   GNUNET_break_op (ntohl (scm->reserved) == 0);
2555
2556   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2557
2558   n = lookup_neighbour (peer);
2559   if ((n != NULL) && (S_CONNECTED == n->state))
2560   {
2561     /* connected peer switches addresses */
2562     return;
2563   }
2564
2565
2566   /* we are not connected to this peer */
2567   /* do blacklist check */
2568   bcc =
2569       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2570                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2571   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2572   bcc->ats_count = ats_count + 1;
2573   bcc->address = GNUNET_HELLO_address_copy (address);
2574   bcc->session = session;
2575   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2576   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2577   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2578   bcc->ats[ats_count].value =
2579       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2580   GST_blacklist_test_allowed (peer, address->transport_name,
2581                               handle_connect_blacklist_cont, bcc);
2582 }
2583
2584
2585 /* end of file gnunet-service-transport_neighbours.c */