first steps to transport_api cleanup
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
/**
 * Size passed to the hash map constructor when the neighbour map
 * is created.
 */
#define NEIGHBOUR_TABLE_SIZE 256

/**
 * Number of bandwidth quota violations a peer may commit before we
 * simply start dropping its messages.
 */
#define QUOTA_VIOLATION_DROP_THRESHOLD 10

/**
 * Interval between two KEEPALIVE messages sent to a neighbour; the
 * exchange is also used to measure the latency to that neighbour.
 * (The idle timeout is 5 minutes, i.e. 300 seconds, so with a 30s
 * interval 10 keepalives fit into one timeout period and 10 messages
 * in a row would have to be lost before we disconnect.)
 */
#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * Delay of the 'ats_suggest' task that is scheduled whenever we ask
 * ATS to suggest an address; if it fires, the neighbour is dropped.
 */
#define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)

/**
 * Timeout for a fast reconnect.
 * NOTE(review): not referenced in the part of the file reviewed here;
 * confirm the exact use at the call site.
 */
#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)

/**
 * Delay of the 'state_reset' task that is scheduled whenever a
 * neighbour enters one of the connection setup states.
 */
#define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
65
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
72 /**
73  * Message a peer sends to another to indicate its
74  * preference for communicating via a particular
75  * session (and the desire to establish a real
76  * connection).
77  */
78 struct SessionConnectMessage
79 {
80   /**
81    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
82    */
83   struct GNUNET_MessageHeader header;
84
85   /**
86    * Always zero.
87    */
88   uint32_t reserved GNUNET_PACKED;
89
90   /**
91    * Absolute time at the sender.  Only the most recent connect
92    * message implies which session is preferred by the sender.
93    */
94   struct GNUNET_TIME_AbsoluteNBO timestamp;
95
96 };
97
98
99 struct SessionDisconnectMessage
100 {
101   /**
102    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
103    */
104   struct GNUNET_MessageHeader header;
105
106   /**
107    * Always zero.
108    */
109   uint32_t reserved GNUNET_PACKED;
110
111   /**
112    * Purpose of the signature.  Extends over the timestamp.
113    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
114    */
115   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
116
117   /**
118    * Absolute time at the sender.  Only the most recent connect
119    * message implies which session is preferred by the sender.
120    */
121   struct GNUNET_TIME_AbsoluteNBO timestamp;
122
123   /**
124    * Public key of the sender.
125    */
126   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
127
128   /**
129    * Signature of the peer that sends us the disconnect.  Only
130    * valid if the timestamp is AFTER the timestamp from the
131    * corresponding 'CONNECT' message.
132    */
133   struct GNUNET_CRYPTO_RsaSignature signature;
134
135 };
136
137
138 /**
139  * For each neighbour we keep a list of messages
140  * that we still want to transmit to the neighbour.
141  */
142 struct MessageQueue
143 {
144
145   /**
146    * This is a doubly linked list.
147    */
148   struct MessageQueue *next;
149
150   /**
151    * This is a doubly linked list.
152    */
153   struct MessageQueue *prev;
154
155   /**
156    * Once this message is actively being transmitted, which
157    * neighbour is it associated with?
158    */
159   struct NeighbourMapEntry *n;
160
161   /**
162    * Function to call once we're done.
163    */
164   GST_NeighbourSendContinuation cont;
165
166   /**
167    * Closure for 'cont'
168    */
169   void *cont_cls;
170
171   /**
172    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
173    * stuck together in memory.  Allocated at the end of this struct.
174    */
175   const char *message_buf;
176
177   /**
178    * Size of the message buf
179    */
180   size_t message_buf_size;
181
182   /**
183    * At what time should we fail?
184    */
185   struct GNUNET_TIME_Absolute timeout;
186
187 };
188
189
/**
 * Connection states of a neighbour.  The numeric order matters:
 * is_connecting() treats every value strictly between
 * S_NOT_CONNECTED and S_CONNECTED as "connecting".
 */
enum State
{
  /** Fresh peer or completely disconnected. */
  S_NOT_CONNECTED,

  /** Sent CONNECT message to other peer, waiting for CONNECT_ACK. */
  S_CONNECT_SENT,

  /** Received CONNECT message from other peer, sending CONNECT_ACK. */
  S_CONNECT_RECV,

  /** Received ACK or payload. */
  S_CONNECTED,

  /** Connection ended, fast reconnect. */
  S_FAST_RECONNECT,

  /** Disconnect in progress. */
  S_DISCONNECT
};
222
/**
 * Usage state of the address stored in a neighbour entry.
 */
enum Address_State
{
  /** Address has been reported as "in use" to validation and ATS. */
  USED,

  /** Address is not (or no longer) reported as "in use". */
  UNUSED,

  /* NOTE(review): presumably a newly assigned address that has not
   * been marked either way yet -- confirm against the code setting it. */
  FRESH
};
229
230
231 /**
232  * Entry in neighbours.
233  */
234 struct NeighbourMapEntry
235 {
236
237   /**
238    * Head of list of messages we would like to send to this peer;
239    * must contain at most one message per client.
240    */
241   struct MessageQueue *messages_head;
242
243   /**
244    * Tail of list of messages we would like to send to this peer; must
245    * contain at most one message per client.
246    */
247   struct MessageQueue *messages_tail;
248
249   /**
250    * Are we currently trying to send a message? If so, which one?
251    */
252   struct MessageQueue *is_active;
253
254   /**
255    * Active session for communicating with the peer.
256    */
257   struct Session *session;
258
259   /**
260    * Address we currently use.
261    */
262   struct GNUNET_HELLO_Address *address;
263
264   /**
265    * Identity of this neighbour.
266    */
267   struct GNUNET_PeerIdentity id;
268
269   /**
270    * ID of task scheduled to run when this peer is about to
271    * time out (will free resources associated with the peer).
272    */
273   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
274
275   /**
276    * ID of task scheduled to send keepalives.
277    */
278   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
279
280   /**
281    * ID of task scheduled to run when we should try transmitting
282    * the head of the message queue.
283    */
284   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
285
286   /**
287    * Tracker for inbound bandwidth.
288    */
289   struct GNUNET_BANDWIDTH_Tracker in_tracker;
290
291   /**
292    * Inbound bandwidth from ATS, activated when connection is up
293    */
294   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
295
296   /**
297    * Inbound bandwidth from ATS, activated when connection is up
298    */
299   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
300
301   /**
302    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
303    */
304   struct GNUNET_TIME_Absolute connect_ts;
305
306   /**
307    * When did we sent the last keep-alive message?
308    */
309   struct GNUNET_TIME_Absolute keep_alive_sent;
310
311   /**
312    * Latest calculated latency value
313    */
314   struct GNUNET_TIME_Relative latency;
315
316   /**
317    * Timeout for ATS
318    * We asked ATS for a new address for this peer
319    */
320   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
321
322   /**
323    * Task the resets the peer state after due to an pending
324    * unsuccessful connection setup
325    */
326   GNUNET_SCHEDULER_TaskIdentifier state_reset;
327
328
329   /**
330    * How often has the other peer (recently) violated the inbound
331    * traffic limit?  Incremented by 10 per violation, decremented by 1
332    * per non-violation (for each time interval).
333    */
334   unsigned int quota_violation_count;
335
336
337   /**
338    * The current state of the peer
339    * Element of enum State
340    */
341   int state;
342
343   /**
344    * Did we sent an KEEP_ALIVE message and are we expecting a response?
345    */
346   int expect_latency_response;
347   int address_state;
348 };
349
350
351 /**
352  * All known neighbours and their HELLOs.
353  */
354 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
355
356 /**
357  * Closure for connect_notify_cb and disconnect_notify_cb
358  */
359 static void *callback_cls;
360
361 /**
362  * Function to call when we connected to a neighbour.
363  */
364 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
365
366 /**
367  * Function to call when we disconnected from a neighbour.
368  */
369 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
370
371 /**
372  * counter for connected neighbours
373  */
374 static int neighbours_connected;
375
376 /**
377  * Lookup a neighbour entry in the neighbours hash map.
378  *
379  * @param pid identity of the peer to look up
380  * @return the entry, NULL if there is no existing record
381  */
382 static struct NeighbourMapEntry *
383 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
384 {
385   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
386 }
387
/**
 * Convenience wrapper around change() that records the source line
 * of the caller; additional arguments are accepted and ignored.
 */
