-fixing 1963 by changing warning to stats call
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
61
62 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
63
64 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
65
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
72 /**
73  * Message a peer sends to another to indicate its
74  * preference for communicating via a particular
75  * session (and the desire to establish a real
76  * connection).
77  */
78 struct SessionConnectMessage
79 {
80   /**
81    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
82    */
83   struct GNUNET_MessageHeader header;
84
85   /**
86    * Always zero.
87    */
88   uint32_t reserved GNUNET_PACKED;
89
90   /**
91    * Absolute time at the sender.  Only the most recent connect
92    * message implies which session is preferred by the sender.
93    */
94   struct GNUNET_TIME_AbsoluteNBO timestamp;
95
96 };
97
98
99 struct SessionDisconnectMessage
100 {
101   /**
102    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
103    */
104   struct GNUNET_MessageHeader header;
105
106   /**
107    * Always zero.
108    */
109   uint32_t reserved GNUNET_PACKED;
110
111   /**
112    * Purpose of the signature.  Extends over the timestamp.
113    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
114    */
115   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
116
117   /**
118    * Absolute time at the sender.  Only the most recent connect
119    * message implies which session is preferred by the sender.
120    */
121   struct GNUNET_TIME_AbsoluteNBO timestamp;
122
123   /**
124    * Public key of the sender.
125    */
126   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
127
128   /**
129    * Signature of the peer that sends us the disconnect.  Only
130    * valid if the timestamp is AFTER the timestamp from the
131    * corresponding 'CONNECT' message.
132    */
133   struct GNUNET_CRYPTO_RsaSignature signature;
134
135 };
136
137
138 /**
139  * For each neighbour we keep a list of messages
140  * that we still want to transmit to the neighbour.
141  */
142 struct MessageQueue
143 {
144
145   /**
146    * This is a doubly linked list.
147    */
148   struct MessageQueue *next;
149
150   /**
151    * This is a doubly linked list.
152    */
153   struct MessageQueue *prev;
154
155   /**
156    * Once this message is actively being transmitted, which
157    * neighbour is it associated with?
158    */
159   struct NeighbourMapEntry *n;
160
161   /**
162    * Function to call once we're done.
163    */
164   GST_NeighbourSendContinuation cont;
165
166   /**
167    * Closure for 'cont'
168    */
169   void *cont_cls;
170
171   /**
172    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
173    * stuck together in memory.  Allocated at the end of this struct.
174    */
175   const char *message_buf;
176
177   /**
178    * Size of the message buf
179    */
180   size_t message_buf_size;
181
182   /**
183    * At what time should we fail?
184    */
185   struct GNUNET_TIME_Absolute timeout;
186
187 };
188
189
190 enum State
191 {
192   /**
193    * fresh peer or completely disconnected 
194    */
195   S_NOT_CONNECTED,
196
197   /**
198    * sent CONNECT message to other peer, waiting for CONNECT_ACK 
199    */
200   S_CONNECT_SENT,
201
202   /**
203    * received CONNECT message to other peer, sending CONNECT_ACK 
204    */
205   S_CONNECT_RECV,
206
207   /**
208    * received ACK or payload 
209    */
210   S_CONNECTED,
211
212   /**
213    * connection ended, fast reconnect
214    */
215   S_FAST_RECONNECT,
216
217   /**
218    * Disconnect in progress 
219    */
220   S_DISCONNECT
221 };
222
223 enum Address_State
224 {
225   USED,
226   UNUSED,
227   FRESH,
228 };
229
230
231 /**
232  * Entry in neighbours.
233  */
234 struct NeighbourMapEntry
235 {
236
237   /**
238    * Head of list of messages we would like to send to this peer;
239    * must contain at most one message per client.
240    */
241   struct MessageQueue *messages_head;
242
243   /**
244    * Tail of list of messages we would like to send to this peer; must
245    * contain at most one message per client.
246    */
247   struct MessageQueue *messages_tail;
248
249   /**
250    * Are we currently trying to send a message? If so, which one?
251    */
252   struct MessageQueue *is_active;
253
254   /**
255    * Active session for communicating with the peer.
256    */
257   struct Session *session;
258
259   /**
260    * Address we currently use.
261    */
262   struct GNUNET_HELLO_Address *address;
263
264   /**
265    * Identity of this neighbour.
266    */
267   struct GNUNET_PeerIdentity id;
268
269   /**
270    * ID of task scheduled to run when this peer is about to
271    * time out (will free resources associated with the peer).
272    */
273   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
274
275   /**
276    * ID of task scheduled to send keepalives.
277    */
278   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
279
280   /**
281    * ID of task scheduled to run when we should try transmitting
282    * the head of the message queue.
283    */
284   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
285
286   /**
287    * Tracker for inbound bandwidth.
288    */
289   struct GNUNET_BANDWIDTH_Tracker in_tracker;
290
291   /**
292    * Inbound bandwidth from ATS, activated when connection is up
293    */
294   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
295
296   /**
297    * Inbound bandwidth from ATS, activated when connection is up
298    */
299   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
300
301   /**
302    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
303    */
304   struct GNUNET_TIME_Absolute connect_ts;
305
306   /**
307    * When did we sent the last keep-alive message?
308    */
309   struct GNUNET_TIME_Absolute keep_alive_sent;
310
311   /**
312    * Latest calculated latency value
313    */
314   struct GNUNET_TIME_Relative latency;
315
316   /**
317    * Timeout for ATS
318    * We asked ATS for a new address for this peer
319    */
320   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
321
322   /**
323    * Task the resets the peer state after due to an pending
324    * unsuccessful connection setup
325    */
326   GNUNET_SCHEDULER_TaskIdentifier state_reset;
327
328
329   /**
330    * How often has the other peer (recently) violated the inbound
331    * traffic limit?  Incremented by 10 per violation, decremented by 1
332    * per non-violation (for each time interval).
333    */
334   unsigned int quota_violation_count;
335
336
337   /**
338    * The current state of the peer
339    * Element of enum State
340    */
341   int state;
342
343   /**
344    * Did we sent an KEEP_ALIVE message and are we expecting a response?
345    */
346   int expect_latency_response;
347   int address_state;
348 };
349
350
351 /**
352  * All known neighbours and their HELLOs.
353  */
354 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
355
356 /**
357  * Closure for connect_notify_cb and disconnect_notify_cb
358  */
359 static void *callback_cls;
360
361 /**
362  * Function to call when we connected to a neighbour.
363  */
364 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
365
366 /**
367  * Function to call when we disconnected from a neighbour.
368  */
369 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
370
371 /**
372  * counter for connected neighbours
373  */
374 static int neighbours_connected;
375
376 /**
377  * Lookup a neighbour entry in the neighbours hash map.
378  *
379  * @param pid identity of the peer to look up
380  * @return the entry, NULL if there is no existing record
381  */
382 static struct NeighbourMapEntry *
383 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
384 {
385   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
386 }
387
388 #define change_state(n, state, ...) change (n, state, __LINE__)
389
390 static int
391 is_connecting (struct NeighbourMapEntry *n)
392 {
393   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
394     return GNUNET_YES;
395   return GNUNET_NO;
396 }
397
398 static int
399 is_connected (struct NeighbourMapEntry *n)
400 {
401   if (n->state == S_CONNECTED)
402     return GNUNET_YES;
403   return GNUNET_NO;
404 }
405
406 static int
407 is_disconnecting (struct NeighbourMapEntry *n)
408 {
409   if (n->state == S_DISCONNECT)
410     return GNUNET_YES;
411   return GNUNET_NO;
412 }
413
414 static const char *
415 print_state (int state)
416 {
417   switch (state)
418   {
419   case S_CONNECTED:
420     return "S_CONNECTED";
421     break;
422   case S_CONNECT_RECV:
423     return "S_CONNECT_RECV";
424     break;
425   case S_CONNECT_SENT:
426     return "S_CONNECT_SENT";
427     break;
428   case S_DISCONNECT:
429     return "S_DISCONNECT";
430     break;
431   case S_NOT_CONNECTED:
432     return "S_NOT_CONNECTED";
433     break;
434   case S_FAST_RECONNECT:
435     return "S_FAST_RECONNECT";
436     break;
437   default:
438     GNUNET_break (0);
439     break;
440   }
441   return NULL;
442 }
443
444 static int
445 change (struct NeighbourMapEntry *n, int state, int line);
446
447 static void
448 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
449
450
451 static void
452 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
453 {
454   struct NeighbourMapEntry *n = cls;
455   if (n == NULL)
456     return;
457
458   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
459   if (n->state == S_CONNECTED)
460     return;
461
462 #if DEBUG_TRANSPORT
463   GNUNET_STATISTICS_update (GST_stats,
464                             gettext_noop
465                             ("# failed connection attempts due to timeout"), 1,
466                             GNUNET_NO);
467 #endif
468
469   /* resetting state */
470   n->state = S_NOT_CONNECTED;
471
472   /* destroying address */
473   if (n->address != NULL)
474   {
475     GNUNET_assert (strlen(n->address->transport_name) > 0);
476     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
477   }
478
479   /* request new address */
480   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
481     GNUNET_SCHEDULER_cancel (n->ats_suggest);
482   n->ats_suggest =
483       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
484                                     n);
485   GNUNET_ATS_suggest_address (GST_ats, &n->id);
486 }
487
488 static int
489 change (struct NeighbourMapEntry *n, int state, int line)
490 {
491   /* allowed transitions */
492   int allowed = GNUNET_NO;
493
494   switch (n->state)
495   {
496   case S_NOT_CONNECTED:
497     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
498         (state == S_DISCONNECT))
499       allowed = GNUNET_YES;
500     break;
501   case S_CONNECT_RECV:
502     allowed = GNUNET_YES;
503     break;
504   case S_CONNECT_SENT:
505     allowed = GNUNET_YES;
506     break;
507   case S_CONNECTED:
508     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
509       allowed = GNUNET_YES;
510     break;
511   case S_DISCONNECT:
512     break;
513   case S_FAST_RECONNECT:
514     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
515       allowed = GNUNET_YES;
516     break;
517   default:
518     GNUNET_break (0);
519     break;
520   }
521   if (allowed == GNUNET_NO)
522   {
523     char *old = GNUNET_strdup (print_state (n->state));
524     char *new = GNUNET_strdup (print_state (state));
525     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
527                 new, line);
528     GNUNET_break (0);
529     GNUNET_free (old);
530     GNUNET_free (new);
531     return GNUNET_SYSERR;
532   }
533 #if DEBUG_TRANSPORT
534   {
535     char *old = GNUNET_strdup (print_state (n->state));
536     char *new = GNUNET_strdup (print_state (state));
537     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
538                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
539                 GNUNET_i2s (&n->id), n, old, new, line);
540     GNUNET_free (old);
541     GNUNET_free (new);
542   }
543 #endif
544   n->state = state;
545
546   switch (n->state)
547   {
548   case S_FAST_RECONNECT:
549   case S_CONNECT_RECV:
550   case S_CONNECT_SENT:
551     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
552       GNUNET_SCHEDULER_cancel (n->state_reset);
553     n->state_reset =
554       GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
555                                     n);   
556     break;
557   case S_CONNECTED:
558   case S_NOT_CONNECTED:
559   case S_DISCONNECT:
560     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
561     {
562 #if DEBUG_TRANSPORT
563       char *old = GNUNET_strdup (print_state (n->state));
564       char *new = GNUNET_strdup (print_state (state));
565       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
567                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
568                   old, new);
569       GNUNET_free (old);
570       GNUNET_free (new);
571 #endif
572       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
573       GNUNET_SCHEDULER_cancel (n->state_reset);
574       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
575     }
576     break;
577
578   default:
579     GNUNET_assert (0);
580   }
581   
582
583
584   return GNUNET_OK;
585 }
586
587 static ssize_t
588 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
589                   size_t msgbuf_size, uint32_t priority,
590                   struct GNUNET_TIME_Relative timeout, struct Session *session,
591                   const struct GNUNET_HELLO_Address *address,
592                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
593                   void *cont_cls)
594 {
595   struct GNUNET_TRANSPORT_PluginFunctions *papi;
596   size_t ret = GNUNET_SYSERR;
597
598   /* FIXME : ats returns an address with all values 0 */
599   if (address == NULL)
600   {
601     if (cont != NULL)
602       cont (cont_cls, target, GNUNET_SYSERR);
603     return GNUNET_SYSERR;
604   }
605
606   if ((session == NULL) && (address->address_length == 0))
607   {
608     if (cont != NULL)
609       cont (cont_cls, target, GNUNET_SYSERR);
610     return GNUNET_SYSERR;
611   }
612
613   papi = GST_plugins_find (address->transport_name);
614   if (papi == NULL)
615   {
616     if (cont != NULL)
617       cont (cont_cls, target, GNUNET_SYSERR);
618     return GNUNET_SYSERR;
619   }
620
621   ret =
622       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
623                   address->address, 
624                   address->address_length, GNUNET_YES, cont, cont_cls);
625
626   if (ret == -1)
627   {
628     if (cont != NULL)
629       cont (cont_cls, target, GNUNET_SYSERR);
630   }
631   return ret;
632 }
633
634 /**
635  * Task invoked to start a transmission to another peer.
636  *
637  * @param cls the 'struct NeighbourMapEntry'
638  * @param tc scheduler context
639  */
640 static void
641 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
642
643
644 /**
645  * We're done with our transmission attempt, continue processing.
646  *
647  * @param cls the 'struct MessageQueue' of the message
648  * @param receiver intended receiver
649  * @param success whether it worked or not
650  */
651 static void
652 transmit_send_continuation (void *cls,
653                             const struct GNUNET_PeerIdentity *receiver,
654                             int success)
655 {
656   struct MessageQueue *mq;
657   struct NeighbourMapEntry *n;
658   struct NeighbourMapEntry *tmp;
659
660   tmp = lookup_neighbour(receiver);
661
662   mq = cls;
663   n = mq->n;
664   if ((NULL != n) && (tmp != NULL) && (tmp == n))
665   {
666     GNUNET_assert (n->is_active == mq);
667     n->is_active = NULL;
668     if (success == GNUNET_YES)
669     {
670       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
671       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
672     }
673   }
674 #if DEBUG_TRANSPORT
675   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
676               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
677               (success == GNUNET_OK) ? "successful" : "FAILED");
678 #endif
679   if (NULL != mq->cont)
680     mq->cont (mq->cont_cls, success);
681   GNUNET_free (mq);
682 }
683
684
685 /**
686  * Check the ready list for the given neighbour and if a plugin is
687  * ready for transmission (and if we have a message), do so!
688  *
689  * @param n target peer for which to transmit
690  */
691 static void
692 try_transmission_to_peer (struct NeighbourMapEntry *n)
693 {
694   struct MessageQueue *mq;
695   struct GNUNET_TIME_Relative timeout;
696   ssize_t ret;
697
698   if (n->is_active != NULL)
699   {
700     GNUNET_break (0);
701     return;                     /* transmission already pending */
702   }
703   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
704   {
705     GNUNET_break (0);
706     return;                     /* currently waiting for bandwidth */
707   }
708   while (NULL != (mq = n->messages_head))
709   {
710     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
711     if (timeout.rel_value > 0)
712       break;
713     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
714     n->is_active = mq;
715     mq->n = n;
716     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
717   }
718   if (NULL == mq)
719     return;                     /* no more messages */
720
721   if (n->address == NULL)
722   {
723     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724                 "No address for peer `%s'\n",
725                 GNUNET_i2s (&n->id));
726     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
727     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
728     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
729     return;
730   }
731
732   if (GST_plugins_find (n->address->transport_name) == NULL)
733   {
734     GNUNET_break (0);
735     return;
736   }
737   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
738   n->is_active = mq;
739   mq->n = n;
740
741   if ((n->address->address_length == 0) && (n->session == NULL))
742   {
743     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
744                 "No address for peer `%s'\n",
745                 GNUNET_i2s (&n->id));
746     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
747     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
748     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
749     return;
750   }
751
752   ret =
753       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
754                         timeout, n->session, n->address, 
755                         GNUNET_YES, &transmit_send_continuation,
756                         mq);
757   if (ret == -1)
758   {
759     /* failure, but 'send' would not call continuation in this case,
760      * so we need to do it here! */
761     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
762   }
763
764 }
765
766
767 /**
768  * Task invoked to start a transmission to another peer.
769  *
770  * @param cls the 'struct NeighbourMapEntry'
771  * @param tc scheduler context
772  */
773 static void
774 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
775 {
776   struct NeighbourMapEntry *n = cls;
777
778   GNUNET_assert (NULL != lookup_neighbour (&n->id));
779   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
780   try_transmission_to_peer (n);
781 }
782
783
784 /**
785  * Initialize the neighbours subsystem.
786  *
787  * @param cls closure for callbacks
788  * @param connect_cb function to call if we connect to a peer
789  * @param disconnect_cb function to call if we disconnect from a peer
790  */
791 void
792 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
793                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
794 {
795   callback_cls = cls;
796   connect_notify_cb = connect_cb;
797   disconnect_notify_cb = disconnect_cb;
798   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
799 }
800
801
802 static void
803 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
804                       int result)
805 {
806 #if DEBUG_TRANSPORT
807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
808               "Sending DISCONNECT message to peer `%4s': %i\n",
809               GNUNET_i2s (target), result);
810 #endif
811 }
812
813
814 static int
815 send_disconnect (const struct GNUNET_PeerIdentity *target,
816                  const struct GNUNET_HELLO_Address *address,
817                  struct Session *session)
818 {
819   size_t ret;
820   struct SessionDisconnectMessage disconnect_msg;
821
822 #if DEBUG_TRANSPORT
823   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
824               "Sending DISCONNECT message to peer `%4s'\n",
825               GNUNET_i2s (target));
826 #endif
827
828   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
829   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
830   disconnect_msg.reserved = htonl (0);
831   disconnect_msg.purpose.size =
832       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
833              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
834              sizeof (struct GNUNET_TIME_AbsoluteNBO));
835   disconnect_msg.purpose.purpose =
836       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
837   disconnect_msg.timestamp =
838       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
839   disconnect_msg.public_key = GST_my_public_key;
840   GNUNET_assert (GNUNET_OK ==
841                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
842                                          &disconnect_msg.purpose,
843                                          &disconnect_msg.signature));
844
845   ret =
846       send_with_plugin (target, (const char *) &disconnect_msg,
847                         sizeof (disconnect_msg), UINT32_MAX,
848                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
849                         GNUNET_YES,
850                         &send_disconnect_cont, NULL);
851
852   if (ret == GNUNET_SYSERR)
853     return GNUNET_SYSERR;
854
855   GNUNET_STATISTICS_update (GST_stats,
856                             gettext_noop
857                             ("# peers disconnected due to external request"), 1,
858                             GNUNET_NO);
859   return GNUNET_OK;
860 }
861
862
863 /**
864  * Disconnect from the given neighbour, clean up the record.
865  *
866  * @param n neighbour to disconnect from
867  */
868 static void
869 disconnect_neighbour (struct NeighbourMapEntry *n)
870 {
871   struct MessageQueue *mq;
872   int previous_state;
873
874   previous_state = n->state;
875
876   if (is_disconnecting (n))
877     return;
878
879
880   /* send DISCONNECT MESSAGE */
881   if ((previous_state == S_CONNECTED) || is_connecting (n))
882   {
883     if (GNUNET_OK ==
884         send_disconnect (&n->id, n->address,
885                          n->session))
886       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
887                   GNUNET_i2s (&n->id));
888     else
889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                   "Could not send DISCONNECT_MSG to `%s'\n",
891                   GNUNET_i2s (&n->id));
892   }
893
894   change_state (n, S_DISCONNECT);
895
896   if (previous_state == S_CONNECTED)
897   {
898     GNUNET_assert (NULL != n->address);
899     if (n->address_state == USED)
900     {
901       GST_validation_set_address_use (&n->id,
902                                       n->address,
903                                       n->session,
904                                       GNUNET_NO);
905
906       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
907       n->address_state = UNUSED;
908     }
909   }
910
911   if (n->address != NULL)
912   {
913     struct GNUNET_TRANSPORT_PluginFunctions *papi;
914     papi = GST_plugins_find (n->address->transport_name);
915     if (papi != NULL)
916       papi->disconnect (papi->cls, &n->id);
917   }
918   while (NULL != (mq = n->messages_head))
919   {
920     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
921     if (NULL != mq->cont)
922       mq->cont (mq->cont_cls, GNUNET_SYSERR);
923     GNUNET_free (mq);
924   }
925   if (NULL != n->is_active)
926   {
927     n->is_active->n = NULL;
928     n->is_active = NULL;
929   }
930
931   switch (previous_state) {
932     case S_CONNECTED:
933 //      GNUNET_assert (neighbours_connected > 0);
934       neighbours_connected--;
935       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
936       GNUNET_SCHEDULER_cancel (n->keepalive_task);
937       n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
938       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
939                                 GNUNET_NO);
940       disconnect_notify_cb (callback_cls, &n->id);
941       break;
942     case S_FAST_RECONNECT:
943       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
944                                 GNUNET_NO);
945       GNUNET_STATISTICS_update (GST_stats,
946                                 gettext_noop
947                                 ("# fast reconnects failed"),
948                                 1, GNUNET_NO);
949       disconnect_notify_cb (callback_cls, &n->id);
950     default:
951       break;
952   }
953
954   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
955
956   GNUNET_assert (GNUNET_YES ==
957                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
958                                                        &n->id.hashPubKey, n));
959   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
960   {
961     GNUNET_SCHEDULER_cancel (n->ats_suggest);
962     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
963   }
964   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
965   {
966     GNUNET_SCHEDULER_cancel (n->timeout_task);
967     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
968   }
969   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
970   {
971     GNUNET_SCHEDULER_cancel (n->transmission_task);
972     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
973   }
974   if (NULL != n->address)
975   {
976     GNUNET_HELLO_address_free (n->address);
977     n->address = NULL;
978   }
979   n->session = NULL;
980   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
981               GNUNET_i2s (&n->id), n);
982   GNUNET_free (n);
983 }
984
985
986 /**
987  * Peer has been idle for too long. Disconnect.
988  *
989  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
990  * @param tc scheduler context
991  */
992 static void
993 neighbour_timeout_task (void *cls,
994                         const struct GNUNET_SCHEDULER_TaskContext *tc)
995 {
996   struct NeighbourMapEntry *n = cls;
997
998   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
999
1000   GNUNET_STATISTICS_update (GST_stats,
1001                             gettext_noop
1002                             ("# peers disconnected due to timeout"), 1,
1003                             GNUNET_NO);
1004   disconnect_neighbour (n);
1005 }
1006
1007
1008 /**
1009  * Send another keepalive message.
1010  *
1011  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1012  * @param tc scheduler context
1013  */
1014 static void
1015 neighbour_keepalive_task (void *cls,
1016                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1017 {
1018   struct NeighbourMapEntry *n = cls;
1019   struct GNUNET_MessageHeader m;
1020   int ret;
1021
1022   n->keepalive_task =
1023       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1024                                     &neighbour_keepalive_task, n);
1025
1026   GNUNET_assert (S_CONNECTED == n->state);
1027   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1028                             GNUNET_NO);
1029   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1030   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1031
1032
1033   ret = send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1034                     UINT32_MAX /* priority */ ,
1035                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address, 
1036                     GNUNET_YES, NULL, NULL);
1037
1038   n->expect_latency_response = GNUNET_NO;
1039   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero();
1040   if (ret != GNUNET_SYSERR)
1041   {
1042     n->expect_latency_response = GNUNET_YES;
1043     n->keep_alive_sent = GNUNET_TIME_absolute_get();
1044   }
1045
1046 }
1047
1048
1049 /**
1050  * Disconnect from the given neighbour.
1051  *
1052  * @param cls unused
1053  * @param key hash of neighbour's public key (not used)
1054  * @param value the 'struct NeighbourMapEntry' of the neighbour
1055  */
1056 static int
1057 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1058 {
1059   struct NeighbourMapEntry *n = value;
1060
1061 #if DEBUG_TRANSPORT
1062   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1063               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1064 #endif
1065   if (S_CONNECTED == n->state)
1066     GNUNET_STATISTICS_update (GST_stats,
1067                               gettext_noop
1068                               ("# peers disconnected due to global disconnect"),
1069                               1, GNUNET_NO);
1070   disconnect_neighbour (n);
1071   return GNUNET_OK;
1072 }
1073
1074
1075 static void
1076 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1077 {
1078   struct NeighbourMapEntry *n = cls;
1079
1080   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1081
1082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083               "ATS did not suggested address to connect to peer `%s'\n",
1084               GNUNET_i2s (&n->id));
1085
1086   disconnect_neighbour (n);
1087 }
1088
1089 /**
1090  * Cleanup the neighbours subsystem.
1091  */
1092 void
1093 GST_neighbours_stop ()
1094 {
1095   // This can happen during shutdown
1096   if (neighbours == NULL)
1097   {
1098     return;
1099   }
1100
1101   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1102                                          NULL);
1103   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1104 //  GNUNET_assert (neighbours_connected == 0);
1105   neighbours = NULL;
1106   callback_cls = NULL;
1107   connect_notify_cb = NULL;
1108   disconnect_notify_cb = NULL;
1109 }
1110
1111 struct ContinutionContext
1112 {
1113   struct GNUNET_HELLO_Address *address;
1114
1115   struct Session *session;
1116 };
1117
1118 static void send_outbound_quota (const struct GNUNET_PeerIdentity *target, struct GNUNET_BANDWIDTH_Value32NBO quota)
1119 {
1120    struct QuotaSetMessage q_msg;
1121 #if DEBUG_TRANSPORT
1122     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1123                 "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1124                 ntohl (quota.value__), GNUNET_i2s (target));
1125 #endif
1126     q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1127     q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1128     q_msg.quota = quota;
1129     q_msg.peer = (*target);
1130     GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1131 }
1132
1133 /**
1134  * We tried to send a SESSION_CONNECT message to another peer.  If this
1135  * succeeded, we change the state.  If it failed, we should tell
1136  * ATS to not use this address anymore (until it is re-validated).
1137  *
1138  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1139  * @param success GNUNET_OK on success
1140  */
1141 static void
1142 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1143                            int success)
1144 {
1145   struct ContinutionContext * cc = cls;
1146   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1147   
1148   if (GNUNET_YES != success)
1149   {
1150     GNUNET_assert (strlen(cc->address->transport_name) > 0);
1151     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1152   }
1153   if ( (NULL == neighbours) ||
1154        (NULL == n) ||
1155        (n->state == S_DISCONNECT))
1156   {
1157     GNUNET_HELLO_address_free (cc->address);
1158     GNUNET_free (cc);
1159     return;
1160   }
1161
1162   if ((GNUNET_YES == success) &&
1163      ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1164   {
1165     change_state (n, S_CONNECT_SENT);
1166     GNUNET_HELLO_address_free (cc->address);
1167     GNUNET_free (cc);
1168     return;
1169   }
1170
1171   if ((GNUNET_NO == success) &&
1172      ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1173   {
1174 #if DEBUG_TRANSPORT
1175     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1177                 GNUNET_i2s (&n->id),
1178                 GST_plugins_a2s (n->address),
1179                 n->session);
1180 #endif
1181     change_state (n, S_NOT_CONNECTED);
1182     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1183       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1184     n->ats_suggest =
1185       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1186                                     n);
1187     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1188   }
1189   GNUNET_HELLO_address_free (cc->address);
1190   GNUNET_free (cc);
1191 }
1192
1193 /**
1194  * We tried to switch addresses with an peer already connected. If it failed,
1195  * we should tell ATS to not use this address anymore (until it is re-validated).
1196  *
1197  * @param cls the 'struct NeighbourMapEntry'
1198  * @param success GNUNET_OK on success
1199  */
1200 static void
1201 send_switch_address_continuation (void *cls,
1202                                   const struct GNUNET_PeerIdentity *target,
1203                                   int success)
1204 {
1205   struct ContinutionContext * cc = cls;
1206   struct NeighbourMapEntry *n;
1207
1208   if (neighbours == NULL)
1209   {
1210     GNUNET_HELLO_address_free (cc->address);
1211     GNUNET_free (cc);
1212     return;                     /* neighbour is going away */
1213   }
1214
1215   n = lookup_neighbour(&cc->address->peer);
1216   if ((n == NULL) || (is_disconnecting (n)))
1217   {
1218     GNUNET_HELLO_address_free (cc->address);
1219     GNUNET_free (cc);
1220     return;                     /* neighbour is going away */
1221   }
1222
1223   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1224   if (GNUNET_YES != success)
1225   {
1226 #if DEBUG_TRANSPORT
1227     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1229                 GNUNET_i2s (&n->id), 
1230                 GST_plugins_a2s (n->address), n->session);
1231 #endif
1232     GNUNET_assert (strlen(cc->address->transport_name) > 0);
1233     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1234
1235     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1236       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1237     n->ats_suggest =
1238         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1239                                       n);
1240     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1241     GNUNET_HELLO_address_free (cc->address);
1242     GNUNET_free (cc);
1243     return;
1244   }
1245   /* Tell ATS that switching addresses was successful */
1246   switch (n->state) {
1247     case S_CONNECTED:
1248       if (n->address_state == FRESH)
1249       {
1250           GST_validation_set_address_use (&n->id,
1251                     cc->address,
1252                     cc->session,
1253                     GNUNET_YES);
1254           GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1255           GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1256           n->address_state = USED;
1257       }
1258       break;
1259     case S_FAST_RECONNECT:
1260 #if DEBUG_TRANSPORT
1261       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262                   "Successful fast reconnect to peer `%s'\n", GNUNET_i2s (&n->id));
1263 #endif
1264       change_state (n, S_CONNECTED);
1265       neighbours_connected++;
1266       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1267                                 GNUNET_NO);
1268
1269       if (n->address_state == FRESH)
1270       {
1271           GST_validation_set_address_use (&n->id,
1272                     cc->address,
1273                     cc->session,
1274                     GNUNET_YES);
1275           GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1276           GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1277           n->address_state = USED;
1278       }
1279
1280       if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1281             n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1282
1283       /* Updating quotas */
1284       GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1285       send_outbound_quota(target, n->bandwidth_out);
1286
1287     default:
1288       break;
1289   }
1290   GNUNET_HELLO_address_free (cc->address);
1291   GNUNET_free (cc);
1292 }
1293
1294 /**
1295  * We tried to send a SESSION_CONNECT message to another peer.  If this
1296  * succeeded, we change the state.  If it failed, we should tell
1297  * ATS to not use this address anymore (until it is re-validated).
1298  *
1299  * @param cls the 'struct NeighbourMapEntry'
1300  * @param success GNUNET_OK on success
1301  */
1302 static void
1303 send_connect_ack_continuation (void *cls,
1304                                const struct GNUNET_PeerIdentity *target,
1305                                int success)
1306 {
1307   struct ContinutionContext * cc = cls;
1308   struct NeighbourMapEntry *n;
1309
1310   if (neighbours == NULL)
1311   {
1312     GNUNET_HELLO_address_free (cc->address);
1313     GNUNET_free (cc);
1314     return;                     /* neighbour is going away */
1315   }
1316
1317   n = lookup_neighbour(&cc->address->peer);
1318   if ((n == NULL) || (is_disconnecting (n)))
1319   {
1320     GNUNET_HELLO_address_free (cc->address);
1321     GNUNET_free (cc);
1322     return;                     /* neighbour is going away */
1323   }
1324
1325   if (GNUNET_YES == success)
1326   {
1327     GNUNET_HELLO_address_free (cc->address);
1328     GNUNET_free (cc);
1329     return;                     /* sending successful */
1330   }
1331
1332   /* sending failed, ask for next address  */
1333 #if DEBUG_TRANSPORT
1334   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1335               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1336               GNUNET_i2s (&n->id), 
1337               GST_plugins_a2s (n->address),
1338               n->session);
1339 #endif
1340   change_state (n, S_NOT_CONNECTED);
1341   GNUNET_assert (strlen(cc->address->transport_name) > 0);
1342   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1343
1344   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1345     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1346   n->ats_suggest =
1347       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1348                                     n);
1349   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1350   GNUNET_HELLO_address_free (cc->address);
1351   GNUNET_free (cc);
1352 }
1353
1354 /**
1355  * For an existing neighbour record, set the active connection to
1356  * the given address.
1357  *
1358  * @param peer identity of the peer to switch the address for
1359  * @param address address of the other peer, NULL if other peer
1360  *                       connected to us
1361  * @param session session to use (or NULL)
1362  * @param ats performance data
1363  * @param ats_count number of entries in ats
1364  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1365  *         connection is not up (yet)
1366  */
1367 int
1368 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1369                                        const struct GNUNET_HELLO_Address *address,
1370                                        struct Session *session,
1371                                        const struct GNUNET_ATS_Information *ats,
1372                                        uint32_t ats_count,
1373                                        struct GNUNET_BANDWIDTH_Value32NBO
1374                                        bandwidth_in,
1375                                        struct GNUNET_BANDWIDTH_Value32NBO
1376                                        bandwidth_out)
1377 {
1378   struct NeighbourMapEntry *n;
1379   struct SessionConnectMessage connect_msg;
1380   struct ContinutionContext * cc;
1381   size_t msg_len;
1382   size_t ret;
1383
1384   if (neighbours == NULL)
1385   {
1386     /* This can happen during shutdown */
1387     return GNUNET_NO;
1388   }
1389   n = lookup_neighbour (peer);
1390   if (NULL == n)
1391     return GNUNET_NO;
1392   if (n->state == S_DISCONNECT)
1393   {
1394     /* We are disconnecting, nothing to do here */
1395     return GNUNET_NO;
1396   }
1397   GNUNET_assert (address->transport_name != NULL);
1398   if ( (session == NULL) && (0 == address->address_length) )
1399   {
1400     GNUNET_break_op (0);
1401     /* FIXME: is this actually possible? When does this happen? */
1402     if (strlen(address->transport_name) > 0)
1403       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1404     GNUNET_ATS_suggest_address (GST_ats, peer);
1405     return GNUNET_NO;
1406   }
1407
1408   /* checks successful and neighbour != NULL */
1409 #if DEBUG_TRANSPORT
1410   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1411               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1412               GST_plugins_a2s (address),
1413               session,
1414               GNUNET_i2s (peer),
1415               print_state(n->state));
1416 #endif
1417   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1418   {
1419     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1420     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1421   }
1422   /* do not switch addresses just update quotas */
1423   if ( (n->state == S_CONNECTED) && 
1424        (NULL != n->address) &&
1425        (0 == GNUNET_HELLO_address_cmp (address,
1426                                        n->address)) &&
1427        (n->session == session) )
1428   {
1429     n->bandwidth_in = bandwidth_in;
1430     n->bandwidth_out = bandwidth_out;
1431     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1432     send_outbound_quota(peer, n->bandwidth_out);
1433     return GNUNET_NO;    
1434   }
1435   if (n->state == S_CONNECTED)
1436   {
1437     /* mark old address as no longer used */
1438     GNUNET_assert (NULL != n->address);
1439     if (n->address_state == USED)
1440     {
1441       GST_validation_set_address_use (&n->id,
1442                                       n->address,
1443                                       n->session,
1444                                       GNUNET_NO);
1445       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1446       n->address_state = UNUSED;
1447     }
1448
1449   }
1450
1451   /* set new address */
1452   if (NULL != n->address)
1453     GNUNET_HELLO_address_free (n->address);
1454   n->address = GNUNET_HELLO_address_copy (address);
1455   n->address_state = FRESH;
1456   n->session = session;
1457   n->bandwidth_in = bandwidth_in;
1458   n->bandwidth_out = bandwidth_out;
1459   GNUNET_SCHEDULER_cancel (n->timeout_task);
1460   n->timeout_task =
1461       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1462                                     &neighbour_timeout_task, n);
1463   switch (n->state)
1464   {
1465   case S_NOT_CONNECTED:  
1466   case S_CONNECT_SENT:
1467     msg_len = sizeof (struct SessionConnectMessage);
1468     connect_msg.header.size = htons (msg_len);
1469     connect_msg.header.type =
1470         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1471     connect_msg.reserved = htonl (0);
1472     connect_msg.timestamp =
1473         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1474
1475     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1476     cc->session = session;
1477     cc->address = GNUNET_HELLO_address_copy (address);
1478     ret =
1479         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1480                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1481                           address, GNUNET_YES,
1482                           &send_connect_continuation, 
1483                           cc);
1484     return GNUNET_NO;
1485   case S_CONNECT_RECV:
1486     /* We received a CONNECT message and asked ATS for an address */
1487     msg_len = sizeof (struct SessionConnectMessage);
1488     connect_msg.header.size = htons (msg_len);
1489     connect_msg.header.type =
1490         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1491     connect_msg.reserved = htonl (0);
1492     connect_msg.timestamp =
1493         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1494     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1495     cc->session = session;
1496     cc->address = GNUNET_HELLO_address_copy (address);
1497     ret =
1498         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1499                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1500                           address, GNUNET_YES,
1501                           &send_connect_ack_continuation, cc);
1502     return GNUNET_NO;
1503   case S_CONNECTED:
1504   case S_FAST_RECONNECT:
1505     /* connected peer is switching addresses or tries fast reconnect*/
1506     msg_len = sizeof (struct SessionConnectMessage);
1507     connect_msg.header.size = htons (msg_len);
1508     connect_msg.header.type =
1509         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1510     connect_msg.reserved = htonl (0);
1511     connect_msg.timestamp =
1512         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1513     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1514     cc->session = session;
1515     cc->address = GNUNET_HELLO_address_copy (address);
1516     ret =
1517         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1518                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1519                           address, GNUNET_YES,
1520                           &send_switch_address_continuation, cc);
1521     if (ret == GNUNET_SYSERR)
1522     {
1523       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1524                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1525                   GNUNET_i2s (peer), 
1526                   GST_plugins_a2s (address), session);
1527     }
1528     return GNUNET_NO;
1529   default:
1530     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1531                 "Invalid connection state to switch addresses %u \n", n->state);
1532     GNUNET_break_op (0);
1533     return GNUNET_NO;
1534   }
1535 }
1536
1537
1538 /**
1539  * Obtain current latency information for the given neighbour.
1540  *
1541  * @param peer 
1542  * @return observed latency of the address, FOREVER if the address was
1543  *         never successfully validated
1544  */
1545 struct GNUNET_TIME_Relative
1546 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1547 {
1548   struct NeighbourMapEntry *n;
1549
1550   n = lookup_neighbour (peer);
1551   if ( (NULL == n) ||
1552        ( (n->address == NULL) && (n->session == NULL) ) )
1553     return GNUNET_TIME_UNIT_FOREVER_REL;
1554
1555   return n->latency;
1556 }
1557
1558
1559 /**
1560  * Create an entry in the neighbour map for the given peer
1561  *
1562  * @param peer peer to create an entry for
1563  * @return new neighbour map entry
1564  */
1565 static struct NeighbourMapEntry *
1566 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1567 {
1568   struct NeighbourMapEntry *n;
1569
1570 #if DEBUG_TRANSPORT
1571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1572               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1573 #endif
1574   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1575   n->id = *peer;
1576   n->state = S_NOT_CONNECTED;
1577   n->latency = GNUNET_TIME_relative_get_forever();
1578   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1579                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1580                                  MAX_BANDWIDTH_CARRY_S);
1581   n->timeout_task =
1582       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1583                                     &neighbour_timeout_task, n);
1584   GNUNET_assert (GNUNET_OK ==
1585                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1586                                                     &n->id.hashPubKey, n,
1587                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1588   return n;
1589 }
1590
1591
1592 /**
1593  * Try to create a connection to the given target (eventually).
1594  *
1595  * @param target peer to try to connect to
1596  */
1597 void
1598 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1599 {
1600   struct NeighbourMapEntry *n;
1601
1602   // This can happen during shutdown
1603   if (neighbours == NULL)
1604   {
1605     return;
1606   }
1607 #if DEBUG_TRANSPORT
1608   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1609               GNUNET_i2s (target));
1610 #endif
1611   if (0 ==
1612       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1613   {
1614     /* my own hello */
1615     return;
1616   }
1617   n = lookup_neighbour (target);
1618
1619   if (NULL != n)
1620   {
1621     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1622       return;                   /* already connecting or connected */
1623     if (is_disconnecting (n))
1624       change_state (n, S_NOT_CONNECTED);
1625   }
1626
1627
1628   if (n == NULL)
1629     n = setup_neighbour (target);
1630 #if DEBUG_TRANSPORT
1631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1632               "Asking ATS for suggested address to connect to peer `%s'\n",
1633               GNUNET_i2s (&n->id));
1634 #endif
1635
1636   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1637 }
1638
1639 /**
1640  * Test if we're connected to the given peer.
1641  *
1642  * @param target peer to test
1643  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1644  */
1645 int
1646 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1647 {
1648   struct NeighbourMapEntry *n;
1649
1650   // This can happen during shutdown
1651   if (neighbours == NULL)
1652   {
1653     return GNUNET_NO;
1654   }
1655
1656   n = lookup_neighbour (target);
1657
1658   if ((NULL == n) || (S_CONNECTED != n->state))
1659     return GNUNET_NO;           /* not connected */
1660   return GNUNET_YES;
1661 }
1662
1663 /**
1664  * A session was terminated. Take note.
1665  *
1666  * @param peer identity of the peer where the session died
1667  * @param session session that is gone
1668  */
1669 void
1670 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1671                                    struct Session *session)
1672 {
1673   struct NeighbourMapEntry *n;
1674
1675   if (neighbours == NULL)
1676   {
1677     /* This can happen during shutdown */
1678     return;
1679   }
1680
1681 #if DEBUG_TRANSPORT
1682   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Session %X to peer `%s' ended \n",
1683               session, GNUNET_i2s (peer));
1684 #endif
1685
1686   n = lookup_neighbour (peer);
1687   if (NULL == n)
1688     return;
1689   if (session != n->session)
1690     return;                     /* doesn't affect us */
1691   if (n->state == S_CONNECTED)
1692   {
1693     if (n->address_state == USED)
1694     {
1695       GST_validation_set_address_use (&n->id,
1696                                       n->address,
1697                                       n->session,
1698                                       GNUNET_NO);
1699       GNUNET_ATS_address_in_use (GST_ats,n->address, n->session, GNUNET_NO);
1700       n->address_state = UNUSED;
1701     }
1702   }
1703
1704   if (NULL != n->address)
1705   {
1706     GNUNET_HELLO_address_free (n->address);
1707     n->address = NULL;
1708   }
1709   n->session = NULL;
1710   
1711   /* not connected anymore anyway, shouldn't matter */
1712   if (S_CONNECTED != n->state)
1713     return;
1714
1715   /* connected, try fast reconnect */
1716 #if DEBUG_TRANSPORT
1717   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1718               "Trying fast reconnect to peer `%s'\n", GNUNET_i2s (peer));
1719 #endif
1720   change_state (n, S_FAST_RECONNECT);
1721   GNUNET_assert (neighbours_connected > 0);
1722   neighbours_connected--;
1723
1724   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1725   {
1726     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1727     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1728   }
1729
1730   /* We are connected, so ask ATS to switch addresses */
1731   GNUNET_SCHEDULER_cancel (n->timeout_task);
1732   n->timeout_task =
1733       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1734                                     &neighbour_timeout_task, n);
1735   /* try QUICKLY to re-establish a connection, reduce timeout! */
1736   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1737     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1738   n->ats_suggest =
1739       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1740                                     n);
1741   GNUNET_ATS_suggest_address (GST_ats, peer);
1742 }
1743
1744
1745 /**
1746  * Transmit a message to the given target using the active connection.
1747  *
1748  * @param target destination
1749  * @param msg message to send
1750  * @param msg_size number of bytes in msg
1751  * @param timeout when to fail with timeout
1752  * @param cont function to call when done
1753  * @param cont_cls closure for 'cont'
1754  */
1755 void
1756 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1757                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1758                      GST_NeighbourSendContinuation cont, void *cont_cls)
1759 {
1760   struct NeighbourMapEntry *n;
1761   struct MessageQueue *mq;
1762
1763   // This can happen during shutdown
1764   if (neighbours == NULL)
1765   {
1766     return;
1767   }
1768
1769   n = lookup_neighbour (target);
1770   if ((n == NULL) || (!is_connected (n)))
1771   {
1772     GNUNET_STATISTICS_update (GST_stats,
1773                               gettext_noop
1774                               ("# messages not sent (no such peer or not connected)"),
1775                               1, GNUNET_NO);
1776 #if DEBUG_TRANSPORT
1777     if (n == NULL)
1778       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779                   "Could not send message to peer `%s': unknown neighbour",
1780                   GNUNET_i2s (target));
1781     else if (!is_connected (n))
1782       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783                   "Could not send message to peer `%s': not connected\n",
1784                   GNUNET_i2s (target));
1785 #endif
1786     if (NULL != cont)
1787       cont (cont_cls, GNUNET_SYSERR);
1788     return;
1789   }
1790
1791   if ((n->session == NULL) && (n->address == NULL) )
1792   {
1793     GNUNET_STATISTICS_update (GST_stats,
1794                               gettext_noop
1795                               ("# messages not sent (no such peer or not connected)"),
1796                               1, GNUNET_NO);
1797 #if DEBUG_TRANSPORT
1798     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1799                 "Could not send message to peer `%s': no address available\n",
1800                 GNUNET_i2s (target));
1801 #endif
1802
1803     if (NULL != cont)
1804       cont (cont_cls, GNUNET_SYSERR);
1805     return;
1806   }
1807
1808   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1809   GNUNET_STATISTICS_update (GST_stats,
1810                             gettext_noop
1811                             ("# bytes in message queue for other peers"),
1812                             msg_size, GNUNET_NO);
1813   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1814   mq->cont = cont;
1815   mq->cont_cls = cont_cls;
1816   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1817   memcpy (&mq[1], msg, msg_size);
1818   mq->message_buf = (const char *) &mq[1];
1819   mq->message_buf_size = msg_size;
1820   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1821   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1822
1823   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1824       (NULL == n->is_active))
1825     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1826 }
1827
1828
1829 /**
1830  * We have received a message from the given sender.  How long should
1831  * we delay before receiving more?  (Also used to keep the peer marked
1832  * as live).
1833  *
1834  * @param sender sender of the message
1835  * @param size size of the message
1836  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1837  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1838  *                   GNUNET_SYSERR if the connection is not fully up yet
1839  * @return how long to wait before reading more from this sender
1840  */
1841 struct GNUNET_TIME_Relative
1842 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1843                                         *sender, ssize_t size, int *do_forward)
1844 {
1845   struct NeighbourMapEntry *n;
1846   struct GNUNET_TIME_Relative ret;
1847
1848   // This can happen during shutdown
1849   if (neighbours == NULL)
1850   {
1851     return GNUNET_TIME_UNIT_FOREVER_REL;
1852   }
1853
1854   n = lookup_neighbour (sender);
1855   if (n == NULL)
1856   {
1857     GST_neighbours_try_connect (sender);
1858     n = lookup_neighbour (sender);
1859     if (NULL == n)
1860     {
1861       GNUNET_STATISTICS_update (GST_stats,
1862                                 gettext_noop
1863                                 ("# messages discarded due to lack of neighbour record"),
1864                                 1, GNUNET_NO);
1865       *do_forward = GNUNET_NO;
1866       return GNUNET_TIME_UNIT_ZERO;
1867     }
1868   }
1869   if (!is_connected (n))
1870   {
1871     *do_forward = GNUNET_SYSERR;
1872     return GNUNET_TIME_UNIT_ZERO;
1873   }
1874   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1875   {
1876     n->quota_violation_count++;
1877 #if DEBUG_TRANSPORT
1878     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1879                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1880                 n->in_tracker.available_bytes_per_s__,
1881                 n->quota_violation_count);
1882 #endif
1883     /* Discount 32k per violation */
1884     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1885   }
1886   else
1887   {
1888     if (n->quota_violation_count > 0)
1889     {
1890       /* try to add 32k back */
1891       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1892       n->quota_violation_count--;
1893     }
1894   }
1895   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1896   {
1897     GNUNET_STATISTICS_update (GST_stats,
1898                               gettext_noop
1899                               ("# bandwidth quota violations by other peers"),
1900                               1, GNUNET_NO);
1901     *do_forward = GNUNET_NO;
1902     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1903   }
1904   *do_forward = GNUNET_YES;
1905   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1906   if (ret.rel_value > 0)
1907   {
1908 #if DEBUG_TRANSPORT
1909     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1910                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1911                 (unsigned long long) n->in_tracker.
1912                 consumption_since_last_update__,
1913                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1914                 (unsigned long long) ret.rel_value);
1915 #endif
1916     GNUNET_STATISTICS_update (GST_stats,
1917                               gettext_noop ("# ms throttling suggested"),
1918                               (int64_t) ret.rel_value, GNUNET_NO);
1919   }
1920   return ret;
1921 }
1922
1923
1924 /**
1925  * Keep the connection to the given neighbour alive longer,
1926  * we received a KEEPALIVE (or equivalent).
1927  *
1928  * @param neighbour neighbour to keep alive
1929  */
1930 void
1931 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1932 {
1933   struct NeighbourMapEntry *n;
1934
1935   // This can happen during shutdown
1936   if (neighbours == NULL)
1937   {
1938     return;
1939   }
1940
1941   n = lookup_neighbour (neighbour);
1942   if (NULL == n)
1943   {
1944     GNUNET_STATISTICS_update (GST_stats,
1945                               gettext_noop
1946                               ("# KEEPALIVE messages discarded (not connected)"),
1947                               1, GNUNET_NO);
1948     return;
1949   }
1950   GNUNET_SCHEDULER_cancel (n->timeout_task);
1951   n->timeout_task =
1952       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1953                                     &neighbour_timeout_task, n);
1954
1955   /* send reply to measure latency */
1956   if (S_CONNECTED != n->state)
1957     return;
1958
1959   struct GNUNET_MessageHeader m;
1960   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1961   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1962
1963   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1964                     UINT32_MAX /* priority */ ,
1965                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1966                     GNUNET_YES, NULL, NULL);
1967 }
1968
1969 /**
1970  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
1971  * to this peer
1972  *
1973  * @param neighbour neighbour to keep alive
1974  */
1975 void
1976 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1977                                    const struct GNUNET_ATS_Information * ats,
1978                                    uint32_t ats_count)
1979 {
1980   struct NeighbourMapEntry *n;
1981   struct GNUNET_ATS_Information * ats_new;
1982   uint32_t latency;
1983
1984   if (neighbours == NULL)
1985   {
1986     // This can happen during shutdown
1987     return;
1988   }
1989
1990   n = lookup_neighbour (neighbour);
1991   if (NULL == n)
1992   {
1993     GNUNET_STATISTICS_update (GST_stats,
1994                               gettext_noop
1995                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
1996                               1, GNUNET_NO);
1997     return;
1998   }
1999   if (n->expect_latency_response != GNUNET_YES)
2000   {
2001     GNUNET_STATISTICS_update (GST_stats,
2002                               gettext_noop
2003                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2004                               1, GNUNET_NO);
2005     return;
2006   }
2007   n->expect_latency_response = GNUNET_NO;
2008
2009   GNUNET_assert (n->keep_alive_sent.abs_value != GNUNET_TIME_absolute_get_zero().abs_value);
2010   n->latency = GNUNET_TIME_absolute_get_difference(n->keep_alive_sent, GNUNET_TIME_absolute_get());
2011 #if DEBUG_TRANSPORT
2012   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2013               "Latency for peer `%s' is %llu ms\n",
2014               GNUNET_i2s (&n->id), n->latency.rel_value);
2015 #endif
2016
2017
2018   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever().rel_value)
2019   {
2020     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2021   }
2022   else
2023   {
2024     ats_new = GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2025     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2026
2027     /* add latency */
2028     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2029     if (n->latency.rel_value > UINT32_MAX)
2030       latency = UINT32_MAX;
2031     else
2032       latency = n->latency.rel_value;
2033     ats_new[ats_count].value = htonl (latency);
2034
2035     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new, ats_count + 1);
2036     GNUNET_free (ats_new);
2037   }
2038 }
2039
2040
2041 /**
2042  * Change the incoming quota for the given peer.
2043  *
2044  * @param neighbour identity of peer to change qutoa for
2045  * @param quota new quota
2046  */
2047 void
2048 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2049                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2050 {
2051   struct NeighbourMapEntry *n;
2052
2053   // This can happen during shutdown
2054   if (neighbours == NULL)
2055   {
2056     return;
2057   }
2058
2059   n = lookup_neighbour (neighbour);
2060   if (n == NULL)
2061   {
2062     GNUNET_STATISTICS_update (GST_stats,
2063                               gettext_noop
2064                               ("# SET QUOTA messages ignored (no such peer)"),
2065                               1, GNUNET_NO);
2066     return;
2067   }
2068 #if DEBUG_TRANSPORT
2069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2070               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2071               ntohl (quota.value__), GNUNET_i2s (&n->id));
2072 #endif
2073   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2074   if (0 != ntohl (quota.value__))
2075     return;
2076 #if DEBUG_TRANSPORT
2077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2078               GNUNET_i2s (&n->id), "SET_QUOTA");
2079 #endif
2080   if (is_connected (n))
2081     GNUNET_STATISTICS_update (GST_stats,
2082                               gettext_noop ("# disconnects due to quota of 0"),
2083                               1, GNUNET_NO);
2084   disconnect_neighbour (n);
2085 }
2086
2087
2088 /**
2089  * Closure for the neighbours_iterate function.
2090  */
2091 struct IteratorContext
2092 {
2093   /**
2094    * Function to call on each connected neighbour.
2095    */
2096   GST_NeighbourIterator cb;
2097
2098   /**
2099    * Closure for 'cb'.
2100    */
2101   void *cb_cls;
2102 };
2103
2104
2105 /**
2106  * Call the callback from the closure for each connected neighbour.
2107  *
2108  * @param cls the 'struct IteratorContext'
2109  * @param key the hash of the public key of the neighbour
2110  * @param value the 'struct NeighbourMapEntry'
2111  * @return GNUNET_OK (continue to iterate)
2112  */
2113 static int
2114 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2115 {
2116   struct IteratorContext *ic = cls;
2117   struct NeighbourMapEntry *n = value;
2118
2119   if (!is_connected (n))
2120     return GNUNET_OK;
2121
2122   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2123   return GNUNET_OK;
2124 }
2125
2126
2127 /**
2128  * Iterate over all connected neighbours.
2129  *
2130  * @param cb function to call
2131  * @param cb_cls closure for cb
2132  */
2133 void
2134 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2135 {
2136   struct IteratorContext ic;
2137
2138   // This can happen during shutdown
2139   if (neighbours == NULL)
2140   {
2141     return;
2142   }
2143
2144   ic.cb = cb;
2145   ic.cb_cls = cb_cls;
2146   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2147 }
2148
2149 /**
2150  * If we have an active connection to the given target, it must be shutdown.
2151  *
2152  * @param target peer to disconnect from
2153  */
2154 void
2155 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2156 {
2157   struct NeighbourMapEntry *n;
2158
2159   // This can happen during shutdown
2160   if (neighbours == NULL)
2161   {
2162     return;
2163   }
2164
2165   n = lookup_neighbour (target);
2166   if (NULL == n)
2167     return;                     /* not active */
2168   disconnect_neighbour (n);
2169 }
2170
2171
2172 /**
2173  * We received a disconnect message from the given peer,
2174  * validate and process.
2175  *
2176  * @param peer sender of the message
2177  * @param msg the disconnect message
2178  */
2179 void
2180 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2181                                           *peer,
2182                                           const struct GNUNET_MessageHeader
2183                                           *msg)
2184 {
2185   struct NeighbourMapEntry *n;
2186   const struct SessionDisconnectMessage *sdm;
2187   GNUNET_HashCode hc;
2188
2189 #if DEBUG_TRANSPORT
2190   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2191               "Received DISCONNECT message from peer `%s'\n",
2192               GNUNET_i2s (peer));
2193 #endif
2194
2195   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2196   {
2197     // GNUNET_break_op (0);
2198     GNUNET_STATISTICS_update (GST_stats,
2199                               gettext_noop
2200                               ("# disconnect messages ignored (old format)"), 1,
2201                               GNUNET_NO);
2202     return;
2203   }
2204   sdm = (const struct SessionDisconnectMessage *) msg;
2205   n = lookup_neighbour (peer);
2206   if (NULL == n)
2207     return;                     /* gone already */
2208   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2209       n->connect_ts.abs_value)
2210   {
2211     GNUNET_STATISTICS_update (GST_stats,
2212                               gettext_noop
2213                               ("# disconnect messages ignored (timestamp)"), 1,
2214                               GNUNET_NO);
2215     return;
2216   }
2217   GNUNET_CRYPTO_hash (&sdm->public_key,
2218                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2219                       &hc);
2220   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2221   {
2222     GNUNET_break_op (0);
2223     return;
2224   }
2225   if (ntohl (sdm->purpose.size) !=
2226       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2227       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2228       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2229   {
2230     GNUNET_break_op (0);
2231     return;
2232   }
2233   if (GNUNET_OK !=
2234       GNUNET_CRYPTO_rsa_verify
2235       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2236        &sdm->signature, &sdm->public_key))
2237   {
2238     GNUNET_break_op (0);
2239     return;
2240   }
2241   GST_neighbours_force_disconnect (peer);
2242 }
2243
2244
2245 /**
2246  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2247  * Consider switching to it.
2248  *
2249  * @param message possibly a 'struct SessionConnectMessage' (check format)
2250  * @param peer identity of the peer to switch the address for
2251  * @param address address of the other peer, NULL if other peer
2252  *                       connected to us
2253  * @param session session to use (or NULL)
2254  * @param ats performance data
2255  * @param ats_count number of entries in ats
2256  */
2257 void
2258 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2259                                    const struct GNUNET_PeerIdentity *peer,
2260                                    const struct GNUNET_HELLO_Address *address,
2261                                    struct Session *session,
2262                                    const struct GNUNET_ATS_Information *ats,
2263                                    uint32_t ats_count)
2264 {
2265   const struct SessionConnectMessage *scm;
2266   struct GNUNET_MessageHeader msg;
2267   struct NeighbourMapEntry *n;
2268   size_t msg_len;
2269   size_t ret;
2270
2271 #if DEBUG_TRANSPORT
2272   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2273               "Received CONNECT_ACK message from peer `%s'\n",
2274               GNUNET_i2s (peer));
2275 #endif
2276
2277   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2278   {
2279     GNUNET_break_op (0);
2280     return;
2281   }
2282   scm = (const struct SessionConnectMessage *) message;
2283   GNUNET_break_op (ntohl (scm->reserved) == 0);
2284   n = lookup_neighbour (peer);
2285   if (NULL == n)
2286   {
2287     /* we did not send 'CONNECT' -- at least not recently */
2288     GNUNET_STATISTICS_update (GST_stats,
2289                               gettext_noop ("# unexpected CONNECT_ACK messages (no peer)"), 1,
2290                               GNUNET_NO);
2291     return;
2292   }  
2293
2294   /* Additional check
2295    *
2296    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2297    *
2298    * We also received an CONNECT message, switched from SENDT to RECV and
2299    * ATS already suggested us an address after a successful blacklist check
2300    */
2301   if ((n->state != S_CONNECT_SENT) && ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2302   {
2303     GNUNET_STATISTICS_update (GST_stats,
2304                               gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
2305                               GNUNET_NO);
2306     return;
2307   }
2308
2309   change_state (n, S_CONNECTED);
2310
2311   if (NULL != session)
2312   {
2313     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2314                      "transport-ats",
2315                      "Giving ATS session %p of plugin %s for peer %s\n",
2316                      session, address->transport_name, GNUNET_i2s (peer));
2317   }
2318   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2319   GNUNET_assert (NULL != n->address);
2320   if (n->address_state == FRESH)
2321   {
2322     GST_validation_set_address_use (&n->id,
2323             n->address,
2324             n->session,
2325             GNUNET_YES);
2326     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2327     n->address_state = USED;
2328   }
2329
2330   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2331
2332   /* send ACK (ACK) */
2333   msg_len = sizeof (msg);
2334   msg.size = htons (msg_len);
2335   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2336
2337   ret =
2338       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2339                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2340                         n->address, GNUNET_YES, NULL,
2341                         NULL);
2342
2343   if (ret == GNUNET_SYSERR)
2344     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2345                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2346                 GNUNET_i2s (&n->id), 
2347                 GST_plugins_a2s (n->address), n->session);
2348
2349
2350   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2351     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2352   
2353   neighbours_connected++;
2354   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2355                             GNUNET_NO);
2356 #if DEBUG_TRANSPORT
2357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2358               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2359               GNUNET_i2s (&n->id),
2360               GST_plugins_a2s (n->address), n->session,
2361               __LINE__);
2362 #endif
2363   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2364   send_outbound_quota(peer, n->bandwidth_out);
2365
2366 }
2367
2368
2369 void
2370 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2371                            const struct GNUNET_PeerIdentity *peer,
2372                            const struct GNUNET_HELLO_Address *address,
2373                            struct Session *session,
2374                            const struct GNUNET_ATS_Information *ats,
2375                            uint32_t ats_count)
2376 {
2377   struct NeighbourMapEntry *n;
2378
2379 #if DEBUG_TRANSPORT
2380   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2381               "Received ACK message from peer `%s'\n",
2382               GNUNET_i2s (peer));
2383 #endif
2384
2385   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2386   {
2387     GNUNET_break_op (0);
2388     return;
2389   }
2390   n = lookup_neighbour (peer);
2391   if (NULL == n)
2392   {
2393     send_disconnect (peer, address,
2394                      session);
2395     GNUNET_break (0);
2396     return;
2397   }
2398   if (S_CONNECTED == n->state)
2399     return;
2400   if (!is_connecting(n))
2401   {
2402     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2403                               GNUNET_NO);
2404     return;
2405   }
2406   change_state (n, S_CONNECTED);
2407   if (NULL != session)
2408     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2409                      "transport-ats",
2410                      "Giving ATS session %p of plugin %s for peer %s\n",
2411                      session, address->transport_name, GNUNET_i2s (peer));
2412   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2413   GNUNET_assert (n->address != NULL);
2414   if (n->address_state == FRESH)
2415   {
2416     GST_validation_set_address_use (&n->id,
2417                                   n->address,
2418                                   n->session,
2419                                   GNUNET_YES);
2420     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2421     n->address_state = USED;
2422   }
2423
2424   neighbours_connected++;
2425   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2426                             GNUNET_NO);
2427   
2428   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2429   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2430         n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2431 #if DEBUG_TRANSPORT
2432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2433               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2434               GNUNET_i2s (&n->id),
2435               GST_plugins_a2s (n->address), n->session,
2436               __LINE__);
2437 #endif
2438   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2439   send_outbound_quota(peer, n->bandwidth_out);
2440 }
2441
2442 struct BlackListCheckContext
2443 {
2444   struct GNUNET_ATS_Information *ats;
2445
2446   uint32_t ats_count;
2447
2448   struct Session *session;
2449
2450   struct GNUNET_HELLO_Address *address;
2451
2452   struct GNUNET_TIME_Absolute ts;
2453 };
2454
2455
2456 static void
2457 handle_connect_blacklist_cont (void *cls,
2458                                const struct GNUNET_PeerIdentity *peer,
2459                                int result)
2460 {
2461   struct NeighbourMapEntry *n;
2462   struct BlackListCheckContext *bcc = cls;
2463
2464 #if DEBUG_TRANSPORT
2465   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2466               "Blacklist check due to CONNECT message: `%s'\n",
2467               GNUNET_i2s (peer),
2468               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2469 #endif
2470
2471   /* not allowed */
2472   if (GNUNET_OK != result)
2473   {
2474     GNUNET_HELLO_address_free (bcc->address);
2475     GNUNET_free (bcc);
2476     return;
2477   }
2478
2479   n = lookup_neighbour (peer);
2480   if (NULL == n)
2481     n = setup_neighbour (peer);
2482
2483   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2484   {
2485     if (NULL != bcc->session)
2486       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2487                        "transport-ats",
2488                        "Giving ATS session %p of address `%s' for peer %s\n",
2489                        bcc->session, 
2490                        GST_plugins_a2s (bcc->address),
2491                        GNUNET_i2s (peer));
2492     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2493
2494     GNUNET_ATS_address_update (GST_ats,
2495                                bcc->address,
2496                                bcc->session,
2497                                bcc->ats,
2498                                bcc->ats_count);
2499     n->connect_ts = bcc->ts;
2500   }
2501
2502   GNUNET_HELLO_address_free (bcc->address);
2503   GNUNET_free (bcc);
2504
2505   if (n->state != S_CONNECT_RECV)
2506     change_state (n, S_CONNECT_RECV);
2507
2508
2509   /* Ask ATS for an address to connect via that address */
2510   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2511     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2512   n->ats_suggest =
2513       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2514                                     n);
2515   GNUNET_ATS_suggest_address (GST_ats, peer);
2516 }
2517
2518 /**
2519  * We received a 'SESSION_CONNECT' message from the other peer.
2520  * Consider switching to it.
2521  *
2522  * @param message possibly a 'struct SessionConnectMessage' (check format)
2523  * @param peer identity of the peer to switch the address for
2524  * @param address address of the other peer, NULL if other peer
2525  *                       connected to us
2526  * @param session session to use (or NULL)
2527  * @param ats performance data
2528  * @param ats_count number of entries in ats (excluding 0-termination)
2529  */
2530 void
2531 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2532                                const struct GNUNET_PeerIdentity *peer,
2533                                const struct GNUNET_HELLO_Address *address,
2534                                struct Session *session,
2535                                const struct GNUNET_ATS_Information *ats,
2536                                uint32_t ats_count)
2537 {
2538   const struct SessionConnectMessage *scm;
2539   struct BlackListCheckContext *bcc = NULL;
2540   struct NeighbourMapEntry *n;
2541
2542 #if DEBUG_TRANSPORT
2543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2544               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2545 #endif
2546
2547   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2548   {
2549     GNUNET_break_op (0);
2550     return;
2551   }
2552
2553   scm = (const struct SessionConnectMessage *) message;
2554   GNUNET_break_op (ntohl (scm->reserved) == 0);
2555
2556   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2557
2558   n = lookup_neighbour (peer);
2559   if ( (n != NULL) && (S_CONNECTED == n->state) )
2560   {
2561      /* connected peer switches addresses */
2562      return;
2563   }
2564
2565
2566   /* we are not connected to this peer */
2567   /* do blacklist check */
2568   bcc =
2569       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2570                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2571   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2572   bcc->ats_count = ats_count + 1;
2573   bcc->address = GNUNET_HELLO_address_copy (address);
2574   bcc->session = session;
2575   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2576   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2577   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2578   bcc->ats[ats_count].value = htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2579   GST_blacklist_test_allowed (peer, address->transport_name, handle_connect_blacklist_cont,
2580                               bcc);
2581 }
2582
2583
2584 /* end of file gnunet-service-transport_neighbours.c */