fixes for mantis #1988
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
61
62 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
63
64 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
65
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
72 /**
73  * Message a peer sends to another to indicate its
74  * preference for communicating via a particular
75  * session (and the desire to establish a real
76  * connection).
77  */
78 struct SessionConnectMessage
79 {
80   /**
81    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
82    */
83   struct GNUNET_MessageHeader header;
84
85   /**
86    * Always zero.
87    */
88   uint32_t reserved GNUNET_PACKED;
89
90   /**
91    * Absolute time at the sender.  Only the most recent connect
92    * message implies which session is preferred by the sender.
93    */
94   struct GNUNET_TIME_AbsoluteNBO timestamp;
95
96 };
97
98
99 struct SessionDisconnectMessage
100 {
101   /**
102    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
103    */
104   struct GNUNET_MessageHeader header;
105
106   /**
107    * Always zero.
108    */
109   uint32_t reserved GNUNET_PACKED;
110
111   /**
112    * Purpose of the signature.  Extends over the timestamp.
113    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
114    */
115   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
116
117   /**
118    * Absolute time at the sender.  Only the most recent connect
119    * message implies which session is preferred by the sender.
120    */
121   struct GNUNET_TIME_AbsoluteNBO timestamp;
122
123   /**
124    * Public key of the sender.
125    */
126   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
127
128   /**
129    * Signature of the peer that sends us the disconnect.  Only
130    * valid if the timestamp is AFTER the timestamp from the
131    * corresponding 'CONNECT' message.
132    */
133   struct GNUNET_CRYPTO_RsaSignature signature;
134
135 };
136
137
138 /**
139  * For each neighbour we keep a list of messages
140  * that we still want to transmit to the neighbour.
141  */
142 struct MessageQueue
143 {
144
145   /**
146    * This is a doubly linked list.
147    */
148   struct MessageQueue *next;
149
150   /**
151    * This is a doubly linked list.
152    */
153   struct MessageQueue *prev;
154
155   /**
156    * Once this message is actively being transmitted, which
157    * neighbour is it associated with?
158    */
159   struct NeighbourMapEntry *n;
160
161   /**
162    * Function to call once we're done.
163    */
164   GST_NeighbourSendContinuation cont;
165
166   /**
167    * Closure for 'cont'
168    */
169   void *cont_cls;
170
171   /**
172    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
173    * stuck together in memory.  Allocated at the end of this struct.
174    */
175   const char *message_buf;
176
177   /**
178    * Size of the message buf
179    */
180   size_t message_buf_size;
181
182   /**
183    * At what time should we fail?
184    */
185   struct GNUNET_TIME_Absolute timeout;
186
187 };
188
189
190 enum State
191 {
192   /**
193    * fresh peer or completely disconnected
194    */
195   S_NOT_CONNECTED,
196
197   /**
198    * sent CONNECT message to other peer, waiting for CONNECT_ACK
199    */
200   S_CONNECT_SENT,
201
202   /**
203    * received CONNECT message to other peer, sending CONNECT_ACK
204    */
205   S_CONNECT_RECV,
206
207   /**
208    * received ACK or payload
209    */
210   S_CONNECTED,
211
212   /**
213    * connection ended, fast reconnect
214    */
215   S_FAST_RECONNECT,
216
217   /**
218    * Disconnect in progress
219    */
220   S_DISCONNECT
221 };
222
223 enum Address_State
224 {
225   USED,
226   UNUSED,
227   FRESH,
228 };
229
230
231 /**
232  * Entry in neighbours.
233  */
234 struct NeighbourMapEntry
235 {
236
237   /**
238    * Head of list of messages we would like to send to this peer;
239    * must contain at most one message per client.
240    */
241   struct MessageQueue *messages_head;
242
243   /**
244    * Tail of list of messages we would like to send to this peer; must
245    * contain at most one message per client.
246    */
247   struct MessageQueue *messages_tail;
248
249   /**
250    * Are we currently trying to send a message? If so, which one?
251    */
252   struct MessageQueue *is_active;
253
254   /**
255    * Active session for communicating with the peer.
256    */
257   struct Session *session;
258
259   /**
260    * Address we currently use.
261    */
262   struct GNUNET_HELLO_Address *address;
263
264   /**
265    * Identity of this neighbour.
266    */
267   struct GNUNET_PeerIdentity id;
268
269   /**
270    * ID of task scheduled to run when this peer is about to
271    * time out (will free resources associated with the peer).
272    */
273   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
274
275   /**
276    * ID of task scheduled to send keepalives.
277    */
278   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
279
280   /**
281    * ID of task scheduled to run when we should try transmitting
282    * the head of the message queue.
283    */
284   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
285
286   /**
287    * Tracker for inbound bandwidth.
288    */
289   struct GNUNET_BANDWIDTH_Tracker in_tracker;
290
291   /**
292    * Inbound bandwidth from ATS, activated when connection is up
293    */
294   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
295
296   /**
297    * Inbound bandwidth from ATS, activated when connection is up
298    */
299   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
300
301   /**
302    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
303    */
304   struct GNUNET_TIME_Absolute connect_ts;
305
306   /**
307    * When did we sent the last keep-alive message?
308    */
309   struct GNUNET_TIME_Absolute keep_alive_sent;
310
311   /**
312    * Latest calculated latency value
313    */
314   struct GNUNET_TIME_Relative latency;
315
316   /**
317    * Timeout for ATS
318    * We asked ATS for a new address for this peer
319    */
320   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
321
322   /**
323    * Task the resets the peer state after due to an pending
324    * unsuccessful connection setup
325    */
326   GNUNET_SCHEDULER_TaskIdentifier state_reset;
327
328
329   /**
330    * How often has the other peer (recently) violated the inbound
331    * traffic limit?  Incremented by 10 per violation, decremented by 1
332    * per non-violation (for each time interval).
333    */
334   unsigned int quota_violation_count;
335
336
337   /**
338    * The current state of the peer
339    * Element of enum State
340    */
341   int state;
342
343   /**
344    * Did we sent an KEEP_ALIVE message and are we expecting a response?
345    */
346   int expect_latency_response;
347   int address_state;
348 };
349
350
351 /**
352  * All known neighbours and their HELLOs.
353  */
354 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
355
356 /**
357  * Closure for connect_notify_cb and disconnect_notify_cb
358  */
359 static void *callback_cls;
360
361 /**
362  * Function to call when we connected to a neighbour.
363  */
364 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
365
366 /**
367  * Function to call when we disconnected from a neighbour.
368  */
369 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
370
371 /**
372  * counter for connected neighbours
373  */
374 static int neighbours_connected;
375
376 /**
377  * Lookup a neighbour entry in the neighbours hash map.
378  *
379  * @param pid identity of the peer to look up
380  * @return the entry, NULL if there is no existing record
381  */
382 static struct NeighbourMapEntry *
383 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
384 {
385   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
386 }
387
388 #define change_state(n, state, ...) change (n, state, __LINE__)
389
390 static int
391 is_connecting (struct NeighbourMapEntry *n)
392 {
393   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
394     return GNUNET_YES;
395   return GNUNET_NO;
396 }
397
398 static int
399 is_connected (struct NeighbourMapEntry *n)
400 {
401   if (n->state == S_CONNECTED)
402     return GNUNET_YES;
403   return GNUNET_NO;
404 }
405
406 static int
407 is_disconnecting (struct NeighbourMapEntry *n)
408 {
409   if (n->state == S_DISCONNECT)
410     return GNUNET_YES;
411   return GNUNET_NO;
412 }
413
414 static const char *
415 print_state (int state)
416 {
417   switch (state)
418   {
419   case S_CONNECTED:
420     return "S_CONNECTED";
421     break;
422   case S_CONNECT_RECV:
423     return "S_CONNECT_RECV";
424     break;
425   case S_CONNECT_SENT:
426     return "S_CONNECT_SENT";
427     break;
428   case S_DISCONNECT:
429     return "S_DISCONNECT";
430     break;
431   case S_NOT_CONNECTED:
432     return "S_NOT_CONNECTED";
433     break;
434   case S_FAST_RECONNECT:
435     return "S_FAST_RECONNECT";
436     break;
437   default:
438     GNUNET_break (0);
439     break;
440   }
441   return NULL;
442 }
443
444 static int
445 change (struct NeighbourMapEntry *n, int state, int line);
446
447 static void
448 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
449
450
451 static void
452 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
453 {
454   struct NeighbourMapEntry *n = cls;
455
456   if (n == NULL)
457     return;
458
459   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
460   if (n->state == S_CONNECTED)
461     return;
462
463 #if DEBUG_TRANSPORT
464   GNUNET_STATISTICS_update (GST_stats,
465                             gettext_noop
466                             ("# failed connection attempts due to timeout"), 1,
467                             GNUNET_NO);
468 #endif
469
470   /* resetting state */
471   n->state = S_NOT_CONNECTED;
472
473   /* destroying address */
474   if (n->address != NULL)
475   {
476     GNUNET_assert (strlen (n->address->transport_name) > 0);
477     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
478   }
479
480   /* request new address */
481   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
482     GNUNET_SCHEDULER_cancel (n->ats_suggest);
483   n->ats_suggest =
484       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
485                                     n);
486   GNUNET_ATS_suggest_address (GST_ats, &n->id);
487 }
488
489 static int
490 change (struct NeighbourMapEntry *n, int state, int line)
491 {
492   /* allowed transitions */
493   int allowed = GNUNET_NO;
494
495   switch (n->state)
496   {
497   case S_NOT_CONNECTED:
498     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
499         (state == S_DISCONNECT))
500       allowed = GNUNET_YES;
501     break;
502   case S_CONNECT_RECV:
503     allowed = GNUNET_YES;
504     break;
505   case S_CONNECT_SENT:
506     allowed = GNUNET_YES;
507     break;
508   case S_CONNECTED:
509     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
510       allowed = GNUNET_YES;
511     break;
512   case S_DISCONNECT:
513     break;
514   case S_FAST_RECONNECT:
515     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
516       allowed = GNUNET_YES;
517     break;
518   default:
519     GNUNET_break (0);
520     break;
521   }
522   if (allowed == GNUNET_NO)
523   {
524     char *old = GNUNET_strdup (print_state (n->state));
525     char *new = GNUNET_strdup (print_state (state));
526
527     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
528                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
529                 new, line);
530     GNUNET_break (0);
531     GNUNET_free (old);
532     GNUNET_free (new);
533     return GNUNET_SYSERR;
534   }
535 #if DEBUG_TRANSPORT
536   {
537     char *old = GNUNET_strdup (print_state (n->state));
538     char *new = GNUNET_strdup (print_state (state));
539
540     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
541                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
542                 GNUNET_i2s (&n->id), n, old, new, line);
543     GNUNET_free (old);
544     GNUNET_free (new);
545   }
546 #endif
547   n->state = state;
548
549   switch (n->state)
550   {
551   case S_FAST_RECONNECT:
552   case S_CONNECT_RECV:
553   case S_CONNECT_SENT:
554     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
555       GNUNET_SCHEDULER_cancel (n->state_reset);
556     n->state_reset =
557         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
558     break;
559   case S_CONNECTED:
560   case S_NOT_CONNECTED:
561   case S_DISCONNECT:
562     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
563     {
564 #if DEBUG_TRANSPORT
565       char *old = GNUNET_strdup (print_state (n->state));
566       char *new = GNUNET_strdup (print_state (state));
567
568       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
569                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
570                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
571       GNUNET_free (old);
572       GNUNET_free (new);
573 #endif
574       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
575       GNUNET_SCHEDULER_cancel (n->state_reset);
576       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
577     }
578     break;
579
580   default:
581     GNUNET_assert (0);
582   }
583
584
585
586   return GNUNET_OK;
587 }
588
589 static ssize_t
590 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
591                   size_t msgbuf_size, uint32_t priority,
592                   struct GNUNET_TIME_Relative timeout, struct Session *session,
593                   const struct GNUNET_HELLO_Address *address, int force_address,
594                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
595 {
596   struct GNUNET_TRANSPORT_PluginFunctions *papi;
597   size_t ret = GNUNET_SYSERR;
598
599   /* FIXME : ats returns an address with all values 0 */
600   if (address == NULL)
601   {
602     if (cont != NULL)
603       cont (cont_cls, target, GNUNET_SYSERR);
604     return GNUNET_SYSERR;
605   }
606
607   if ((session == NULL) && (address->address_length == 0))
608   {
609     if (cont != NULL)
610       cont (cont_cls, target, GNUNET_SYSERR);
611     return GNUNET_SYSERR;
612   }
613
614   papi = GST_plugins_find (address->transport_name);
615   if (papi == NULL)
616   {
617     if (cont != NULL)
618       cont (cont_cls, target, GNUNET_SYSERR);
619     return GNUNET_SYSERR;
620   }
621
622   ret =
623       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
624                   address->address, address->address_length, GNUNET_YES, cont,
625                   cont_cls);
626
627   if (ret == -1)
628   {
629     if (cont != NULL)
630       cont (cont_cls, target, GNUNET_SYSERR);
631   }
632   return ret;
633 }
634
635 /**
636  * Task invoked to start a transmission to another peer.
637  *
638  * @param cls the 'struct NeighbourMapEntry'
639  * @param tc scheduler context
640  */
641 static void
642 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
643
644
645 /**
646  * We're done with our transmission attempt, continue processing.
647  *
648  * @param cls the 'struct MessageQueue' of the message
649  * @param receiver intended receiver
650  * @param success whether it worked or not
651  */
652 static void
653 transmit_send_continuation (void *cls,
654                             const struct GNUNET_PeerIdentity *receiver,
655                             int success)
656 {
657   struct MessageQueue *mq;
658   struct NeighbourMapEntry *n;
659   struct NeighbourMapEntry *tmp;
660
661   tmp = lookup_neighbour (receiver);
662
663   mq = cls;
664   n = mq->n;
665   if ((NULL != n) && (tmp != NULL) && (tmp == n))
666   {
667     GNUNET_assert (n->is_active == mq);
668     n->is_active = NULL;
669     if (success == GNUNET_YES)
670     {
671       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
672       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
673     }
674   }
675 #if DEBUG_TRANSPORT
676   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
677               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
678               (success == GNUNET_OK) ? "successful" : "FAILED");
679 #endif
680   if (NULL != mq->cont)
681     mq->cont (mq->cont_cls, success);
682   GNUNET_free (mq);
683 }
684
685
686 /**
687  * Check the ready list for the given neighbour and if a plugin is
688  * ready for transmission (and if we have a message), do so!
689  *
690  * @param n target peer for which to transmit
691  */
692 static void
693 try_transmission_to_peer (struct NeighbourMapEntry *n)
694 {
695   struct MessageQueue *mq;
696   struct GNUNET_TIME_Relative timeout;
697   ssize_t ret;
698
699   if (n->is_active != NULL)
700   {
701     GNUNET_break (0);
702     return;                     /* transmission already pending */
703   }
704   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
705   {
706     GNUNET_break (0);
707     return;                     /* currently waiting for bandwidth */
708   }
709   while (NULL != (mq = n->messages_head))
710   {
711     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
712     if (timeout.rel_value > 0)
713       break;
714     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
715     n->is_active = mq;
716     mq->n = n;
717     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
718   }
719   if (NULL == mq)
720     return;                     /* no more messages */
721
722   if (n->address == NULL)
723   {
724     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
725                 GNUNET_i2s (&n->id));
726     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
727     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
728     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
729     return;
730   }
731
732   if (GST_plugins_find (n->address->transport_name) == NULL)
733   {
734     GNUNET_break (0);
735     return;
736   }
737   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
738   n->is_active = mq;
739   mq->n = n;
740
741   if ((n->address->address_length == 0) && (n->session == NULL))
742   {
743     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
744                 GNUNET_i2s (&n->id));
745     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
746     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
747     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
748     return;
749   }
750
751   ret =
752       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
753                         timeout, n->session, n->address, GNUNET_YES,
754                         &transmit_send_continuation, mq);
755   if (ret == -1)
756   {
757     /* failure, but 'send' would not call continuation in this case,
758      * so we need to do it here! */
759     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
760   }
761
762 }
763
764
765 /**
766  * Task invoked to start a transmission to another peer.
767  *
768  * @param cls the 'struct NeighbourMapEntry'
769  * @param tc scheduler context
770  */
771 static void
772 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
773 {
774   struct NeighbourMapEntry *n = cls;
775
776   GNUNET_assert (NULL != lookup_neighbour (&n->id));
777   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
778   try_transmission_to_peer (n);
779 }
780
781
782 /**
783  * Initialize the neighbours subsystem.
784  *
785  * @param cls closure for callbacks
786  * @param connect_cb function to call if we connect to a peer
787  * @param disconnect_cb function to call if we disconnect from a peer
788  */
789 void
790 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
791                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
792 {
793   callback_cls = cls;
794   connect_notify_cb = connect_cb;
795   disconnect_notify_cb = disconnect_cb;
796   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
797 }
798
799
800 static void
801 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
802                       int result)
803 {
804 #if DEBUG_TRANSPORT
805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
806               "Sending DISCONNECT message to peer `%4s': %i\n",
807               GNUNET_i2s (target), result);
808 #endif
809 }
810
811
812 static int
813 send_disconnect (const struct GNUNET_PeerIdentity *target,
814                  const struct GNUNET_HELLO_Address *address,
815                  struct Session *session)
816 {
817   size_t ret;
818   struct SessionDisconnectMessage disconnect_msg;
819
820 #if DEBUG_TRANSPORT
821   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
822               "Sending DISCONNECT message to peer `%4s'\n",
823               GNUNET_i2s (target));
824 #endif
825
826   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
827   disconnect_msg.header.type =
828       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
829   disconnect_msg.reserved = htonl (0);
830   disconnect_msg.purpose.size =
831       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
832              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
833              sizeof (struct GNUNET_TIME_AbsoluteNBO));
834   disconnect_msg.purpose.purpose =
835       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
836   disconnect_msg.timestamp =
837       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
838   disconnect_msg.public_key = GST_my_public_key;
839   GNUNET_assert (GNUNET_OK ==
840                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
841                                          &disconnect_msg.purpose,
842                                          &disconnect_msg.signature));
843
844   ret =
845       send_with_plugin (target, (const char *) &disconnect_msg,
846                         sizeof (disconnect_msg), UINT32_MAX,
847                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
848                         GNUNET_YES, &send_disconnect_cont, NULL);
849
850   if (ret == GNUNET_SYSERR)
851     return GNUNET_SYSERR;
852
853   GNUNET_STATISTICS_update (GST_stats,
854                             gettext_noop
855                             ("# peers disconnected due to external request"), 1,
856                             GNUNET_NO);
857   return GNUNET_OK;
858 }
859
860
861 /**
862  * Disconnect from the given neighbour, clean up the record.
863  *
864  * @param n neighbour to disconnect from
865  */
866 static void
867 disconnect_neighbour (struct NeighbourMapEntry *n)
868 {
869   struct MessageQueue *mq;
870   int previous_state;
871
872   previous_state = n->state;
873
874   if (is_disconnecting (n))
875     return;
876
877   /* send DISCONNECT MESSAGE */
878   if (previous_state == S_CONNECTED)
879   {
880     if (GNUNET_OK == send_disconnect (&n->id, n->address, n->session))
881       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
882                   GNUNET_i2s (&n->id));
883     else
884       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885                   "Could not send DISCONNECT_MSG to `%s'\n",
886                   GNUNET_i2s (&n->id));
887   }
888
889   change_state (n, S_DISCONNECT);
890
891   if (previous_state == S_CONNECTED)
892   {
893     GNUNET_assert (NULL != n->address);
894     if (n->address_state == USED)
895     {
896       GST_validation_set_address_use (n->address, n->session,
897                                       GNUNET_NO);
898
899       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
900       n->address_state = UNUSED;
901     }
902   }
903
904   if (n->address != NULL)
905   {
906     struct GNUNET_TRANSPORT_PluginFunctions *papi;
907
908     papi = GST_plugins_find (n->address->transport_name);
909     if (papi != NULL)
910       papi->disconnect (papi->cls, &n->id);
911   }
912   while (NULL != (mq = n->messages_head))
913   {
914     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
915     if (NULL != mq->cont)
916       mq->cont (mq->cont_cls, GNUNET_SYSERR);
917     GNUNET_free (mq);
918   }
919   if (NULL != n->is_active)
920   {
921     n->is_active->n = NULL;
922     n->is_active = NULL;
923   }
924
925   switch (previous_state)
926   {
927   case S_CONNECTED:
928     GNUNET_assert (neighbours_connected > 0);
929     neighbours_connected--;
930     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
931     GNUNET_SCHEDULER_cancel (n->keepalive_task);
932     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
933     n->expect_latency_response = GNUNET_NO;
934     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
935                               GNUNET_NO);
936     disconnect_notify_cb (callback_cls, &n->id);
937     break;
938   case S_FAST_RECONNECT:
939     GNUNET_STATISTICS_update (GST_stats,
940                               gettext_noop ("# fast reconnects failed"), 1,
941                               GNUNET_NO);
942     disconnect_notify_cb (callback_cls, &n->id);
943     break;
944   default:
945     break;
946   }
947
948   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
949
950   GNUNET_assert (GNUNET_YES ==
951                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
952                                                        &n->id.hashPubKey, n));
953   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
954   {
955     GNUNET_SCHEDULER_cancel (n->ats_suggest);
956     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
957   }
958   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
959   {
960     GNUNET_SCHEDULER_cancel (n->timeout_task);
961     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
962   }
963   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
964   {
965     GNUNET_SCHEDULER_cancel (n->transmission_task);
966     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
967   }
968   if (NULL != n->address)
969   {
970     GNUNET_HELLO_address_free (n->address);
971     n->address = NULL;
972   }
973   n->session = NULL;
974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
975               GNUNET_i2s (&n->id), n);
976   GNUNET_free (n);
977 }
978
979
980 /**
981  * Peer has been idle for too long. Disconnect.
982  *
983  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
984  * @param tc scheduler context
985  */
986 static void
987 neighbour_timeout_task (void *cls,
988                         const struct GNUNET_SCHEDULER_TaskContext *tc)
989 {
990   struct NeighbourMapEntry *n = cls;
991
992   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
993
994   GNUNET_STATISTICS_update (GST_stats,
995                             gettext_noop
996                             ("# peers disconnected due to timeout"), 1,
997                             GNUNET_NO);
998   disconnect_neighbour (n);
999 }
1000
1001
1002 /**
1003  * Send another keepalive message.
1004  *
1005  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1006  * @param tc scheduler context
1007  */
1008 static void
1009 neighbour_keepalive_task (void *cls,
1010                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1011 {
1012   struct NeighbourMapEntry *n = cls;
1013   struct GNUNET_MessageHeader m;
1014   int ret;
1015
1016   GNUNET_assert (S_CONNECTED == n->state);
1017   n->keepalive_task =
1018       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1019                                     &neighbour_keepalive_task, n);
1020
1021   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1022                             GNUNET_NO);
1023   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1024   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1025
1026
1027   ret =
1028       send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1029                         UINT32_MAX /* priority */ ,
1030                         GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1031                         GNUNET_YES, NULL, NULL);
1032
1033   n->expect_latency_response = GNUNET_NO;
1034   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1035   if (ret != GNUNET_SYSERR)
1036   {
1037     n->expect_latency_response = GNUNET_YES;
1038     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1039   }
1040
1041 }
1042
1043
1044 /**
1045  * Disconnect from the given neighbour.
1046  *
1047  * @param cls unused
1048  * @param key hash of neighbour's public key (not used)
1049  * @param value the 'struct NeighbourMapEntry' of the neighbour
1050  */
1051 static int
1052 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1053 {
1054   struct NeighbourMapEntry *n = value;
1055
1056 #if DEBUG_TRANSPORT
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1058               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1059 #endif
1060   if (S_CONNECTED == n->state)
1061     GNUNET_STATISTICS_update (GST_stats,
1062                               gettext_noop
1063                               ("# peers disconnected due to global disconnect"),
1064                               1, GNUNET_NO);
1065   disconnect_neighbour (n);
1066   return GNUNET_OK;
1067 }
1068
1069
1070 static void
1071 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1072 {
1073   struct NeighbourMapEntry *n = cls;
1074
1075   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1076
1077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078               "ATS did not suggested address to connect to peer `%s'\n",
1079               GNUNET_i2s (&n->id));
1080
1081   disconnect_neighbour (n);
1082 }
1083
1084 /**
1085  * Cleanup the neighbours subsystem.
1086  */
1087 void
1088 GST_neighbours_stop ()
1089 {
1090   // This can happen during shutdown
1091   if (neighbours == NULL)
1092   {
1093     return;
1094   }
1095
1096   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1097                                          NULL);
1098   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1099 //  GNUNET_assert (neighbours_connected == 0);
1100   neighbours = NULL;
1101   callback_cls = NULL;
1102   connect_notify_cb = NULL;
1103   disconnect_notify_cb = NULL;
1104 }
1105
1106 struct ContinutionContext
1107 {
1108   struct GNUNET_HELLO_Address *address;
1109
1110   struct Session *session;
1111 };
1112
1113 static void
1114 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1115                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1116 {
1117   struct QuotaSetMessage q_msg;
1118
1119 #if DEBUG_TRANSPORT
1120   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1121               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1122               ntohl (quota.value__), GNUNET_i2s (target));
1123 #endif
1124   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1125   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1126   q_msg.quota = quota;
1127   q_msg.peer = (*target);
1128   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1129 }
1130
1131 /**
1132  * We tried to send a SESSION_CONNECT message to another peer.  If this
1133  * succeeded, we change the state.  If it failed, we should tell
1134  * ATS to not use this address anymore (until it is re-validated).
1135  *
1136  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1137  * @param target peer to send the message to
1138  * @param success GNUNET_OK on success
1139  */
1140 static void
1141 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1142                            int success)
1143 {
1144   struct ContinutionContext *cc = cls;
1145   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1146
1147   if (GNUNET_YES != success)
1148   {
1149     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1150     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1151   }
1152   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1153   {
1154     GNUNET_HELLO_address_free (cc->address);
1155     GNUNET_free (cc);
1156     return;
1157   }
1158
1159   if ((GNUNET_YES == success) &&
1160       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1161   {
1162     change_state (n, S_CONNECT_SENT);
1163     GNUNET_HELLO_address_free (cc->address);
1164     GNUNET_free (cc);
1165     return;
1166   }
1167
1168   if ((GNUNET_NO == success) &&
1169       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1170   {
1171 #if DEBUG_TRANSPORT
1172     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1173                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1174                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1175 #endif
1176     change_state (n, S_NOT_CONNECTED);
1177     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1178       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1179     n->ats_suggest =
1180         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1181                                       n);
1182     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1183   }
1184   GNUNET_HELLO_address_free (cc->address);
1185   GNUNET_free (cc);
1186 }
1187
1188
1189 /**
1190  * We tried to switch addresses with an peer already connected. If it failed,
1191  * we should tell ATS to not use this address anymore (until it is re-validated).
1192  *
1193  * @param cls the 'struct NeighbourMapEntry'
1194  * @param target peer to send the message to
1195  * @param success GNUNET_OK on success
1196  */
1197 static void
1198 send_switch_address_continuation (void *cls,
1199                                   const struct GNUNET_PeerIdentity *target,
1200                                   int success)
1201 {
1202   struct ContinutionContext *cc = cls;
1203   struct NeighbourMapEntry *n;
1204
1205   if (neighbours == NULL)
1206   {
1207     GNUNET_HELLO_address_free (cc->address);
1208     GNUNET_free (cc);
1209     return;                     /* neighbour is going away */
1210   }
1211
1212   n = lookup_neighbour (&cc->address->peer);
1213   if ((n == NULL) || (is_disconnecting (n)))
1214   {
1215     GNUNET_HELLO_address_free (cc->address);
1216     GNUNET_free (cc);
1217     return;                     /* neighbour is going away */
1218   }
1219
1220   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1221   if (GNUNET_YES != success)
1222   {
1223 #if DEBUG_TRANSPORT
1224     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1226                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1227 #endif
1228     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1229     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1230
1231     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1232       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1233     n->ats_suggest =
1234         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1235                                       n);
1236     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1237     GNUNET_HELLO_address_free (cc->address);
1238     GNUNET_free (cc);
1239     return;
1240   }
1241   /* Tell ATS that switching addresses was successful */
1242   switch (n->state)
1243   {
1244   case S_CONNECTED:
1245     if (n->address_state == FRESH)
1246     {
1247       GST_validation_set_address_use (cc->address, cc->session,
1248                                       GNUNET_YES);
1249       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1250       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1251       n->address_state = USED;
1252     }
1253     break;
1254   case S_FAST_RECONNECT:
1255 #if DEBUG_TRANSPORT
1256     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257                 "Successful fast reconnect to peer `%s'\n",
1258                 GNUNET_i2s (&n->id));
1259 #endif
1260     change_state (n, S_CONNECTED);
1261     neighbours_connected++;
1262     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1263                               GNUNET_NO);
1264
1265     if (n->address_state == FRESH)
1266     {
1267       GST_validation_set_address_use (cc->address, cc->session,
1268                                       GNUNET_YES);
1269       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1270       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1271       n->address_state = USED;
1272     }
1273
1274     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1275       n->keepalive_task =
1276           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1277
1278     /* Updating quotas */
1279     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1280     send_outbound_quota (target, n->bandwidth_out);
1281
1282   default:
1283     break;
1284   }
1285   GNUNET_HELLO_address_free (cc->address);
1286   GNUNET_free (cc);
1287 }
1288
1289
1290 /**
1291  * We tried to send a SESSION_CONNECT message to another peer.  If this
1292  * succeeded, we change the state.  If it failed, we should tell
1293  * ATS to not use this address anymore (until it is re-validated).
1294  *
1295  * @param cls the 'struct NeighbourMapEntry'
1296  * @param target peer to send the message to
1297  * @param success GNUNET_OK on success
1298  */
1299 static void
1300 send_connect_ack_continuation (void *cls,
1301                                const struct GNUNET_PeerIdentity *target,
1302                                int success)
1303 {
1304   struct ContinutionContext *cc = cls;
1305   struct NeighbourMapEntry *n;
1306
1307   if (neighbours == NULL)
1308   {
1309     GNUNET_HELLO_address_free (cc->address);
1310     GNUNET_free (cc);
1311     return;                     /* neighbour is going away */
1312   }
1313
1314   n = lookup_neighbour (&cc->address->peer);
1315   if ((n == NULL) || (is_disconnecting (n)))
1316   {
1317     GNUNET_HELLO_address_free (cc->address);
1318     GNUNET_free (cc);
1319     return;                     /* neighbour is going away */
1320   }
1321
1322   if (GNUNET_YES == success)
1323   {
1324     GNUNET_HELLO_address_free (cc->address);
1325     GNUNET_free (cc);
1326     return;                     /* sending successful */
1327   }
1328
1329   /* sending failed, ask for next address  */
1330 #if DEBUG_TRANSPORT
1331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1333               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1334 #endif
1335   change_state (n, S_NOT_CONNECTED);
1336   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1337   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1338
1339   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1340     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1341   n->ats_suggest =
1342       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1343                                     n);
1344   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1345   GNUNET_HELLO_address_free (cc->address);
1346   GNUNET_free (cc);
1347 }
1348
1349
1350 /**
1351  * For an existing neighbour record, set the active connection to
1352  * the given address.
1353  *
1354  * @param peer identity of the peer to switch the address for
1355  * @param address address of the other peer, NULL if other peer
1356  *                       connected to us
1357  * @param session session to use (or NULL)
1358  * @param ats performance data
1359  * @param ats_count number of entries in ats
1360  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1361  *         connection is not up (yet)
1362  */
1363 int
1364 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1365                                        const struct GNUNET_HELLO_Address
1366                                        *address, struct Session *session,
1367                                        const struct GNUNET_ATS_Information *ats,
1368                                        uint32_t ats_count,
1369                                        struct GNUNET_BANDWIDTH_Value32NBO
1370                                        bandwidth_in,
1371                                        struct GNUNET_BANDWIDTH_Value32NBO
1372                                        bandwidth_out)
1373 {
1374   struct NeighbourMapEntry *n;
1375   struct SessionConnectMessage connect_msg;
1376   struct ContinutionContext *cc;
1377   size_t msg_len;
1378   size_t ret;
1379
1380   if (neighbours == NULL)
1381   {
1382     /* This can happen during shutdown */
1383     return GNUNET_NO;
1384   }
1385   n = lookup_neighbour (peer);
1386   if (NULL == n)
1387     return GNUNET_NO;
1388   if (n->state == S_DISCONNECT)
1389   {
1390     /* We are disconnecting, nothing to do here */
1391     return GNUNET_NO;
1392   }
1393   GNUNET_assert (address->transport_name != NULL);
1394   if ((session == NULL) && (0 == address->address_length))
1395   {
1396     GNUNET_break_op (0);
1397     /* FIXME: is this actually possible? When does this happen? */
1398     if (strlen (address->transport_name) > 0)
1399       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1400     GNUNET_ATS_suggest_address (GST_ats, peer);
1401     return GNUNET_NO;
1402   }
1403
1404   /* checks successful and neighbour != NULL */
1405 #if DEBUG_TRANSPORT
1406   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1407               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1408               GST_plugins_a2s (address), session, GNUNET_i2s (peer),
1409               print_state (n->state));
1410 #endif
1411   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1412   {
1413     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1414     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1415   }
1416   /* do not switch addresses just update quotas */
1417   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1418       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1419       (n->session == session))
1420   {
1421     n->bandwidth_in = bandwidth_in;
1422     n->bandwidth_out = bandwidth_out;
1423     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1424     send_outbound_quota (peer, n->bandwidth_out);
1425     return GNUNET_NO;
1426   }
1427   if (n->state == S_CONNECTED)
1428   {
1429     /* mark old address as no longer used */
1430     GNUNET_assert (NULL != n->address);
1431     if (n->address_state == USED)
1432     {
1433       GST_validation_set_address_use (n->address, n->session,
1434                                       GNUNET_NO);
1435       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1436       n->address_state = UNUSED;
1437     }
1438
1439   }
1440
1441   /* set new address */
1442   if (NULL != n->address)
1443     GNUNET_HELLO_address_free (n->address);
1444   n->address = GNUNET_HELLO_address_copy (address);
1445   n->address_state = FRESH;
1446   n->session = session;
1447   n->bandwidth_in = bandwidth_in;
1448   n->bandwidth_out = bandwidth_out;
1449   GNUNET_SCHEDULER_cancel (n->timeout_task);
1450   n->timeout_task =
1451       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1452                                     &neighbour_timeout_task, n);
1453   switch (n->state)
1454   {
1455   case S_NOT_CONNECTED:
1456   case S_CONNECT_SENT:
1457     msg_len = sizeof (struct SessionConnectMessage);
1458     connect_msg.header.size = htons (msg_len);
1459     connect_msg.header.type =
1460         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1461     connect_msg.reserved = htonl (0);
1462     connect_msg.timestamp =
1463         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1464
1465     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1466     cc->session = session;
1467     cc->address = GNUNET_HELLO_address_copy (address);
1468     ret =
1469         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1470                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1471                           address, GNUNET_YES, &send_connect_continuation, cc);
1472     return GNUNET_NO;
1473   case S_CONNECT_RECV:
1474     /* We received a CONNECT message and asked ATS for an address */
1475     msg_len = sizeof (struct SessionConnectMessage);
1476     connect_msg.header.size = htons (msg_len);
1477     connect_msg.header.type =
1478         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1479     connect_msg.reserved = htonl (0);
1480     connect_msg.timestamp =
1481         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1482     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1483     cc->session = session;
1484     cc->address = GNUNET_HELLO_address_copy (address);
1485     ret =
1486         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1487                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1488                           address, GNUNET_YES, &send_connect_ack_continuation,
1489                           cc);
1490     return GNUNET_NO;
1491   case S_CONNECTED:
1492   case S_FAST_RECONNECT:
1493     /* connected peer is switching addresses or tries fast reconnect */
1494     msg_len = sizeof (struct SessionConnectMessage);
1495     connect_msg.header.size = htons (msg_len);
1496     connect_msg.header.type =
1497         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1498     connect_msg.reserved = htonl (0);
1499     connect_msg.timestamp =
1500         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1501     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1502     cc->session = session;
1503     cc->address = GNUNET_HELLO_address_copy (address);
1504     ret =
1505         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1506                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1507                           address, GNUNET_YES,
1508                           &send_switch_address_continuation, cc);
1509     if (ret == GNUNET_SYSERR)
1510     {
1511       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1513                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1514     }
1515     return GNUNET_NO;
1516   default:
1517     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1518                 "Invalid connection state to switch addresses %u \n", n->state);
1519     GNUNET_break_op (0);
1520     return GNUNET_NO;
1521   }
1522 }
1523
1524
1525 /**
1526  * Obtain current latency information for the given neighbour.
1527  *
1528  * @param peer
1529  * @return observed latency of the address, FOREVER if the address was
1530  *         never successfully validated
1531  */
1532 struct GNUNET_TIME_Relative
1533 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1534 {
1535   struct NeighbourMapEntry *n;
1536
1537   n = lookup_neighbour (peer);
1538   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1539     return GNUNET_TIME_UNIT_FOREVER_REL;
1540
1541   return n->latency;
1542 }
1543
1544 /**
1545  * Obtain current address information for the given neighbour.
1546  *
1547  * @param peer
1548  * @return address currently used
1549  */
1550 struct GNUNET_HELLO_Address *
1551 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1552 {
1553   struct NeighbourMapEntry *n;
1554
1555   n = lookup_neighbour (peer);
1556   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1557     return NULL;
1558
1559   return n->address;
1560 }
1561
1562
1563
1564 /**
1565  * Create an entry in the neighbour map for the given peer
1566  *
1567  * @param peer peer to create an entry for
1568  * @return new neighbour map entry
1569  */
1570 static struct NeighbourMapEntry *
1571 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1572 {
1573   struct NeighbourMapEntry *n;
1574
1575 #if DEBUG_TRANSPORT
1576   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1577               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1578 #endif
1579   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1580   n->id = *peer;
1581   n->state = S_NOT_CONNECTED;
1582   n->latency = GNUNET_TIME_relative_get_forever ();
1583   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1584                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1585                                  MAX_BANDWIDTH_CARRY_S);
1586   n->timeout_task =
1587       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1588                                     &neighbour_timeout_task, n);
1589   GNUNET_assert (GNUNET_OK ==
1590                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1591                                                     &n->id.hashPubKey, n,
1592                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1593   return n;
1594 }
1595
1596
1597 /**
1598  * Try to create a connection to the given target (eventually).
1599  *
1600  * @param target peer to try to connect to
1601  */
1602 void
1603 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1604 {
1605   struct NeighbourMapEntry *n;
1606
1607   // This can happen during shutdown
1608   if (neighbours == NULL)
1609   {
1610     return;
1611   }
1612 #if DEBUG_TRANSPORT
1613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1614               GNUNET_i2s (target));
1615 #endif
1616   if (0 ==
1617       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1618   {
1619     /* my own hello */
1620     return;
1621   }
1622   n = lookup_neighbour (target);
1623
1624   if (NULL != n)
1625   {
1626     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1627       return;                   /* already connecting or connected */
1628     if (is_disconnecting (n))
1629       change_state (n, S_NOT_CONNECTED);
1630   }
1631
1632
1633   if (n == NULL)
1634     n = setup_neighbour (target);
1635 #if DEBUG_TRANSPORT
1636   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1637               "Asking ATS for suggested address to connect to peer `%s'\n",
1638               GNUNET_i2s (&n->id));
1639 #endif
1640
1641   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1642 }
1643
1644 /**
1645  * Test if we're connected to the given peer.
1646  *
1647  * @param target peer to test
1648  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1649  */
1650 int
1651 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1652 {
1653   struct NeighbourMapEntry *n;
1654
1655   // This can happen during shutdown
1656   if (neighbours == NULL)
1657   {
1658     return GNUNET_NO;
1659   }
1660
1661   n = lookup_neighbour (target);
1662
1663   if ((NULL == n) || (S_CONNECTED != n->state))
1664     return GNUNET_NO;           /* not connected */
1665   return GNUNET_YES;
1666 }
1667
1668 /**
1669  * A session was terminated. Take note.
1670  *
1671  * @param peer identity of the peer where the session died
1672  * @param session session that is gone
1673  */
1674 void
1675 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1676                                    struct Session *session)
1677 {
1678   struct NeighbourMapEntry *n;
1679
1680   if (neighbours == NULL)
1681   {
1682     /* This can happen during shutdown */
1683     return;
1684   }
1685
1686 #if DEBUG_TRANSPORT
1687   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Session %X to peer `%s' ended \n",
1688               session, GNUNET_i2s (peer));
1689 #endif
1690
1691   n = lookup_neighbour (peer);
1692   if (NULL == n)
1693     return;
1694   if (session != n->session)
1695     return;                     /* doesn't affect us */
1696   if (n->state == S_CONNECTED)
1697   {
1698     if (n->address_state == USED)
1699     {
1700       GST_validation_set_address_use (n->address, n->session,
1701                                       GNUNET_NO);
1702       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1703       n->address_state = UNUSED;
1704     }
1705   }
1706
1707   if (NULL != n->address)
1708   {
1709     GNUNET_HELLO_address_free (n->address);
1710     n->address = NULL;
1711   }
1712   n->session = NULL;
1713
1714   /* not connected anymore anyway, shouldn't matter */
1715   if (S_CONNECTED != n->state)
1716     return;
1717
1718   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1719   {
1720     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1721     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1722     n->expect_latency_response = GNUNET_NO;
1723   }
1724
1725   /* connected, try fast reconnect */
1726   /* statistics "transport" : "# peers connected" -= 1
1727    * neighbours_connected -= 1
1728    * BUT: no disconnect_cb to notify clients about disconnect
1729    */
1730 #if DEBUG_TRANSPORT
1731   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1732               GNUNET_i2s (peer));
1733 #endif
1734   GNUNET_assert (neighbours_connected > 0);
1735   change_state (n, S_FAST_RECONNECT);
1736   neighbours_connected--;
1737   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
1738                             GNUNET_NO);
1739
1740
1741   /* We are connected, so ask ATS to switch addresses */
1742   GNUNET_SCHEDULER_cancel (n->timeout_task);
1743   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1744                                     &neighbour_timeout_task, n);
1745   /* try QUICKLY to re-establish a connection, reduce timeout! */
1746   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1747     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1748   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1749                                     n);
1750   GNUNET_ATS_suggest_address (GST_ats, peer);
1751 }
1752
1753
1754 /**
1755  * Transmit a message to the given target using the active connection.
1756  *
1757  * @param target destination
1758  * @param msg message to send
1759  * @param msg_size number of bytes in msg
1760  * @param timeout when to fail with timeout
1761  * @param cont function to call when done
1762  * @param cont_cls closure for 'cont'
1763  */
1764 void
1765 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1766                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1767                      GST_NeighbourSendContinuation cont, void *cont_cls)
1768 {
1769   struct NeighbourMapEntry *n;
1770   struct MessageQueue *mq;
1771
1772   // This can happen during shutdown
1773   if (neighbours == NULL)
1774   {
1775     return;
1776   }
1777
1778   n = lookup_neighbour (target);
1779   if ((n == NULL) || (!is_connected (n)))
1780   {
1781     GNUNET_STATISTICS_update (GST_stats,
1782                               gettext_noop
1783                               ("# messages not sent (no such peer or not connected)"),
1784                               1, GNUNET_NO);
1785 #if DEBUG_TRANSPORT
1786     if (n == NULL)
1787       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1788                   "Could not send message to peer `%s': unknown neighbour",
1789                   GNUNET_i2s (target));
1790     else if (!is_connected (n))
1791       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1792                   "Could not send message to peer `%s': not connected\n",
1793                   GNUNET_i2s (target));
1794 #endif
1795     if (NULL != cont)
1796       cont (cont_cls, GNUNET_SYSERR);
1797     return;
1798   }
1799
1800   if ((n->session == NULL) && (n->address == NULL))
1801   {
1802     GNUNET_STATISTICS_update (GST_stats,
1803                               gettext_noop
1804                               ("# messages not sent (no such peer or not connected)"),
1805                               1, GNUNET_NO);
1806 #if DEBUG_TRANSPORT
1807     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1808                 "Could not send message to peer `%s': no address available\n",
1809                 GNUNET_i2s (target));
1810 #endif
1811
1812     if (NULL != cont)
1813       cont (cont_cls, GNUNET_SYSERR);
1814     return;
1815   }
1816
1817   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1818   GNUNET_STATISTICS_update (GST_stats,
1819                             gettext_noop
1820                             ("# bytes in message queue for other peers"),
1821                             msg_size, GNUNET_NO);
1822   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1823   mq->cont = cont;
1824   mq->cont_cls = cont_cls;
1825   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1826   memcpy (&mq[1], msg, msg_size);
1827   mq->message_buf = (const char *) &mq[1];
1828   mq->message_buf_size = msg_size;
1829   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1830   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1831
1832   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1833       (NULL == n->is_active))
1834     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1835 }
1836
1837
1838 /**
1839  * We have received a message from the given sender.  How long should
1840  * we delay before receiving more?  (Also used to keep the peer marked
1841  * as live).
1842  *
1843  * @param sender sender of the message
1844  * @param size size of the message
1845  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1846  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1847  *                   GNUNET_SYSERR if the connection is not fully up yet
1848  * @return how long to wait before reading more from this sender
1849  */
1850 struct GNUNET_TIME_Relative
1851 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1852                                         *sender, ssize_t size, int *do_forward)
1853 {
1854   struct NeighbourMapEntry *n;
1855   struct GNUNET_TIME_Relative ret;
1856
1857   // This can happen during shutdown
1858   if (neighbours == NULL)
1859   {
1860     return GNUNET_TIME_UNIT_FOREVER_REL;
1861   }
1862
1863   n = lookup_neighbour (sender);
1864   if (n == NULL)
1865   {
1866     GST_neighbours_try_connect (sender);
1867     n = lookup_neighbour (sender);
1868     if (NULL == n)
1869     {
1870       GNUNET_STATISTICS_update (GST_stats,
1871                                 gettext_noop
1872                                 ("# messages discarded due to lack of neighbour record"),
1873                                 1, GNUNET_NO);
1874       *do_forward = GNUNET_NO;
1875       return GNUNET_TIME_UNIT_ZERO;
1876     }
1877   }
1878   if (!is_connected (n))
1879   {
1880     *do_forward = GNUNET_SYSERR;
1881     return GNUNET_TIME_UNIT_ZERO;
1882   }
1883   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1884   {
1885     n->quota_violation_count++;
1886 #if DEBUG_TRANSPORT
1887     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1888                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1889                 n->in_tracker.available_bytes_per_s__,
1890                 n->quota_violation_count);
1891 #endif
1892     /* Discount 32k per violation */
1893     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1894   }
1895   else
1896   {
1897     if (n->quota_violation_count > 0)
1898     {
1899       /* try to add 32k back */
1900       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1901       n->quota_violation_count--;
1902     }
1903   }
1904   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1905   {
1906     GNUNET_STATISTICS_update (GST_stats,
1907                               gettext_noop
1908                               ("# bandwidth quota violations by other peers"),
1909                               1, GNUNET_NO);
1910     *do_forward = GNUNET_NO;
1911     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1912   }
1913   *do_forward = GNUNET_YES;
1914   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1915   if (ret.rel_value > 0)
1916   {
1917 #if DEBUG_TRANSPORT
1918     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1920                 (unsigned long long) n->in_tracker.
1921                 consumption_since_last_update__,
1922                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1923                 (unsigned long long) ret.rel_value);
1924 #endif
1925     GNUNET_STATISTICS_update (GST_stats,
1926                               gettext_noop ("# ms throttling suggested"),
1927                               (int64_t) ret.rel_value, GNUNET_NO);
1928   }
1929   return ret;
1930 }
1931
1932
1933 /**
1934  * Keep the connection to the given neighbour alive longer,
1935  * we received a KEEPALIVE (or equivalent).
1936  *
1937  * @param neighbour neighbour to keep alive
1938  */
1939 void
1940 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1941 {
1942   struct NeighbourMapEntry *n;
1943
1944   // This can happen during shutdown
1945   if (neighbours == NULL)
1946   {
1947     return;
1948   }
1949
1950   n = lookup_neighbour (neighbour);
1951   if (NULL == n)
1952   {
1953     GNUNET_STATISTICS_update (GST_stats,
1954                               gettext_noop
1955                               ("# KEEPALIVE messages discarded (not connected)"),
1956                               1, GNUNET_NO);
1957     return;
1958   }
1959   GNUNET_SCHEDULER_cancel (n->timeout_task);
1960   n->timeout_task =
1961       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1962                                     &neighbour_timeout_task, n);
1963
1964   /* send reply to measure latency */
1965   if (S_CONNECTED != n->state)
1966     return;
1967
1968   struct GNUNET_MessageHeader m;
1969
1970   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1971   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1972
1973   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1974                     UINT32_MAX /* priority */ ,
1975                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1976                     GNUNET_YES, NULL, NULL);
1977 }
1978
1979 /**
1980  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
1981  * to this peer
1982  *
1983  * @param neighbour neighbour to keep alive
1984  * @param ats performance data
1985  * @param ats_count number of entries in ats
1986  */
1987 void
1988 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1989                                    const struct GNUNET_ATS_Information *ats,
1990                                    uint32_t ats_count)
1991 {
1992   struct NeighbourMapEntry *n;
1993   struct GNUNET_ATS_Information *ats_new;
1994   uint32_t latency;
1995
1996   if (neighbours == NULL)
1997   {
1998     // This can happen during shutdown
1999     return;
2000   }
2001
2002   n = lookup_neighbour (neighbour);
2003   if ((NULL == n) || (n->state != S_CONNECTED))
2004   {
2005     GNUNET_STATISTICS_update (GST_stats,
2006                               gettext_noop
2007                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2008                               1, GNUNET_NO);
2009     return;
2010   }
2011   if (n->expect_latency_response != GNUNET_YES)
2012   {
2013     GNUNET_STATISTICS_update (GST_stats,
2014                               gettext_noop
2015                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2016                               1, GNUNET_NO);
2017     return;
2018   }
2019   n->expect_latency_response = GNUNET_NO;
2020
2021   GNUNET_assert (n->keep_alive_sent.abs_value !=
2022                  GNUNET_TIME_absolute_get_zero ().abs_value);
2023   n->latency =
2024       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2025                                            GNUNET_TIME_absolute_get ());
2026 #if DEBUG_TRANSPORT
2027   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2028               GNUNET_i2s (&n->id), n->latency.rel_value);
2029 #endif
2030
2031
2032   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2033   {
2034     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2035   }
2036   else
2037   {
2038     ats_new =
2039         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2040                        (ats_count + 1));
2041     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2042
2043     /* add latency */
2044     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2045     if (n->latency.rel_value > UINT32_MAX)
2046       latency = UINT32_MAX;
2047     else
2048       latency = n->latency.rel_value;
2049     ats_new[ats_count].value = htonl (latency);
2050
2051     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2052                                ats_count + 1);
2053     GNUNET_free (ats_new);
2054   }
2055 }
2056
2057
2058 /**
2059  * Change the incoming quota for the given peer.
2060  *
2061  * @param neighbour identity of peer to change qutoa for
2062  * @param quota new quota
2063  */
2064 void
2065 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2066                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2067 {
2068   struct NeighbourMapEntry *n;
2069
2070   // This can happen during shutdown
2071   if (neighbours == NULL)
2072   {
2073     return;
2074   }
2075
2076   n = lookup_neighbour (neighbour);
2077   if (n == NULL)
2078   {
2079     GNUNET_STATISTICS_update (GST_stats,
2080                               gettext_noop
2081                               ("# SET QUOTA messages ignored (no such peer)"),
2082                               1, GNUNET_NO);
2083     return;
2084   }
2085 #if DEBUG_TRANSPORT
2086   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2087               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2088               ntohl (quota.value__), GNUNET_i2s (&n->id));
2089 #endif
2090   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2091   if (0 != ntohl (quota.value__))
2092     return;
2093 #if DEBUG_TRANSPORT
2094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2095               GNUNET_i2s (&n->id), "SET_QUOTA");
2096 #endif
2097   if (is_connected (n))
2098     GNUNET_STATISTICS_update (GST_stats,
2099                               gettext_noop ("# disconnects due to quota of 0"),
2100                               1, GNUNET_NO);
2101   disconnect_neighbour (n);
2102 }
2103
2104
2105 /**
2106  * Closure for the neighbours_iterate function.
2107  */
2108 struct IteratorContext
2109 {
2110   /**
2111    * Function to call on each connected neighbour.
2112    */
2113   GST_NeighbourIterator cb;
2114
2115   /**
2116    * Closure for 'cb'.
2117    */
2118   void *cb_cls;
2119 };
2120
2121
2122 /**
2123  * Call the callback from the closure for each connected neighbour.
2124  *
2125  * @param cls the 'struct IteratorContext'
2126  * @param key the hash of the public key of the neighbour
2127  * @param value the 'struct NeighbourMapEntry'
2128  * @return GNUNET_OK (continue to iterate)
2129  */
2130 static int
2131 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2132 {
2133   struct IteratorContext *ic = cls;
2134   struct NeighbourMapEntry *n = value;
2135
2136   if (!is_connected (n))
2137     return GNUNET_OK;
2138
2139   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2140   return GNUNET_OK;
2141 }
2142
2143
2144 /**
2145  * Iterate over all connected neighbours.
2146  *
2147  * @param cb function to call
2148  * @param cb_cls closure for cb
2149  */
2150 void
2151 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2152 {
2153   struct IteratorContext ic;
2154
2155   // This can happen during shutdown
2156   if (neighbours == NULL)
2157   {
2158     return;
2159   }
2160
2161   ic.cb = cb;
2162   ic.cb_cls = cb_cls;
2163   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2164 }
2165
2166 /**
2167  * If we have an active connection to the given target, it must be shutdown.
2168  *
2169  * @param target peer to disconnect from
2170  */
2171 void
2172 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2173 {
2174   struct NeighbourMapEntry *n;
2175
2176   // This can happen during shutdown
2177   if (neighbours == NULL)
2178   {
2179     return;
2180   }
2181
2182   n = lookup_neighbour (target);
2183   if (NULL == n)
2184     return;                     /* not active */
2185   disconnect_neighbour (n);
2186 }
2187
2188
2189 /**
2190  * We received a disconnect message from the given peer,
2191  * validate and process.
2192  *
2193  * @param peer sender of the message
2194  * @param msg the disconnect message
2195  */
2196 void
2197 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2198                                           *peer,
2199                                           const struct GNUNET_MessageHeader
2200                                           *msg)
2201 {
2202   struct NeighbourMapEntry *n;
2203   const struct SessionDisconnectMessage *sdm;
2204   GNUNET_HashCode hc;
2205
2206 #if DEBUG_TRANSPORT
2207   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2208               "Received DISCONNECT message from peer `%s'\n",
2209               GNUNET_i2s (peer));
2210 #endif
2211
2212   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2213   {
2214     // GNUNET_break_op (0);
2215     GNUNET_STATISTICS_update (GST_stats,
2216                               gettext_noop
2217                               ("# disconnect messages ignored (old format)"), 1,
2218                               GNUNET_NO);
2219     return;
2220   }
2221   sdm = (const struct SessionDisconnectMessage *) msg;
2222   n = lookup_neighbour (peer);
2223   if (NULL == n)
2224     return;                     /* gone already */
2225   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2226       n->connect_ts.abs_value)
2227   {
2228     GNUNET_STATISTICS_update (GST_stats,
2229                               gettext_noop
2230                               ("# disconnect messages ignored (timestamp)"), 1,
2231                               GNUNET_NO);
2232     return;
2233   }
2234   GNUNET_CRYPTO_hash (&sdm->public_key,
2235                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2236                       &hc);
2237   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2238   {
2239     GNUNET_break_op (0);
2240     return;
2241   }
2242   if (ntohl (sdm->purpose.size) !=
2243       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2244       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2245       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2246   {
2247     GNUNET_break_op (0);
2248     return;
2249   }
2250   if (GNUNET_OK !=
2251       GNUNET_CRYPTO_rsa_verify
2252       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2253        &sdm->signature, &sdm->public_key))
2254   {
2255     GNUNET_break_op (0);
2256     return;
2257   }
2258   GST_neighbours_force_disconnect (peer);
2259 }
2260
2261
2262 /**
2263  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2264  * Consider switching to it.
2265  *
2266  * @param message possibly a 'struct SessionConnectMessage' (check format)
2267  * @param peer identity of the peer to switch the address for
2268  * @param address address of the other peer, NULL if other peer
2269  *                       connected to us
2270  * @param session session to use (or NULL)
2271  * @param ats performance data
2272  * @param ats_count number of entries in ats
2273  */
2274 void
2275 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2276                                    const struct GNUNET_PeerIdentity *peer,
2277                                    const struct GNUNET_HELLO_Address *address,
2278                                    struct Session *session,
2279                                    const struct GNUNET_ATS_Information *ats,
2280                                    uint32_t ats_count)
2281 {
2282   const struct SessionConnectMessage *scm;
2283   struct GNUNET_MessageHeader msg;
2284   struct NeighbourMapEntry *n;
2285   size_t msg_len;
2286   size_t ret;
2287
2288 #if DEBUG_TRANSPORT
2289   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2290               "Received CONNECT_ACK message from peer `%s'\n",
2291               GNUNET_i2s (peer));
2292 #endif
2293
2294   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2295   {
2296     GNUNET_break_op (0);
2297     return;
2298   }
2299   scm = (const struct SessionConnectMessage *) message;
2300   GNUNET_break_op (ntohl (scm->reserved) == 0);
2301   n = lookup_neighbour (peer);
2302   if (NULL == n)
2303   {
2304     /* we did not send 'CONNECT' -- at least not recently */
2305     GNUNET_STATISTICS_update (GST_stats,
2306                               gettext_noop
2307                               ("# unexpected CONNECT_ACK messages (no peer)"),
2308                               1, GNUNET_NO);
2309     return;
2310   }
2311
2312   /* Additional check
2313    *
2314    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2315    *
2316    * We also received an CONNECT message, switched from SENDT to RECV and
2317    * ATS already suggested us an address after a successful blacklist check
2318    */
2319   if ((n->state != S_CONNECT_SENT) &&
2320       ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2321   {
2322     GNUNET_STATISTICS_update (GST_stats,
2323                               gettext_noop
2324                               ("# unexpected CONNECT_ACK messages"), 1,
2325                               GNUNET_NO);
2326     return;
2327   }
2328
2329   change_state (n, S_CONNECTED);
2330
2331   if (NULL != session)
2332   {
2333     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2334                      "transport-ats",
2335                      "Giving ATS session %p of plugin %s for peer %s\n",
2336                      session, address->transport_name, GNUNET_i2s (peer));
2337   }
2338   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2339   GNUNET_assert (NULL != n->address);
2340   if (n->address_state == FRESH)
2341   {
2342     GST_validation_set_address_use (n->address, n->session, GNUNET_YES);
2343     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2344     n->address_state = USED;
2345   }
2346
2347   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2348
2349   /* send ACK (ACK) */
2350   msg_len = sizeof (msg);
2351   msg.size = htons (msg_len);
2352   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2353
2354   ret =
2355       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2356                         GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
2357                         GNUNET_YES, NULL, NULL);
2358
2359   if (ret == GNUNET_SYSERR)
2360     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2361                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2362                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2363
2364
2365   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2366     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2367
2368   neighbours_connected++;
2369   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2370                             GNUNET_NO);
2371 #if DEBUG_TRANSPORT
2372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2373               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2374               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2375               __LINE__);
2376 #endif
2377   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2378   send_outbound_quota (peer, n->bandwidth_out);
2379
2380 }
2381
2382
2383 void
2384 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2385                            const struct GNUNET_PeerIdentity *peer,
2386                            const struct GNUNET_HELLO_Address *address,
2387                            struct Session *session,
2388                            const struct GNUNET_ATS_Information *ats,
2389                            uint32_t ats_count)
2390 {
2391   struct NeighbourMapEntry *n;
2392
2393 #if DEBUG_TRANSPORT
2394   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2395               GNUNET_i2s (peer));
2396 #endif
2397
2398   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2399   {
2400     GNUNET_break_op (0);
2401     return;
2402   }
2403   n = lookup_neighbour (peer);
2404   if (NULL == n)
2405   {
2406     send_disconnect (peer, address, session);
2407     GNUNET_break (0);
2408     return;
2409   }
2410   if (S_CONNECTED == n->state)
2411     return;
2412   if (!is_connecting (n))
2413   {
2414     GNUNET_STATISTICS_update (GST_stats,
2415                               gettext_noop ("# unexpected ACK messages"), 1,
2416                               GNUNET_NO);
2417     return;
2418   }
2419   change_state (n, S_CONNECTED);
2420   if (NULL != session)
2421     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2422                      "transport-ats",
2423                      "Giving ATS session %p of plugin %s for peer %s\n",
2424                      session, address->transport_name, GNUNET_i2s (peer));
2425   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2426   GNUNET_assert (n->address != NULL);
2427   if (n->address_state == FRESH)
2428   {
2429     GST_validation_set_address_use (n->address, n->session, GNUNET_YES);
2430     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2431     n->address_state = USED;
2432   }
2433
2434   neighbours_connected++;
2435   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2436                             GNUNET_NO);
2437
2438   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2439   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2440     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2441 #if DEBUG_TRANSPORT
2442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2443               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2444               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2445               __LINE__);
2446 #endif
2447   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2448   send_outbound_quota (peer, n->bandwidth_out);
2449 }
2450
2451 struct BlackListCheckContext
2452 {
2453   struct GNUNET_ATS_Information *ats;
2454
2455   uint32_t ats_count;
2456
2457   struct Session *session;
2458
2459   struct GNUNET_HELLO_Address *address;
2460
2461   struct GNUNET_TIME_Absolute ts;
2462 };
2463
2464
2465 static void
2466 handle_connect_blacklist_cont (void *cls,
2467                                const struct GNUNET_PeerIdentity *peer,
2468                                int result)
2469 {
2470   struct NeighbourMapEntry *n;
2471   struct BlackListCheckContext *bcc = cls;
2472
2473 #if DEBUG_TRANSPORT
2474   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2475               "Blacklist check due to CONNECT message: `%s'\n",
2476               GNUNET_i2s (peer),
2477               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2478 #endif
2479
2480   /* not allowed */
2481   if (GNUNET_OK != result)
2482   {
2483     GNUNET_HELLO_address_free (bcc->address);
2484     GNUNET_free (bcc);
2485     return;
2486   }
2487
2488   n = lookup_neighbour (peer);
2489   if (NULL == n)
2490     n = setup_neighbour (peer);
2491
2492   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2493   {
2494     if (NULL != bcc->session)
2495       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2496                        "transport-ats",
2497                        "Giving ATS session %p of address `%s' for peer %s\n",
2498                        bcc->session, GST_plugins_a2s (bcc->address),
2499                        GNUNET_i2s (peer));
2500     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2501
2502     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2503                                bcc->ats_count);
2504     n->connect_ts = bcc->ts;
2505   }
2506
2507   GNUNET_HELLO_address_free (bcc->address);
2508   GNUNET_free (bcc);
2509
2510   if (n->state != S_CONNECT_RECV)
2511     change_state (n, S_CONNECT_RECV);
2512
2513
2514   /* Ask ATS for an address to connect via that address */
2515   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2516     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2517   n->ats_suggest =
2518       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2519                                     n);
2520   GNUNET_ATS_suggest_address (GST_ats, peer);
2521 }
2522
2523 /**
2524  * We received a 'SESSION_CONNECT' message from the other peer.
2525  * Consider switching to it.
2526  *
2527  * @param message possibly a 'struct SessionConnectMessage' (check format)
2528  * @param peer identity of the peer to switch the address for
2529  * @param address address of the other peer, NULL if other peer
2530  *                       connected to us
2531  * @param session session to use (or NULL)
2532  * @param ats performance data
2533  * @param ats_count number of entries in ats (excluding 0-termination)
2534  */
2535 void
2536 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2537                                const struct GNUNET_PeerIdentity *peer,
2538                                const struct GNUNET_HELLO_Address *address,
2539                                struct Session *session,
2540                                const struct GNUNET_ATS_Information *ats,
2541                                uint32_t ats_count)
2542 {
2543   const struct SessionConnectMessage *scm;
2544   struct BlackListCheckContext *bcc = NULL;
2545   struct NeighbourMapEntry *n;
2546
2547 #if DEBUG_TRANSPORT
2548   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2549               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2550 #endif
2551
2552   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2553   {
2554     GNUNET_break_op (0);
2555     return;
2556   }
2557
2558   scm = (const struct SessionConnectMessage *) message;
2559   GNUNET_break_op (ntohl (scm->reserved) == 0);
2560
2561   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2562
2563   n = lookup_neighbour (peer);
2564   if ((n != NULL) && (S_CONNECTED == n->state))
2565   {
2566     /* connected peer switches addresses */
2567     return;
2568   }
2569
2570
2571   /* we are not connected to this peer */
2572   /* do blacklist check */
2573   bcc =
2574       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2575                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2576   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2577   bcc->ats_count = ats_count + 1;
2578   bcc->address = GNUNET_HELLO_address_copy (address);
2579   bcc->session = session;
2580   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2581   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2582   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2583   bcc->ats[ats_count].value =
2584       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2585   GST_blacklist_test_allowed (peer, address->transport_name,
2586                               handle_connect_blacklist_cont, bcc);
2587 }
2588
2589
2590 /* end of file gnunet-service-transport_neighbours.c */