#define change_state(n, state, ...) change ((n), (state), __LINE__)
389
390 static int
391 is_connecting (struct NeighbourMapEntry *n)
392 {
393   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
394     return GNUNET_YES;
395   return GNUNET_NO;
396 }
397
398 static int
399 is_connected (struct NeighbourMapEntry *n)
400 {
401   if (n->state == S_CONNECTED)
402     return GNUNET_YES;
403   return GNUNET_NO;
404 }
405
406 static int
407 is_disconnecting (struct NeighbourMapEntry *n)
408 {
409   if (n->state == S_DISCONNECT)
410     return GNUNET_YES;
411   return GNUNET_NO;
412 }
413
414 static const char *
415 print_state (int state)
416 {
417   switch (state)
418   {
419   case S_CONNECTED:
420     return "S_CONNECTED";
421     break;
422   case S_CONNECT_RECV:
423     return "S_CONNECT_RECV";
424     break;
425   case S_CONNECT_SENT:
426     return "S_CONNECT_SENT";
427     break;
428   case S_DISCONNECT:
429     return "S_DISCONNECT";
430     break;
431   case S_NOT_CONNECTED:
432     return "S_NOT_CONNECTED";
433     break;
434   case S_FAST_RECONNECT:
435     return "S_FAST_RECONNECT";
436     break;
437   default:
438     GNUNET_break (0);
439     break;
440   }
441   return NULL;
442 }
443
444 static int
445 change (struct NeighbourMapEntry *n, int state, int line);
446
447 static void
448 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
449
450
451 static void
452 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
453 {
454   struct NeighbourMapEntry *n = cls;
455   if (n == NULL)
456     return;
457
458   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
459   if (n->state == S_CONNECTED)
460     return;
461
462 #if DEBUG_TRANSPORT
463   GNUNET_STATISTICS_update (GST_stats,
464                             gettext_noop
465                             ("# failed connection attempts due to timeout"), 1,
466                             GNUNET_NO);
467 #endif
468
469   /* resetting state */
470   n->state = S_NOT_CONNECTED;
471
472   /* destroying address */
473   if (n->address != NULL)
474   {
475     GNUNET_assert (strlen(n->address->transport_name) > 0);
476     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
477   }
478
479   /* request new address */
480   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
481     GNUNET_SCHEDULER_cancel (n->ats_suggest);
482   n->ats_suggest =
483       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
484                                     n);
485   GNUNET_ATS_suggest_address (GST_ats, &n->id);
486 }
487
488 static int
489 change (struct NeighbourMapEntry *n, int state, int line)
490 {
491   /* allowed transitions */
492   int allowed = GNUNET_NO;
493
494   switch (n->state)
495   {
496   case S_NOT_CONNECTED:
497     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
498         (state == S_DISCONNECT))
499       allowed = GNUNET_YES;
500     break;
501   case S_CONNECT_RECV:
502     allowed = GNUNET_YES;
503     break;
504   case S_CONNECT_SENT:
505     allowed = GNUNET_YES;
506     break;
507   case S_CONNECTED:
508     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
509       allowed = GNUNET_YES;
510     break;
511   case S_DISCONNECT:
512     break;
513   case S_FAST_RECONNECT:
514     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
515       allowed = GNUNET_YES;
516     break;
517   default:
518     GNUNET_break (0);
519     break;
520   }
521   if (allowed == GNUNET_NO)
522   {
523     char *old = GNUNET_strdup (print_state (n->state));
524     char *new = GNUNET_strdup (print_state (state));
525     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
527                 new, line);
528     GNUNET_break (0);
529     GNUNET_free (old);
530     GNUNET_free (new);
531     return GNUNET_SYSERR;
532   }
533 #if DEBUG_TRANSPORT
534   {
535     char *old = GNUNET_strdup (print_state (n->state));
536     char *new = GNUNET_strdup (print_state (state));
537     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
538                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
539                 GNUNET_i2s (&n->id), n, old, new, line);
540     GNUNET_free (old);
541     GNUNET_free (new);
542   }
543 #endif
544   n->state = state;
545
546   switch (n->state)
547   {
548   case S_FAST_RECONNECT:
549   case S_CONNECT_RECV:
550   case S_CONNECT_SENT:
551     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
552       GNUNET_SCHEDULER_cancel (n->state_reset);
553     n->state_reset =
554       GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
555                                     n);   
556     break;
557   case S_CONNECTED:
558   case S_NOT_CONNECTED:
559   case S_DISCONNECT:
560     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
561     {
562 #if DEBUG_TRANSPORT
563       char *old = GNUNET_strdup (print_state (n->state));
564       char *new = GNUNET_strdup (print_state (state));
565       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
566                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
567                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
568                   old, new);
569       GNUNET_free (old);
570       GNUNET_free (new);
571 #endif
572       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
573       GNUNET_SCHEDULER_cancel (n->state_reset);
574       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
575     }
576     break;
577
578   default:
579     GNUNET_assert (0);
580   }
581   
582
583
584   return GNUNET_OK;
585 }
586
587 static ssize_t
588 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
589                   size_t msgbuf_size, uint32_t priority,
590                   struct GNUNET_TIME_Relative timeout, struct Session *session,
591                   const struct GNUNET_HELLO_Address *address,
592                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
593                   void *cont_cls)
594 {
595   struct GNUNET_TRANSPORT_PluginFunctions *papi;
596   size_t ret = GNUNET_SYSERR;
597
598   /* FIXME : ats returns an address with all values 0 */
599   if (address == NULL)
600   {
601     if (cont != NULL)
602       cont (cont_cls, target, GNUNET_SYSERR);
603     return GNUNET_SYSERR;
604   }
605
606   if ((session == NULL) && (address->address_length == 0))
607   {
608     if (cont != NULL)
609       cont (cont_cls, target, GNUNET_SYSERR);
610     return GNUNET_SYSERR;
611   }
612
613   papi = GST_plugins_find (address->transport_name);
614   if (papi == NULL)
615   {
616     if (cont != NULL)
617       cont (cont_cls, target, GNUNET_SYSERR);
618     return GNUNET_SYSERR;
619   }
620
621   ret =
622       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
623                   address->address, 
624                   address->address_length, GNUNET_YES, cont, cont_cls);
625
626   if (ret == -1)
627   {
628     if (cont != NULL)
629       cont (cont_cls, target, GNUNET_SYSERR);
630   }
631   return ret;
632 }
633
634 /**
635  * Task invoked to start a transmission to another peer.
636  *
637  * @param cls the 'struct NeighbourMapEntry'
638  * @param tc scheduler context
639  */
640 static void
641 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
642
643
644 /**
645  * We're done with our transmission attempt, continue processing.
646  *
647  * @param cls the 'struct MessageQueue' of the message
648  * @param receiver intended receiver
649  * @param success whether it worked or not
650  */
651 static void
652 transmit_send_continuation (void *cls,
653                             const struct GNUNET_PeerIdentity *receiver,
654                             int success)
655 {
656   struct MessageQueue *mq;
657   struct NeighbourMapEntry *n;
658   struct NeighbourMapEntry *tmp;
659
660   tmp = lookup_neighbour(receiver);
661
662   mq = cls;
663   n = mq->n;
664   if ((NULL != n) && (tmp != NULL) && (tmp == n))
665   {
666     GNUNET_assert (n->is_active == mq);
667     n->is_active = NULL;
668     if (success == GNUNET_YES)
669     {
670       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
671       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
672     }
673   }
674 #if DEBUG_TRANSPORT
675   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
676               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
677               (success == GNUNET_OK) ? "successful" : "FAILED");
678 #endif
679   if (NULL != mq->cont)
680     mq->cont (mq->cont_cls, success);
681   GNUNET_free (mq);
682 }
683
684
685 /**
686  * Check the ready list for the given neighbour and if a plugin is
687  * ready for transmission (and if we have a message), do so!
688  *
689  * @param n target peer for which to transmit
690  */
691 static void
692 try_transmission_to_peer (struct NeighbourMapEntry *n)
693 {
694   struct MessageQueue *mq;
695   struct GNUNET_TIME_Relative timeout;
696   ssize_t ret;
697
698   if (n->is_active != NULL)
699   {
700     GNUNET_break (0);
701     return;                     /* transmission already pending */
702   }
703   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
704   {
705     GNUNET_break (0);
706     return;                     /* currently waiting for bandwidth */
707   }
708   while (NULL != (mq = n->messages_head))
709   {
710     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
711     if (timeout.rel_value > 0)
712       break;
713     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
714     n->is_active = mq;
715     mq->n = n;
716     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
717   }
718   if (NULL == mq)
719     return;                     /* no more messages */
720
721   if (n->address == NULL)
722   {
723     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724                 "No address for peer `%s'\n",
725                 GNUNET_i2s (&n->id));
726     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
727     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
728     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
729     return;
730   }
731
732   if (GST_plugins_find (n->address->transport_name) == NULL)
733   {
734     GNUNET_break (0);
735     return;
736   }
737   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
738   n->is_active = mq;
739   mq->n = n;
740
741   if ((n->address->address_length == 0) && (n->session == NULL))
742   {
743     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
744                 "No address for peer `%s'\n",
745                 GNUNET_i2s (&n->id));
746     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
747     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
748     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
749     return;
750   }
751
752   ret =
753       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
754                         timeout, n->session, n->address, 
755                         GNUNET_YES, &transmit_send_continuation,
756                         mq);
757   if (ret == -1)
758   {
759     /* failure, but 'send' would not call continuation in this case,
760      * so we need to do it here! */
761     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
762   }
763
764 }
765
766
767 /**
768  * Task invoked to start a transmission to another peer.
769  *
770  * @param cls the 'struct NeighbourMapEntry'
771  * @param tc scheduler context
772  */
773 static void
774 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
775 {
776   struct NeighbourMapEntry *n = cls;
777
778   GNUNET_assert (NULL != lookup_neighbour (&n->id));
779   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
780   try_transmission_to_peer (n);
781 }
782
783
784 /**
785  * Initialize the neighbours subsystem.
786  *
787  * @param cls closure for callbacks
788  * @param connect_cb function to call if we connect to a peer
789  * @param disconnect_cb function to call if we disconnect from a peer
790  */
791 void
792 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
793                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
794 {
795   callback_cls = cls;
796   connect_notify_cb = connect_cb;
797   disconnect_notify_cb = disconnect_cb;
798   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
799 }
800
801
/**
 * Continuation for the transmission of a DISCONNECT message; only
 * logs the outcome when DEBUG_TRANSPORT is enabled.
 *
 * @param cls closure (unused)
 * @param target peer the DISCONNECT message was for
 * @param result outcome of the transmission
 */
static void
send_disconnect_cont (void *cls,
                      const struct GNUNET_PeerIdentity *target,
                      int result)
{
#if DEBUG_TRANSPORT
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Sending DISCONNECT message to peer `%4s': %i\n",
              GNUNET_i2s (target),
              result);
#endif
}
812
813
814 static int
815 send_disconnect (const struct GNUNET_PeerIdentity *target,
816                  const struct GNUNET_HELLO_Address *address,
817                  struct Session *session)
818 {
819   size_t ret;
820   struct SessionDisconnectMessage disconnect_msg;
821
822 #if DEBUG_TRANSPORT
823   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
824               "Sending DISCONNECT message to peer `%4s'\n",
825               GNUNET_i2s (target));
826 #endif
827
828   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
829   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
830   disconnect_msg.reserved = htonl (0);
831   disconnect_msg.purpose.size =
832       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
833              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
834              sizeof (struct GNUNET_TIME_AbsoluteNBO));
835   disconnect_msg.purpose.purpose =
836       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
837   disconnect_msg.timestamp =
838       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
839   disconnect_msg.public_key = GST_my_public_key;
840   GNUNET_assert (GNUNET_OK ==
841                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
842                                          &disconnect_msg.purpose,
843                                          &disconnect_msg.signature));
844
845   ret =
846       send_with_plugin (target, (const char *) &disconnect_msg,
847                         sizeof (disconnect_msg), UINT32_MAX,
848                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
849                         GNUNET_YES,
850                         &send_disconnect_cont, NULL);
851
852   if (ret == GNUNET_SYSERR)
853     return GNUNET_SYSERR;
854
855   GNUNET_STATISTICS_update (GST_stats,
856                             gettext_noop
857                             ("# peers disconnected due to external request"), 1,
858                             GNUNET_NO);
859   return GNUNET_OK;
860 }
861
862
863 /**
864  * Disconnect from the given neighbour, clean up the record.
865  *
866  * @param n neighbour to disconnect from
867  */
868 static void
869 disconnect_neighbour (struct NeighbourMapEntry *n)
870 {
871   struct MessageQueue *mq;
872   int previous_state;
873
874   previous_state = n->state;
875
876   if (is_disconnecting (n))
877     return;
878
879
880   /* send DISCONNECT MESSAGE */
881   if ((previous_state == S_CONNECTED) || is_connecting (n))
882   {
883     if (GNUNET_OK ==
884         send_disconnect (&n->id, n->address,
885                          n->session))
886       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
887                   GNUNET_i2s (&n->id));
888     else
889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                   "Could not send DISCONNECT_MSG to `%s'\n",
891                   GNUNET_i2s (&n->id));
892   }
893
894   change_state (n, S_DISCONNECT);
895
896   if (previous_state == S_CONNECTED)
897   {
898     GNUNET_assert (NULL != n->address);
899     if (n->address_state == USED)
900     {
901       GST_validation_set_address_use (&n->id,
902                                       n->address,
903                                       n->session,
904                                       GNUNET_NO);
905
906       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
907       n->address_state = UNUSED;
908     }
909   }
910
911   if (n->address != NULL)
912   {
913     struct GNUNET_TRANSPORT_PluginFunctions *papi;
914     papi = GST_plugins_find (n->address->transport_name);
915     if (papi != NULL)
916       papi->disconnect (papi->cls, &n->id);
917   }
918   while (NULL != (mq = n->messages_head))
919   {
920     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
921     if (NULL != mq->cont)
922       mq->cont (mq->cont_cls, GNUNET_SYSERR);
923     GNUNET_free (mq);
924   }
925   if (NULL != n->is_active)
926   {
927     n->is_active->n = NULL;
928     n->is_active = NULL;
929   }
930
931   switch (previous_state) {
932     case S_CONNECTED:
933 //      GNUNET_assert (neighbours_connected > 0);
934       neighbours_connected--;
935       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
936       GNUNET_SCHEDULER_cancel (n->keepalive_task);
937       n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
938       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
939                                 GNUNET_NO);
940       disconnect_notify_cb (callback_cls, &n->id);
941       break;
942     case S_FAST_RECONNECT:
943       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
944                                 GNUNET_NO);
945       GNUNET_STATISTICS_update (GST_stats,
946                                 gettext_noop
947                                 ("# fast reconnects failed"),
948                                 1, GNUNET_NO);
949       disconnect_notify_cb (callback_cls, &n->id);
950     default:
951       break;
952   }
953
954   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
955
956   GNUNET_assert (GNUNET_YES ==
957                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
958                                                        &n->id.hashPubKey, n));
959   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
960   {
961     GNUNET_SCHEDULER_cancel (n->ats_suggest);
962     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
963   }
964   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
965   {
966     GNUNET_SCHEDULER_cancel (n->timeout_task);
967     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
968   }
969   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
970   {
971     GNUNET_SCHEDULER_cancel (n->transmission_task);
972     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
973   }
974   if (NULL != n->address)
975   {
976     GNUNET_HELLO_address_free (n->address);
977     n->address = NULL;
978   }
979   n->session = NULL;
980   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
981               GNUNET_i2s (&n->id), n);
982   GNUNET_free (n);
983 }
984
985
986 /**
987  * Peer has been idle for too long. Disconnect.
988  *
989  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
990  * @param tc scheduler context
991  */
992 static void
993 neighbour_timeout_task (void *cls,
994                         const struct GNUNET_SCHEDULER_TaskContext *tc)
995 {
996   struct NeighbourMapEntry *n = cls;
997
998   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
999
1000   GNUNET_STATISTICS_update (GST_stats,
1001                             gettext_noop
1002                             ("# peers disconnected due to timeout"), 1,
1003                             GNUNET_NO);
1004   disconnect_neighbour (n);
1005 }
1006
1007
1008 /**
1009  * Send another keepalive message.
1010  *
1011  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1012  * @param tc scheduler context
1013  */
1014 static void
1015 neighbour_keepalive_task (void *cls,
1016                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1017 {
1018   struct NeighbourMapEntry *n = cls;
1019   struct GNUNET_MessageHeader m;
1020   int ret;
1021
1022   n->keepalive_task =
1023       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1024                                     &neighbour_keepalive_task, n);
1025
1026   GNUNET_assert (S_CONNECTED == n->state);
1027   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1028                             GNUNET_NO);
1029   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1030   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1031
1032
1033   ret = send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1034                     UINT32_MAX /* priority */ ,
1035                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address, 
1036                     GNUNET_YES, NULL, NULL);
1037
1038   n->expect_latency_response = GNUNET_NO;
1039   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero();
1040   if (ret != GNUNET_SYSERR)
1041   {
1042     n->expect_latency_response = GNUNET_YES;
1043     n->keep_alive_sent = GNUNET_TIME_absolute_get();
1044   }
1045
1046 }
1047
1048
1049 /**
1050  * Disconnect from the given neighbour.
1051  *
1052  * @param cls unused
1053  * @param key hash of neighbour's public key (not used)
1054  * @param value the 'struct NeighbourMapEntry' of the neighbour
1055  */
1056 static int
1057 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1058 {
1059   struct NeighbourMapEntry *n = value;
1060
1061 #if DEBUG_TRANSPORT
1062   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1063               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1064 #endif
1065   if (S_CONNECTED == n->state)
1066     GNUNET_STATISTICS_update (GST_stats,
1067                               gettext_noop
1068                               ("# peers disconnected due to global disconnect"),
1069                               1, GNUNET_NO);
1070   disconnect_neighbour (n);
1071   return GNUNET_OK;
1072 }
1073
1074
1075 static void
1076 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1077 {
1078   struct NeighbourMapEntry *n = cls;
1079
1080   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1081
1082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083               "ATS did not suggested address to connect to peer `%s'\n",
1084               GNUNET_i2s (&n->id));
1085
1086   disconnect_neighbour (n);
1087 }
1088
1089 /**
1090  * Cleanup the neighbours subsystem.
1091  */
1092 void
1093 GST_neighbours_stop ()
1094 {
1095   // This can happen during shutdown
1096   if (neighbours == NULL)
1097   {
1098     return;
1099   }
1100
1101   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1102                                          NULL);
1103   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1104 //  GNUNET_assert (neighbours_connected == 0);
1105   neighbours = NULL;
1106   callback_cls = NULL;
1107   connect_notify_cb = NULL;
1108   disconnect_notify_cb = NULL;
1109 }
1110
/**
 * Closure passed to the send_*_continuation callbacks.  It records the
 * address and session over which a CONNECT / CONNECT_ACK message was
 * sent, so the continuation can report the outcome for exactly that
 * address.  The continuation frees both the address copy and this
 * structure.
 */
struct ContinutionContext
{
  /**
   * Private copy of the address the message was sent to (created with
   * GNUNET_HELLO_address_copy, released with GNUNET_HELLO_address_free).
   */
  struct GNUNET_HELLO_Address *address;

  /**
   * Session the message was sent over; only the pointer value is
   * stored, it is passed on to ATS/validation calls as-is.
   */
  struct Session *session;
};
1117
1118 static void send_outbound_quota (const struct GNUNET_PeerIdentity *target, struct GNUNET_BANDWIDTH_Value32NBO quota)
1119 {
1120    struct QuotaSetMessage q_msg;
1121 #if DEBUG_TRANSPORT
1122     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1123                 "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1124                 ntohl (quota.value__), GNUNET_i2s (target));
1125 #endif
1126     q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1127     q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1128     q_msg.quota = quota;
1129     q_msg.peer = (*target);
1130     GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1131 }
1132
1133 /**
1134  * We tried to send a SESSION_CONNECT message to another peer.  If this
1135  * succeeded, we change the state.  If it failed, we should tell
1136  * ATS to not use this address anymore (until it is re-validated).
1137  *
1138  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1139  * @param target peer to send the message to
1140  * @param success GNUNET_OK on success
1141  */
1142 static void
1143 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1144                            int success)
1145 {
1146   struct ContinutionContext * cc = cls;
1147   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1148   
1149   if (GNUNET_YES != success)
1150   {
1151     GNUNET_assert (strlen(cc->address->transport_name) > 0);
1152     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1153   }
1154   if ( (NULL == neighbours) ||
1155        (NULL == n) ||
1156        (n->state == S_DISCONNECT))
1157   {
1158     GNUNET_HELLO_address_free (cc->address);
1159     GNUNET_free (cc);
1160     return;
1161   }
1162
1163   if ((GNUNET_YES == success) &&
1164      ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1165   {
1166     change_state (n, S_CONNECT_SENT);
1167     GNUNET_HELLO_address_free (cc->address);
1168     GNUNET_free (cc);
1169     return;
1170   }
1171
1172   if ((GNUNET_NO == success) &&
1173      ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1174   {
1175 #if DEBUG_TRANSPORT
1176     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1178                 GNUNET_i2s (&n->id),
1179                 GST_plugins_a2s (n->address),
1180                 n->session);
1181 #endif
1182     change_state (n, S_NOT_CONNECTED);
1183     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1184       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1185     n->ats_suggest =
1186       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1187                                     n);
1188     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1189   }
1190   GNUNET_HELLO_address_free (cc->address);
1191   GNUNET_free (cc);
1192 }
1193
1194
1195 /**
1196  * We tried to switch addresses with an peer already connected. If it failed,
1197  * we should tell ATS to not use this address anymore (until it is re-validated).
1198  *
1199  * @param cls the 'struct NeighbourMapEntry'
1200  * @param target peer to send the message to
1201  * @param success GNUNET_OK on success
1202  */
1203 static void
1204 send_switch_address_continuation (void *cls,
1205                                   const struct GNUNET_PeerIdentity *target,
1206                                   int success)
1207 {
1208   struct ContinutionContext * cc = cls;
1209   struct NeighbourMapEntry *n;
1210
1211   if (neighbours == NULL)
1212   {
1213     GNUNET_HELLO_address_free (cc->address);
1214     GNUNET_free (cc);
1215     return;                     /* neighbour is going away */
1216   }
1217
1218   n = lookup_neighbour(&cc->address->peer);
1219   if ((n == NULL) || (is_disconnecting (n)))
1220   {
1221     GNUNET_HELLO_address_free (cc->address);
1222     GNUNET_free (cc);
1223     return;                     /* neighbour is going away */
1224   }
1225
1226   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1227   if (GNUNET_YES != success)
1228   {
1229 #if DEBUG_TRANSPORT
1230     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1231                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1232                 GNUNET_i2s (&n->id), 
1233                 GST_plugins_a2s (n->address), n->session);
1234 #endif
1235     GNUNET_assert (strlen(cc->address->transport_name) > 0);
1236     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1237
1238     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1239       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1240     n->ats_suggest =
1241         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1242                                       n);
1243     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1244     GNUNET_HELLO_address_free (cc->address);
1245     GNUNET_free (cc);
1246     return;
1247   }
1248   /* Tell ATS that switching addresses was successful */
1249   switch (n->state) {
1250     case S_CONNECTED:
1251       if (n->address_state == FRESH)
1252       {
1253           GST_validation_set_address_use (&n->id,
1254                     cc->address,
1255                     cc->session,
1256                     GNUNET_YES);
1257           GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1258           GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1259           n->address_state = USED;
1260       }
1261       break;
1262     case S_FAST_RECONNECT:
1263 #if DEBUG_TRANSPORT
1264       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1265                   "Successful fast reconnect to peer `%s'\n", GNUNET_i2s (&n->id));
1266 #endif
1267       change_state (n, S_CONNECTED);
1268       neighbours_connected++;
1269       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1270                                 GNUNET_NO);
1271
1272       if (n->address_state == FRESH)
1273       {
1274           GST_validation_set_address_use (&n->id,
1275                     cc->address,
1276                     cc->session,
1277                     GNUNET_YES);
1278           GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1279           GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1280           n->address_state = USED;
1281       }
1282
1283       if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1284             n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1285
1286       /* Updating quotas */
1287       GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1288       send_outbound_quota(target, n->bandwidth_out);
1289
1290     default:
1291       break;
1292   }
1293   GNUNET_HELLO_address_free (cc->address);
1294   GNUNET_free (cc);
1295 }
1296
1297
1298 /**
1299  * We tried to send a SESSION_CONNECT message to another peer.  If this
1300  * succeeded, we change the state.  If it failed, we should tell
1301  * ATS to not use this address anymore (until it is re-validated).
1302  *
1303  * @param cls the 'struct NeighbourMapEntry'
1304  * @param target peer to send the message to
1305  * @param success GNUNET_OK on success
1306  */
1307 static void
1308 send_connect_ack_continuation (void *cls,
1309                                const struct GNUNET_PeerIdentity *target,
1310                                int success)
1311 {
1312   struct ContinutionContext * cc = cls;
1313   struct NeighbourMapEntry *n;
1314
1315   if (neighbours == NULL)
1316   {
1317     GNUNET_HELLO_address_free (cc->address);
1318     GNUNET_free (cc);
1319     return;                     /* neighbour is going away */
1320   }
1321
1322   n = lookup_neighbour(&cc->address->peer);
1323   if ((n == NULL) || (is_disconnecting (n)))
1324   {
1325     GNUNET_HELLO_address_free (cc->address);
1326     GNUNET_free (cc);
1327     return;                     /* neighbour is going away */
1328   }
1329
1330   if (GNUNET_YES == success)
1331   {
1332     GNUNET_HELLO_address_free (cc->address);
1333     GNUNET_free (cc);
1334     return;                     /* sending successful */
1335   }
1336
1337   /* sending failed, ask for next address  */
1338 #if DEBUG_TRANSPORT
1339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1341               GNUNET_i2s (&n->id), 
1342               GST_plugins_a2s (n->address),
1343               n->session);
1344 #endif
1345   change_state (n, S_NOT_CONNECTED);
1346   GNUNET_assert (strlen(cc->address->transport_name) > 0);
1347   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1348
1349   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1350     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1351   n->ats_suggest =
1352       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1353                                     n);
1354   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1355   GNUNET_HELLO_address_free (cc->address);
1356   GNUNET_free (cc);
1357 }
1358
1359
1360 /**
1361  * For an existing neighbour record, set the active connection to
1362  * the given address.
1363  *
1364  * @param peer identity of the peer to switch the address for
1365  * @param address address of the other peer, NULL if other peer
1366  *                       connected to us
1367  * @param session session to use (or NULL)
1368  * @param ats performance data
1369  * @param ats_count number of entries in ats
1370  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1371  *         connection is not up (yet)
1372  */
1373 int
1374 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1375                                        const struct GNUNET_HELLO_Address *address,
1376                                        struct Session *session,
1377                                        const struct GNUNET_ATS_Information *ats,
1378                                        uint32_t ats_count,
1379                                        struct GNUNET_BANDWIDTH_Value32NBO
1380                                        bandwidth_in,
1381                                        struct GNUNET_BANDWIDTH_Value32NBO
1382                                        bandwidth_out)
1383 {
1384   struct NeighbourMapEntry *n;
1385   struct SessionConnectMessage connect_msg;
1386   struct ContinutionContext * cc;
1387   size_t msg_len;
1388   size_t ret;
1389
1390   if (neighbours == NULL)
1391   {
1392     /* This can happen during shutdown */
1393     return GNUNET_NO;
1394   }
1395   n = lookup_neighbour (peer);
1396   if (NULL == n)
1397     return GNUNET_NO;
1398   if (n->state == S_DISCONNECT)
1399   {
1400     /* We are disconnecting, nothing to do here */
1401     return GNUNET_NO;
1402   }
1403   GNUNET_assert (address->transport_name != NULL);
1404   if ( (session == NULL) && (0 == address->address_length) )
1405   {
1406     GNUNET_break_op (0);
1407     /* FIXME: is this actually possible? When does this happen? */
1408     if (strlen(address->transport_name) > 0)
1409       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1410     GNUNET_ATS_suggest_address (GST_ats, peer);
1411     return GNUNET_NO;
1412   }
1413
1414   /* checks successful and neighbour != NULL */
1415 #if DEBUG_TRANSPORT
1416   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1417               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1418               GST_plugins_a2s (address),
1419               session,
1420               GNUNET_i2s (peer),
1421               print_state(n->state));
1422 #endif
1423   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1424   {
1425     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1426     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1427   }
1428   /* do not switch addresses just update quotas */
1429   if ( (n->state == S_CONNECTED) && 
1430        (NULL != n->address) &&
1431        (0 == GNUNET_HELLO_address_cmp (address,
1432                                        n->address)) &&
1433        (n->session == session) )
1434   {
1435     n->bandwidth_in = bandwidth_in;
1436     n->bandwidth_out = bandwidth_out;
1437     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1438     send_outbound_quota(peer, n->bandwidth_out);
1439     return GNUNET_NO;    
1440   }
1441   if (n->state == S_CONNECTED)
1442   {
1443     /* mark old address as no longer used */
1444     GNUNET_assert (NULL != n->address);
1445     if (n->address_state == USED)
1446     {
1447       GST_validation_set_address_use (&n->id,
1448                                       n->address,
1449                                       n->session,
1450                                       GNUNET_NO);
1451       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1452       n->address_state = UNUSED;
1453     }
1454
1455   }
1456
1457   /* set new address */
1458   if (NULL != n->address)
1459     GNUNET_HELLO_address_free (n->address);
1460   n->address = GNUNET_HELLO_address_copy (address);
1461   n->address_state = FRESH;
1462   n->session = session;
1463   n->bandwidth_in = bandwidth_in;
1464   n->bandwidth_out = bandwidth_out;
1465   GNUNET_SCHEDULER_cancel (n->timeout_task);
1466   n->timeout_task =
1467       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1468                                     &neighbour_timeout_task, n);
1469   switch (n->state)
1470   {
1471   case S_NOT_CONNECTED:  
1472   case S_CONNECT_SENT:
1473     msg_len = sizeof (struct SessionConnectMessage);
1474     connect_msg.header.size = htons (msg_len);
1475     connect_msg.header.type =
1476         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1477     connect_msg.reserved = htonl (0);
1478     connect_msg.timestamp =
1479         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1480
1481     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1482     cc->session = session;
1483     cc->address = GNUNET_HELLO_address_copy (address);
1484     ret =
1485         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1486                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1487                           address, GNUNET_YES,
1488                           &send_connect_continuation, 
1489                           cc);
1490     return GNUNET_NO;
1491   case S_CONNECT_RECV:
1492     /* We received a CONNECT message and asked ATS for an address */
1493     msg_len = sizeof (struct SessionConnectMessage);
1494     connect_msg.header.size = htons (msg_len);
1495     connect_msg.header.type =
1496         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1497     connect_msg.reserved = htonl (0);
1498     connect_msg.timestamp =
1499         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1500     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1501     cc->session = session;
1502     cc->address = GNUNET_HELLO_address_copy (address);
1503     ret =
1504         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1505                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1506                           address, GNUNET_YES,
1507                           &send_connect_ack_continuation, cc);
1508     return GNUNET_NO;
1509   case S_CONNECTED:
1510   case S_FAST_RECONNECT:
1511     /* connected peer is switching addresses or tries fast reconnect*/
1512     msg_len = sizeof (struct SessionConnectMessage);
1513     connect_msg.header.size = htons (msg_len);
1514     connect_msg.header.type =
1515         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1516     connect_msg.reserved = htonl (0);
1517     connect_msg.timestamp =
1518         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1519     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1520     cc->session = session;
1521     cc->address = GNUNET_HELLO_address_copy (address);
1522     ret =
1523         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1524                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1525                           address, GNUNET_YES,
1526                           &send_switch_address_continuation, cc);
1527     if (ret == GNUNET_SYSERR)
1528     {
1529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1530                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1531                   GNUNET_i2s (peer), 
1532                   GST_plugins_a2s (address), session);
1533     }
1534     return GNUNET_NO;
1535   default:
1536     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1537                 "Invalid connection state to switch addresses %u \n", n->state);
1538     GNUNET_break_op (0);
1539     return GNUNET_NO;
1540   }
1541 }
1542
1543
1544 /**
1545  * Obtain current latency information for the given neighbour.
1546  *
1547  * @param peer 
1548  * @return observed latency of the address, FOREVER if the address was
1549  *         never successfully validated
1550  */
1551 struct GNUNET_TIME_Relative
1552 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1553 {
1554   struct NeighbourMapEntry *n;
1555
1556   n = lookup_neighbour (peer);
1557   if ( (NULL == n) ||
1558        ( (n->address == NULL) && (n->session == NULL) ) )
1559     return GNUNET_TIME_UNIT_FOREVER_REL;
1560
1561   return n->latency;
1562 }
1563
1564
1565 /**
1566  * Create an entry in the neighbour map for the given peer
1567  *
1568  * @param peer peer to create an entry for
1569  * @return new neighbour map entry
1570  */
1571 static struct NeighbourMapEntry *
1572 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1573 {
1574   struct NeighbourMapEntry *n;
1575
1576 #if DEBUG_TRANSPORT
1577   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1579 #endif
1580   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1581   n->id = *peer;
1582   n->state = S_NOT_CONNECTED;
1583   n->latency = GNUNET_TIME_relative_get_forever();
1584   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1585                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1586                                  MAX_BANDWIDTH_CARRY_S);
1587   n->timeout_task =
1588       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1589                                     &neighbour_timeout_task, n);
1590   GNUNET_assert (GNUNET_OK ==
1591                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1592                                                     &n->id.hashPubKey, n,
1593                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1594   return n;
1595 }
1596
1597
1598 /**
1599  * Try to create a connection to the given target (eventually).
1600  *
1601  * @param target peer to try to connect to
1602  */
1603 void
1604 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1605 {
1606   struct NeighbourMapEntry *n;
1607
1608   // This can happen during shutdown
1609   if (neighbours == NULL)
1610   {
1611     return;
1612   }
1613 #if DEBUG_TRANSPORT
1614   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1615               GNUNET_i2s (target));
1616 #endif
1617   if (0 ==
1618       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1619   {
1620     /* my own hello */
1621     return;
1622   }
1623   n = lookup_neighbour (target);
1624
1625   if (NULL != n)
1626   {
1627     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1628       return;                   /* already connecting or connected */
1629     if (is_disconnecting (n))
1630       change_state (n, S_NOT_CONNECTED);
1631   }
1632
1633
1634   if (n == NULL)
1635     n = setup_neighbour (target);
1636 #if DEBUG_TRANSPORT
1637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1638               "Asking ATS for suggested address to connect to peer `%s'\n",
1639               GNUNET_i2s (&n->id));
1640 #endif
1641
1642   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1643 }
1644
1645 /**
1646  * Test if we're connected to the given peer.
1647  *
1648  * @param target peer to test
1649  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1650  */
1651 int
1652 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1653 {
1654   struct NeighbourMapEntry *n;
1655
1656   // This can happen during shutdown
1657   if (neighbours == NULL)
1658   {
1659     return GNUNET_NO;
1660   }
1661
1662   n = lookup_neighbour (target);
1663
1664   if ((NULL == n) || (S_CONNECTED != n->state))
1665     return GNUNET_NO;           /* not connected */
1666   return GNUNET_YES;
1667 }
1668
1669 /**
1670  * A session was terminated. Take note.
1671  *
1672  * @param peer identity of the peer where the session died
1673  * @param session session that is gone
1674  */
1675 void
1676 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1677                                    struct Session *session)
1678 {
1679   struct NeighbourMapEntry *n;
1680
1681   if (neighbours == NULL)
1682   {
1683     /* This can happen during shutdown */
1684     return;
1685   }
1686
1687 #if DEBUG_TRANSPORT
1688   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Session %X to peer `%s' ended \n",
1689               session, GNUNET_i2s (peer));
1690 #endif
1691
1692   n = lookup_neighbour (peer);
1693   if (NULL == n)
1694     return;
1695   if (session != n->session)
1696     return;                     /* doesn't affect us */
1697   if (n->state == S_CONNECTED)
1698   {
1699     if (n->address_state == USED)
1700     {
1701       GST_validation_set_address_use (&n->id,
1702                                       n->address,
1703                                       n->session,
1704                                       GNUNET_NO);
1705       GNUNET_ATS_address_in_use (GST_ats,n->address, n->session, GNUNET_NO);
1706       n->address_state = UNUSED;
1707     }
1708   }
1709
1710   if (NULL != n->address)
1711   {
1712     GNUNET_HELLO_address_free (n->address);
1713     n->address = NULL;
1714   }
1715   n->session = NULL;
1716   
1717   /* not connected anymore anyway, shouldn't matter */
1718   if (S_CONNECTED != n->state)
1719     return;
1720
1721   /* connected, try fast reconnect */
1722 #if DEBUG_TRANSPORT
1723   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1724               "Trying fast reconnect to peer `%s'\n", GNUNET_i2s (peer));
1725 #endif
1726   change_state (n, S_FAST_RECONNECT);
1727   GNUNET_assert (neighbours_connected > 0);
1728   neighbours_connected--;
1729
1730   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1731   {
1732     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1733     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1734   }
1735
1736   /* We are connected, so ask ATS to switch addresses */
1737   GNUNET_SCHEDULER_cancel (n->timeout_task);
1738   n->timeout_task =
1739       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1740                                     &neighbour_timeout_task, n);
1741   /* try QUICKLY to re-establish a connection, reduce timeout! */
1742   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1743     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1744   n->ats_suggest =
1745       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1746                                     n);
1747   GNUNET_ATS_suggest_address (GST_ats, peer);
1748 }
1749
1750
1751 /**
1752  * Transmit a message to the given target using the active connection.
1753  *
1754  * @param target destination
1755  * @param msg message to send
1756  * @param msg_size number of bytes in msg
1757  * @param timeout when to fail with timeout
1758  * @param cont function to call when done
1759  * @param cont_cls closure for 'cont'
1760  */
1761 void
1762 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1763                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1764                      GST_NeighbourSendContinuation cont, void *cont_cls)
1765 {
1766   struct NeighbourMapEntry *n;
1767   struct MessageQueue *mq;
1768
1769   // This can happen during shutdown
1770   if (neighbours == NULL)
1771   {
1772     return;
1773   }
1774
1775   n = lookup_neighbour (target);
1776   if ((n == NULL) || (!is_connected (n)))
1777   {
1778     GNUNET_STATISTICS_update (GST_stats,
1779                               gettext_noop
1780                               ("# messages not sent (no such peer or not connected)"),
1781                               1, GNUNET_NO);
1782 #if DEBUG_TRANSPORT
1783     if (n == NULL)
1784       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785                   "Could not send message to peer `%s': unknown neighbour",
1786                   GNUNET_i2s (target));
1787     else if (!is_connected (n))
1788       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1789                   "Could not send message to peer `%s': not connected\n",
1790                   GNUNET_i2s (target));
1791 #endif
1792     if (NULL != cont)
1793       cont (cont_cls, GNUNET_SYSERR);
1794     return;
1795   }
1796
1797   if ((n->session == NULL) && (n->address == NULL) )
1798   {
1799     GNUNET_STATISTICS_update (GST_stats,
1800                               gettext_noop
1801                               ("# messages not sent (no such peer or not connected)"),
1802                               1, GNUNET_NO);
1803 #if DEBUG_TRANSPORT
1804     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1805                 "Could not send message to peer `%s': no address available\n",
1806                 GNUNET_i2s (target));
1807 #endif
1808
1809     if (NULL != cont)
1810       cont (cont_cls, GNUNET_SYSERR);
1811     return;
1812   }
1813
1814   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1815   GNUNET_STATISTICS_update (GST_stats,
1816                             gettext_noop
1817                             ("# bytes in message queue for other peers"),
1818                             msg_size, GNUNET_NO);
1819   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1820   mq->cont = cont;
1821   mq->cont_cls = cont_cls;
1822   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1823   memcpy (&mq[1], msg, msg_size);
1824   mq->message_buf = (const char *) &mq[1];
1825   mq->message_buf_size = msg_size;
1826   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1827   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1828
1829   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1830       (NULL == n->is_active))
1831     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1832 }
1833
1834
1835 /**
1836  * We have received a message from the given sender.  How long should
1837  * we delay before receiving more?  (Also used to keep the peer marked
1838  * as live).
1839  *
1840  * @param sender sender of the message
1841  * @param size size of the message
1842  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1843  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1844  *                   GNUNET_SYSERR if the connection is not fully up yet
1845  * @return how long to wait before reading more from this sender
1846  */
1847 struct GNUNET_TIME_Relative
1848 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1849                                         *sender, ssize_t size, int *do_forward)
1850 {
1851   struct NeighbourMapEntry *n;
1852   struct GNUNET_TIME_Relative ret;
1853
1854   // This can happen during shutdown
1855   if (neighbours == NULL)
1856   {
1857     return GNUNET_TIME_UNIT_FOREVER_REL;
1858   }
1859
1860   n = lookup_neighbour (sender);
1861   if (n == NULL)
1862   {
1863     GST_neighbours_try_connect (sender);
1864     n = lookup_neighbour (sender);
1865     if (NULL == n)
1866     {
1867       GNUNET_STATISTICS_update (GST_stats,
1868                                 gettext_noop
1869                                 ("# messages discarded due to lack of neighbour record"),
1870                                 1, GNUNET_NO);
1871       *do_forward = GNUNET_NO;
1872       return GNUNET_TIME_UNIT_ZERO;
1873     }
1874   }
1875   if (!is_connected (n))
1876   {
1877     *do_forward = GNUNET_SYSERR;
1878     return GNUNET_TIME_UNIT_ZERO;
1879   }
1880   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1881   {
1882     n->quota_violation_count++;
1883 #if DEBUG_TRANSPORT
1884     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1885                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1886                 n->in_tracker.available_bytes_per_s__,
1887                 n->quota_violation_count);
1888 #endif
1889     /* Discount 32k per violation */
1890     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1891   }
1892   else
1893   {
1894     if (n->quota_violation_count > 0)
1895     {
1896       /* try to add 32k back */
1897       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1898       n->quota_violation_count--;
1899     }
1900   }
1901   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1902   {
1903     GNUNET_STATISTICS_update (GST_stats,
1904                               gettext_noop
1905                               ("# bandwidth quota violations by other peers"),
1906                               1, GNUNET_NO);
1907     *do_forward = GNUNET_NO;
1908     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1909   }
1910   *do_forward = GNUNET_YES;
1911   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1912   if (ret.rel_value > 0)
1913   {
1914 #if DEBUG_TRANSPORT
1915     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1916                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1917                 (unsigned long long) n->in_tracker.
1918                 consumption_since_last_update__,
1919                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1920                 (unsigned long long) ret.rel_value);
1921 #endif
1922     GNUNET_STATISTICS_update (GST_stats,
1923                               gettext_noop ("# ms throttling suggested"),
1924                               (int64_t) ret.rel_value, GNUNET_NO);
1925   }
1926   return ret;
1927 }
1928
1929
1930 /**
1931  * Keep the connection to the given neighbour alive longer,
1932  * we received a KEEPALIVE (or equivalent).
1933  *
1934  * @param neighbour neighbour to keep alive
1935  */
1936 void
1937 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1938 {
1939   struct NeighbourMapEntry *n;
1940
1941   // This can happen during shutdown
1942   if (neighbours == NULL)
1943   {
1944     return;
1945   }
1946
1947   n = lookup_neighbour (neighbour);
1948   if (NULL == n)
1949   {
1950     GNUNET_STATISTICS_update (GST_stats,
1951                               gettext_noop
1952                               ("# KEEPALIVE messages discarded (not connected)"),
1953                               1, GNUNET_NO);
1954     return;
1955   }
1956   GNUNET_SCHEDULER_cancel (n->timeout_task);
1957   n->timeout_task =
1958       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1959                                     &neighbour_timeout_task, n);
1960
1961   /* send reply to measure latency */
1962   if (S_CONNECTED != n->state)
1963     return;
1964
1965   struct GNUNET_MessageHeader m;
1966   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1967   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1968
1969   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1970                     UINT32_MAX /* priority */ ,
1971                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1972                     GNUNET_YES, NULL, NULL);
1973 }
1974
1975 /**
1976  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
1977  * to this peer
1978  *
1979  * @param neighbour neighbour to keep alive
1980  * @param ats performance data
1981  * @param ats_count number of entries in ats
1982  */
1983 void
1984 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1985                                    const struct GNUNET_ATS_Information * ats,
1986                                    uint32_t ats_count)
1987 {
1988   struct NeighbourMapEntry *n;
1989   struct GNUNET_ATS_Information * ats_new;
1990   uint32_t latency;
1991
1992   if (neighbours == NULL)
1993   {
1994     // This can happen during shutdown
1995     return;
1996   }
1997
1998   n = lookup_neighbour (neighbour);
1999   if (NULL == n)
2000   {
2001     GNUNET_STATISTICS_update (GST_stats,
2002                               gettext_noop
2003                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2004                               1, GNUNET_NO);
2005     return;
2006   }
2007   if (n->expect_latency_response != GNUNET_YES)
2008   {
2009     GNUNET_STATISTICS_update (GST_stats,
2010                               gettext_noop
2011                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2012                               1, GNUNET_NO);
2013     return;
2014   }
2015   n->expect_latency_response = GNUNET_NO;
2016
2017   GNUNET_assert (n->keep_alive_sent.abs_value != GNUNET_TIME_absolute_get_zero().abs_value);
2018   n->latency = GNUNET_TIME_absolute_get_difference(n->keep_alive_sent, GNUNET_TIME_absolute_get());
2019 #if DEBUG_TRANSPORT
2020   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2021               "Latency for peer `%s' is %llu ms\n",
2022               GNUNET_i2s (&n->id), n->latency.rel_value);
2023 #endif
2024
2025
2026   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever().rel_value)
2027   {
2028     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2029   }
2030   else
2031   {
2032     ats_new = GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2033     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2034
2035     /* add latency */
2036     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2037     if (n->latency.rel_value > UINT32_MAX)
2038       latency = UINT32_MAX;
2039     else
2040       latency = n->latency.rel_value;
2041     ats_new[ats_count].value = htonl (latency);
2042
2043     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new, ats_count + 1);
2044     GNUNET_free (ats_new);
2045   }
2046 }
2047
2048
2049 /**
2050  * Change the incoming quota for the given peer.
2051  *
2052  * @param neighbour identity of peer to change qutoa for
2053  * @param quota new quota
2054  */
2055 void
2056 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2057                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2058 {
2059   struct NeighbourMapEntry *n;
2060
2061   // This can happen during shutdown
2062   if (neighbours == NULL)
2063   {
2064     return;
2065   }
2066
2067   n = lookup_neighbour (neighbour);
2068   if (n == NULL)
2069   {
2070     GNUNET_STATISTICS_update (GST_stats,
2071                               gettext_noop
2072                               ("# SET QUOTA messages ignored (no such peer)"),
2073                               1, GNUNET_NO);
2074     return;
2075   }
2076 #if DEBUG_TRANSPORT
2077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2078               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2079               ntohl (quota.value__), GNUNET_i2s (&n->id));
2080 #endif
2081   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2082   if (0 != ntohl (quota.value__))
2083     return;
2084 #if DEBUG_TRANSPORT
2085   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2086               GNUNET_i2s (&n->id), "SET_QUOTA");
2087 #endif
2088   if (is_connected (n))
2089     GNUNET_STATISTICS_update (GST_stats,
2090                               gettext_noop ("# disconnects due to quota of 0"),
2091                               1, GNUNET_NO);
2092   disconnect_neighbour (n);
2093 }
2094
2095
2096 /**
2097  * Closure for the neighbours_iterate function.
2098  */
2099 struct IteratorContext
2100 {
2101   /**
2102    * Function to call on each connected neighbour.
2103    */
2104   GST_NeighbourIterator cb;
2105
2106   /**
2107    * Closure for 'cb'.
2108    */
2109   void *cb_cls;
2110 };
2111
2112
2113 /**
2114  * Call the callback from the closure for each connected neighbour.
2115  *
2116  * @param cls the 'struct IteratorContext'
2117  * @param key the hash of the public key of the neighbour
2118  * @param value the 'struct NeighbourMapEntry'
2119  * @return GNUNET_OK (continue to iterate)
2120  */
2121 static int
2122 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2123 {
2124   struct IteratorContext *ic = cls;
2125   struct NeighbourMapEntry *n = value;
2126
2127   if (!is_connected (n))
2128     return GNUNET_OK;
2129
2130   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2131   return GNUNET_OK;
2132 }
2133
2134
2135 /**
2136  * Iterate over all connected neighbours.
2137  *
2138  * @param cb function to call
2139  * @param cb_cls closure for cb
2140  */
2141 void
2142 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2143 {
2144   struct IteratorContext ic;
2145
2146   // This can happen during shutdown
2147   if (neighbours == NULL)
2148   {
2149     return;
2150   }
2151
2152   ic.cb = cb;
2153   ic.cb_cls = cb_cls;
2154   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2155 }
2156
2157 /**
2158  * If we have an active connection to the given target, it must be shutdown.
2159  *
2160  * @param target peer to disconnect from
2161  */
2162 void
2163 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2164 {
2165   struct NeighbourMapEntry *n;
2166
2167   // This can happen during shutdown
2168   if (neighbours == NULL)
2169   {
2170     return;
2171   }
2172
2173   n = lookup_neighbour (target);
2174   if (NULL == n)
2175     return;                     /* not active */
2176   disconnect_neighbour (n);
2177 }
2178
2179
2180 /**
2181  * We received a disconnect message from the given peer,
2182  * validate and process.
2183  *
2184  * @param peer sender of the message
2185  * @param msg the disconnect message
2186  */
2187 void
2188 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2189                                           *peer,
2190                                           const struct GNUNET_MessageHeader
2191                                           *msg)
2192 {
2193   struct NeighbourMapEntry *n;
2194   const struct SessionDisconnectMessage *sdm;
2195   GNUNET_HashCode hc;
2196
2197 #if DEBUG_TRANSPORT
2198   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2199               "Received DISCONNECT message from peer `%s'\n",
2200               GNUNET_i2s (peer));
2201 #endif
2202
2203   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2204   {
2205     // GNUNET_break_op (0);
2206     GNUNET_STATISTICS_update (GST_stats,
2207                               gettext_noop
2208                               ("# disconnect messages ignored (old format)"), 1,
2209                               GNUNET_NO);
2210     return;
2211   }
2212   sdm = (const struct SessionDisconnectMessage *) msg;
2213   n = lookup_neighbour (peer);
2214   if (NULL == n)
2215     return;                     /* gone already */
2216   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2217       n->connect_ts.abs_value)
2218   {
2219     GNUNET_STATISTICS_update (GST_stats,
2220                               gettext_noop
2221                               ("# disconnect messages ignored (timestamp)"), 1,
2222                               GNUNET_NO);
2223     return;
2224   }
2225   GNUNET_CRYPTO_hash (&sdm->public_key,
2226                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2227                       &hc);
2228   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2229   {
2230     GNUNET_break_op (0);
2231     return;
2232   }
2233   if (ntohl (sdm->purpose.size) !=
2234       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2235       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2236       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2237   {
2238     GNUNET_break_op (0);
2239     return;
2240   }
2241   if (GNUNET_OK !=
2242       GNUNET_CRYPTO_rsa_verify
2243       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2244        &sdm->signature, &sdm->public_key))
2245   {
2246     GNUNET_break_op (0);
2247     return;
2248   }
2249   GST_neighbours_force_disconnect (peer);
2250 }
2251
2252
2253 /**
2254  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2255  * Consider switching to it.
2256  *
2257  * @param message possibly a 'struct SessionConnectMessage' (check format)
2258  * @param peer identity of the peer to switch the address for
2259  * @param address address of the other peer, NULL if other peer
2260  *                       connected to us
2261  * @param session session to use (or NULL)
2262  * @param ats performance data
2263  * @param ats_count number of entries in ats
2264  */
2265 void
2266 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2267                                    const struct GNUNET_PeerIdentity *peer,
2268                                    const struct GNUNET_HELLO_Address *address,
2269                                    struct Session *session,
2270                                    const struct GNUNET_ATS_Information *ats,
2271                                    uint32_t ats_count)
2272 {
2273   const struct SessionConnectMessage *scm;
2274   struct GNUNET_MessageHeader msg;
2275   struct NeighbourMapEntry *n;
2276   size_t msg_len;
2277   size_t ret;
2278
2279 #if DEBUG_TRANSPORT
2280   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2281               "Received CONNECT_ACK message from peer `%s'\n",
2282               GNUNET_i2s (peer));
2283 #endif
2284
2285   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2286   {
2287     GNUNET_break_op (0);
2288     return;
2289   }
2290   scm = (const struct SessionConnectMessage *) message;
2291   GNUNET_break_op (ntohl (scm->reserved) == 0);
2292   n = lookup_neighbour (peer);
2293   if (NULL == n)
2294   {
2295     /* we did not send 'CONNECT' -- at least not recently */
2296     GNUNET_STATISTICS_update (GST_stats,
2297                               gettext_noop ("# unexpected CONNECT_ACK messages (no peer)"), 1,
2298                               GNUNET_NO);
2299     return;
2300   }  
2301
2302   /* Additional check
2303    *
2304    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2305    *
2306    * We also received an CONNECT message, switched from SENDT to RECV and
2307    * ATS already suggested us an address after a successful blacklist check
2308    */
2309   if ((n->state != S_CONNECT_SENT) && ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2310   {
2311     GNUNET_STATISTICS_update (GST_stats,
2312                               gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
2313                               GNUNET_NO);
2314     return;
2315   }
2316
2317   change_state (n, S_CONNECTED);
2318
2319   if (NULL != session)
2320   {
2321     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2322                      "transport-ats",
2323                      "Giving ATS session %p of plugin %s for peer %s\n",
2324                      session, address->transport_name, GNUNET_i2s (peer));
2325   }
2326   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2327   GNUNET_assert (NULL != n->address);
2328   if (n->address_state == FRESH)
2329   {
2330     GST_validation_set_address_use (&n->id,
2331             n->address,
2332             n->session,
2333             GNUNET_YES);
2334     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2335     n->address_state = USED;
2336   }
2337
2338   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2339
2340   /* send ACK (ACK) */
2341   msg_len = sizeof (msg);
2342   msg.size = htons (msg_len);
2343   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2344
2345   ret =
2346       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2347                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2348                         n->address, GNUNET_YES, NULL,
2349                         NULL);
2350
2351   if (ret == GNUNET_SYSERR)
2352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2353                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2354                 GNUNET_i2s (&n->id), 
2355                 GST_plugins_a2s (n->address), n->session);
2356
2357
2358   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2359     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2360   
2361   neighbours_connected++;
2362   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2363                             GNUNET_NO);
2364 #if DEBUG_TRANSPORT
2365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2366               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2367               GNUNET_i2s (&n->id),
2368               GST_plugins_a2s (n->address), n->session,
2369               __LINE__);
2370 #endif
2371   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2372   send_outbound_quota(peer, n->bandwidth_out);
2373
2374 }
2375
2376
2377 void
2378 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2379                            const struct GNUNET_PeerIdentity *peer,
2380                            const struct GNUNET_HELLO_Address *address,
2381                            struct Session *session,
2382                            const struct GNUNET_ATS_Information *ats,
2383                            uint32_t ats_count)
2384 {
2385   struct NeighbourMapEntry *n;
2386
2387 #if DEBUG_TRANSPORT
2388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2389               "Received ACK message from peer `%s'\n",
2390               GNUNET_i2s (peer));
2391 #endif
2392
2393   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2394   {
2395     GNUNET_break_op (0);
2396     return;
2397   }
2398   n = lookup_neighbour (peer);
2399   if (NULL == n)
2400   {
2401     send_disconnect (peer, address,
2402                      session);
2403     GNUNET_break (0);
2404     return;
2405   }
2406   if (S_CONNECTED == n->state)
2407     return;
2408   if (!is_connecting(n))
2409   {
2410     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2411                               GNUNET_NO);
2412     return;
2413   }
2414   change_state (n, S_CONNECTED);
2415   if (NULL != session)
2416     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2417                      "transport-ats",
2418                      "Giving ATS session %p of plugin %s for peer %s\n",
2419                      session, address->transport_name, GNUNET_i2s (peer));
2420   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2421   GNUNET_assert (n->address != NULL);
2422   if (n->address_state == FRESH)
2423   {
2424     GST_validation_set_address_use (&n->id,
2425                                   n->address,
2426                                   n->session,
2427                                   GNUNET_YES);
2428     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2429     n->address_state = USED;
2430   }
2431
2432   neighbours_connected++;
2433   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2434                             GNUNET_NO);
2435   
2436   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2437   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2438         n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2439 #if DEBUG_TRANSPORT
2440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2441               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2442               GNUNET_i2s (&n->id),
2443               GST_plugins_a2s (n->address), n->session,
2444               __LINE__);
2445 #endif
2446   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2447   send_outbound_quota(peer, n->bandwidth_out);
2448 }
2449
2450 struct BlackListCheckContext
2451 {
2452   struct GNUNET_ATS_Information *ats;
2453
2454   uint32_t ats_count;
2455
2456   struct Session *session;
2457
2458   struct GNUNET_HELLO_Address *address;
2459
2460   struct GNUNET_TIME_Absolute ts;
2461 };
2462
2463
2464 static void
2465 handle_connect_blacklist_cont (void *cls,
2466                                const struct GNUNET_PeerIdentity *peer,
2467                                int result)
2468 {
2469   struct NeighbourMapEntry *n;
2470   struct BlackListCheckContext *bcc = cls;
2471
2472 #if DEBUG_TRANSPORT
2473   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2474               "Blacklist check due to CONNECT message: `%s'\n",
2475               GNUNET_i2s (peer),
2476               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2477 #endif
2478
2479   /* not allowed */
2480   if (GNUNET_OK != result)
2481   {
2482     GNUNET_HELLO_address_free (bcc->address);
2483     GNUNET_free (bcc);
2484     return;
2485   }
2486
2487   n = lookup_neighbour (peer);
2488   if (NULL == n)
2489     n = setup_neighbour (peer);
2490
2491   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2492   {
2493     if (NULL != bcc->session)
2494       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2495                        "transport-ats",
2496                        "Giving ATS session %p of address `%s' for peer %s\n",
2497                        bcc->session, 
2498                        GST_plugins_a2s (bcc->address),
2499                        GNUNET_i2s (peer));
2500     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2501
2502     GNUNET_ATS_address_update (GST_ats,
2503                                bcc->address,
2504                                bcc->session,
2505                                bcc->ats,
2506                                bcc->ats_count);
2507     n->connect_ts = bcc->ts;
2508   }
2509
2510   GNUNET_HELLO_address_free (bcc->address);
2511   GNUNET_free (bcc);
2512
2513   if (n->state != S_CONNECT_RECV)
2514     change_state (n, S_CONNECT_RECV);
2515
2516
2517   /* Ask ATS for an address to connect via that address */
2518   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2519     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2520   n->ats_suggest =
2521       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2522                                     n);
2523   GNUNET_ATS_suggest_address (GST_ats, peer);
2524 }
2525
2526 /**
2527  * We received a 'SESSION_CONNECT' message from the other peer.
2528  * Consider switching to it.
2529  *
2530  * @param message possibly a 'struct SessionConnectMessage' (check format)
2531  * @param peer identity of the peer to switch the address for
2532  * @param address address of the other peer, NULL if other peer
2533  *                       connected to us
2534  * @param session session to use (or NULL)
2535  * @param ats performance data
2536  * @param ats_count number of entries in ats (excluding 0-termination)
2537  */
2538 void
2539 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2540                                const struct GNUNET_PeerIdentity *peer,
2541                                const struct GNUNET_HELLO_Address *address,
2542                                struct Session *session,
2543                                const struct GNUNET_ATS_Information *ats,
2544                                uint32_t ats_count)
2545 {
2546   const struct SessionConnectMessage *scm;
2547   struct BlackListCheckContext *bcc = NULL;
2548   struct NeighbourMapEntry *n;
2549
2550 #if DEBUG_TRANSPORT
2551   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2552               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2553 #endif
2554
2555   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2556   {
2557     GNUNET_break_op (0);
2558     return;
2559   }
2560
2561   scm = (const struct SessionConnectMessage *) message;
2562   GNUNET_break_op (ntohl (scm->reserved) == 0);
2563
2564   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2565
2566   n = lookup_neighbour (peer);
2567   if ( (n != NULL) && (S_CONNECTED == n->state) )
2568   {
2569      /* connected peer switches addresses */
2570      return;
2571   }
2572
2573
2574   /* we are not connected to this peer */
2575   /* do blacklist check */
2576   bcc =
2577       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2578                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2579   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2580   bcc->ats_count = ats_count + 1;
2581   bcc->address = GNUNET_HELLO_address_copy (address);
2582   bcc->session = session;
2583   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2584   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2585   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2586   bcc->ats[ats_count].value = htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2587   GST_blacklist_test_allowed (peer, address->transport_name, handle_connect_blacklist_cont,
2588                               bcc);
2589 }
2590
2591
2592 /* end of file gnunet-service-transport_neighbours.c */