asserts for 1903:
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
61
62 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
63
64 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
65
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
72 /**
73  * Message a peer sends to another to indicate its
74  * preference for communicating via a particular
75  * session (and the desire to establish a real
76  * connection).
77  */
78 struct SessionConnectMessage
79 {
80   /**
81    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
82    */
83   struct GNUNET_MessageHeader header;
84
85   /**
86    * Always zero.
87    */
88   uint32_t reserved GNUNET_PACKED;
89
90   /**
91    * Absolute time at the sender.  Only the most recent connect
92    * message implies which session is preferred by the sender.
93    */
94   struct GNUNET_TIME_AbsoluteNBO timestamp;
95
96 };
97
98
99 struct SessionDisconnectMessage
100 {
101   /**
102    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
103    */
104   struct GNUNET_MessageHeader header;
105
106   /**
107    * Always zero.
108    */
109   uint32_t reserved GNUNET_PACKED;
110
111   /**
112    * Purpose of the signature.  Extends over the timestamp.
113    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
114    */
115   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
116
117   /**
118    * Absolute time at the sender.  Only the most recent connect
119    * message implies which session is preferred by the sender.
120    */
121   struct GNUNET_TIME_AbsoluteNBO timestamp;
122
123   /**
124    * Public key of the sender.
125    */
126   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
127
128   /**
129    * Signature of the peer that sends us the disconnect.  Only
130    * valid if the timestamp is AFTER the timestamp from the
131    * corresponding 'CONNECT' message.
132    */
133   struct GNUNET_CRYPTO_RsaSignature signature;
134
135 };
136
137
138 /**
139  * For each neighbour we keep a list of messages
140  * that we still want to transmit to the neighbour.
141  */
142 struct MessageQueue
143 {
144
145   /**
146    * This is a doubly linked list.
147    */
148   struct MessageQueue *next;
149
150   /**
151    * This is a doubly linked list.
152    */
153   struct MessageQueue *prev;
154
155   /**
156    * Once this message is actively being transmitted, which
157    * neighbour is it associated with?
158    */
159   struct NeighbourMapEntry *n;
160
161   /**
162    * Function to call once we're done.
163    */
164   GST_NeighbourSendContinuation cont;
165
166   /**
167    * Closure for 'cont'
168    */
169   void *cont_cls;
170
171   /**
172    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
173    * stuck together in memory.  Allocated at the end of this struct.
174    */
175   const char *message_buf;
176
177   /**
178    * Size of the message buf
179    */
180   size_t message_buf_size;
181
182   /**
183    * At what time should we fail?
184    */
185   struct GNUNET_TIME_Absolute timeout;
186
187 };
188
189
190 enum State
191 {
192   /**
193    * fresh peer or completely disconnected 
194    */
195   S_NOT_CONNECTED,
196
197   /**
198    * sent CONNECT message to other peer, waiting for CONNECT_ACK 
199    */
200   S_CONNECT_SENT,
201
202   /**
203    * received CONNECT message to other peer, sending CONNECT_ACK 
204    */
205   S_CONNECT_RECV,
206
207   /**
208    * received ACK or payload 
209    */
210   S_CONNECTED,
211
212   /**
213    * connection ended, fast reconnect
214    */
215   S_FAST_RECONNECT,
216
217   /**
218    * Disconnect in progress 
219    */
220   S_DISCONNECT
221 };
222
223 enum Address_State
224 {
225   USED,
226   UNUSED,
227   FRESH,
228 };
229
230 /**
231  * Entry in neighbours.
232  */
233 struct NeighbourMapEntry
234 {
235
236   /**
237    * Head of list of messages we would like to send to this peer;
238    * must contain at most one message per client.
239    */
240   struct MessageQueue *messages_head;
241
242   /**
243    * Tail of list of messages we would like to send to this peer; must
244    * contain at most one message per client.
245    */
246   struct MessageQueue *messages_tail;
247
248   /**
249    * Performance data for the peer.
250    */
251   //struct GNUNET_ATS_Information *ats;
252
253   /**
254    * Are we currently trying to send a message? If so, which one?
255    */
256   struct MessageQueue *is_active;
257
258   /**
259    * Active session for communicating with the peer.
260    */
261   struct Session *session;
262
263   /**
264    * Address we currently use.
265    */
266   struct GNUNET_HELLO_Address *address;
267
268   /**
269    * Identity of this neighbour.
270    */
271   struct GNUNET_PeerIdentity id;
272
273   /**
274    * ID of task scheduled to run when this peer is about to
275    * time out (will free resources associated with the peer).
276    */
277   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
278
279   /**
280    * ID of task scheduled to send keepalives.
281    */
282   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
283
284   /**
285    * ID of task scheduled to run when we should try transmitting
286    * the head of the message queue.
287    */
288   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
289
290   /**
291    * Tracker for inbound bandwidth.
292    */
293   struct GNUNET_BANDWIDTH_Tracker in_tracker;
294
295   /**
296    * Inbound bandwidth from ATS, activated when connection is up
297    */
298   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
299
300   /**
301    * Inbound bandwidth from ATS, activated when connection is up
302    */
303   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
304
305   /**
306    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
307    */
308   struct GNUNET_TIME_Absolute connect_ts;
309
310   /**
311    * When did we sent the last keep-alive message?
312    */
313   struct GNUNET_TIME_Absolute keep_alive_sent;
314
315   /**
316    * Latest calculated latency value
317    */
318   struct GNUNET_TIME_Relative latency;
319
320   /**
321    * Timeout for ATS
322    * We asked ATS for a new address for this peer
323    */
324   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
325
326   /**
327    * Task the resets the peer state after due to an pending
328    * unsuccessful connection setup
329    */
330   GNUNET_SCHEDULER_TaskIdentifier state_reset;
331
332   /**
333    * How often has the other peer (recently) violated the inbound
334    * traffic limit?  Incremented by 10 per violation, decremented by 1
335    * per non-violation (for each time interval).
336    */
337   unsigned int quota_violation_count;
338
339
340   /**
341    * The current state of the peer
342    * Element of enum State
343    */
344   int state;
345
346   /**
347    * Did we sent an KEEP_ALIVE message and are we expecting a response?
348    */
349   int expect_latency_response;
350   int address_state;
351 };
352
353
354 /**
355  * All known neighbours and their HELLOs.
356  */
357 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
358
359 /**
360  * Closure for connect_notify_cb and disconnect_notify_cb
361  */
362 static void *callback_cls;
363
364 /**
365  * Function to call when we connected to a neighbour.
366  */
367 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
368
369 /**
370  * Function to call when we disconnected from a neighbour.
371  */
372 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
373
374 /**
375  * counter for connected neighbours
376  */
377 static int neighbours_connected;
378
379 /**
380  * Lookup a neighbour entry in the neighbours hash map.
381  *
382  * @param pid identity of the peer to look up
383  * @return the entry, NULL if there is no existing record
384  */
385 static struct NeighbourMapEntry *
386 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
387 {
388   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
389 }
390
391 #define change_state(n, state, ...) change (n, state, __LINE__)
392
393 static int
394 is_connecting (struct NeighbourMapEntry *n)
395 {
396   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
397     return GNUNET_YES;
398   return GNUNET_NO;
399 }
400
401 static int
402 is_connected (struct NeighbourMapEntry *n)
403 {
404   if (n->state == S_CONNECTED)
405     return GNUNET_YES;
406   return GNUNET_NO;
407 }
408
409 static int
410 is_disconnecting (struct NeighbourMapEntry *n)
411 {
412   if (n->state == S_DISCONNECT)
413     return GNUNET_YES;
414   return GNUNET_NO;
415 }
416
417 static const char *
418 print_state (int state)
419 {
420   switch (state)
421   {
422   case S_CONNECTED:
423     return "S_CONNECTED";
424     break;
425   case S_CONNECT_RECV:
426     return "S_CONNECT_RECV";
427     break;
428   case S_CONNECT_SENT:
429     return "S_CONNECT_SENT";
430     break;
431   case S_DISCONNECT:
432     return "S_DISCONNECT";
433     break;
434   case S_NOT_CONNECTED:
435     return "S_NOT_CONNECTED";
436     break;
437   case S_FAST_RECONNECT:
438     return "S_FAST_RECONNECT";
439     break;
440   default:
441     GNUNET_break (0);
442     break;
443   }
444   return NULL;
445 }
446
447 static int
448 change (struct NeighbourMapEntry *n, int state, int line);
449
450 static void
451 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
452
453
454 static void
455 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
456 {
457   struct NeighbourMapEntry *n = cls;
458
459   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
460   if (n->state == S_CONNECTED)
461     return;
462
463 #if DEBUG_TRANSPORT
464   GNUNET_STATISTICS_update (GST_stats,
465                             gettext_noop
466                             ("# failed connection attempts due to timeout"), 1,
467                             GNUNET_NO);
468 #endif
469
470   /* resetting state */
471   n->state = S_NOT_CONNECTED;
472
473   /* destroying address */
474   GNUNET_assert (strlen(n->address->transport_name) > 0); 
475   GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
476
477   /* request new address */
478   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
479     GNUNET_SCHEDULER_cancel (n->ats_suggest);
480   n->ats_suggest =
481       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
482                                     n);
483   GNUNET_ATS_suggest_address (GST_ats, &n->id);
484 }
485
486 static int
487 change (struct NeighbourMapEntry *n, int state, int line)
488 {
489   /* allowed transitions */
490   int allowed = GNUNET_NO;
491
492   switch (n->state)
493   {
494   case S_NOT_CONNECTED:
495     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
496         (state == S_DISCONNECT))
497       allowed = GNUNET_YES;
498     break;
499   case S_CONNECT_RECV:
500     allowed = GNUNET_YES;
501     break;
502   case S_CONNECT_SENT:
503     allowed = GNUNET_YES;
504     break;
505   case S_CONNECTED:
506     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
507       allowed = GNUNET_YES;
508     break;
509   case S_DISCONNECT:
510     break;
511   case S_FAST_RECONNECT:
512     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
513       allowed = GNUNET_YES;
514     break;
515   default:
516     GNUNET_break (0);
517     break;
518   }
519   if (allowed == GNUNET_NO)
520   {
521     char *old = GNUNET_strdup (print_state (n->state));
522     char *new = GNUNET_strdup (print_state (state));
523     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
524                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
525                 new, line);
526     GNUNET_break (0);
527     GNUNET_free (old);
528     GNUNET_free (new);
529     return GNUNET_SYSERR;
530   }
531 #if DEBUG_TRANSPORT
532
533   {
534     char *old = GNUNET_strdup (print_state (n->state));
535     char *new = GNUNET_strdup (print_state (state));
536     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
537                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
538                 GNUNET_i2s (&n->id), n, old, new, line);
539     GNUNET_free (old);
540     GNUNET_free (new);
541   }
542 #endif
543   n->state = state;
544
545   switch (n->state)
546   {
547   case S_FAST_RECONNECT:
548   case S_CONNECT_RECV:
549   case S_CONNECT_SENT:
550     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
551       GNUNET_SCHEDULER_cancel (n->state_reset);
552     n->state_reset =
553       GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
554                                     n);   
555     break;
556   case S_CONNECTED:
557   case S_NOT_CONNECTED:
558   case S_DISCONNECT:
559     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
560     {
561 #if DEBUG_TRANSPORT
562       char *old = GNUNET_strdup (print_state (n->state));
563       char *new = GNUNET_strdup (print_state (state));
564       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
565                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
566                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
567                   old, new);
568       GNUNET_free (old);
569       GNUNET_free (new);
570 #endif
571       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
572       GNUNET_SCHEDULER_cancel (n->state_reset);
573       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
574     }
575     break;
576
577   default:
578     GNUNET_assert (0);
579   }
580   
581
582
583   return GNUNET_OK;
584 }
585
586 static ssize_t
587 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
588                   size_t msgbuf_size, uint32_t priority,
589                   struct GNUNET_TIME_Relative timeout, struct Session *session,
590                   const struct GNUNET_HELLO_Address *address,
591                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
592                   void *cont_cls)
593 {
594   struct GNUNET_TRANSPORT_PluginFunctions *papi;
595   size_t ret = GNUNET_SYSERR;
596
597   /* FIXME : ats returns an address with all values 0 */
598   if (address == NULL)
599   {
600     if (cont != NULL)
601       cont (cont_cls, target, GNUNET_SYSERR);
602     return GNUNET_SYSERR;
603   }
604
605   if ((session == NULL) && (address->address_length == 0))
606   {
607     if (cont != NULL)
608       cont (cont_cls, target, GNUNET_SYSERR);
609     return GNUNET_SYSERR;
610   }
611
612   papi = GST_plugins_find (address->transport_name);
613   if (papi == NULL)
614   {
615     if (cont != NULL)
616       cont (cont_cls, target, GNUNET_SYSERR);
617     return GNUNET_SYSERR;
618   }
619
620   ret =
621       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
622                   address->address, 
623                   address->address_length, GNUNET_YES, cont, cont_cls);
624
625   if (ret == -1)
626   {
627     if (cont != NULL)
628       cont (cont_cls, target, GNUNET_SYSERR);
629   }
630   return ret;
631 }
632
633 /**
634  * Task invoked to start a transmission to another peer.
635  *
636  * @param cls the 'struct NeighbourMapEntry'
637  * @param tc scheduler context
638  */
639 static void
640 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
641
642
643 /**
644  * We're done with our transmission attempt, continue processing.
645  *
646  * @param cls the 'struct MessageQueue' of the message
647  * @param receiver intended receiver
648  * @param success whether it worked or not
649  */
650 static void
651 transmit_send_continuation (void *cls,
652                             const struct GNUNET_PeerIdentity *receiver,
653                             int success)
654 {
655   struct MessageQueue *mq;
656   struct NeighbourMapEntry *n;
657
658   mq = cls;
659   n = mq->n;
660   if (NULL != n)
661   {
662     GNUNET_assert (n->is_active == mq);
663     n->is_active = NULL;
664     if (success == GNUNET_YES)
665     {
666       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
667       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
668     }
669   }
670 #if DEBUG_TRANSPORT
671   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
672               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
673               (success == GNUNET_OK) ? "successful" : "FAILED");
674 #endif
675   if (NULL != mq->cont)
676     mq->cont (mq->cont_cls, success);
677   GNUNET_free (mq);
678 }
679
680
681 /**
682  * Check the ready list for the given neighbour and if a plugin is
683  * ready for transmission (and if we have a message), do so!
684  *
685  * @param n target peer for which to transmit
686  */
687 static void
688 try_transmission_to_peer (struct NeighbourMapEntry *n)
689 {
690   struct MessageQueue *mq;
691   struct GNUNET_TIME_Relative timeout;
692   ssize_t ret;
693
694   if (n->is_active != NULL)
695   {
696     GNUNET_break (0);
697     return;                     /* transmission already pending */
698   }
699   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
700   {
701     GNUNET_break (0);
702     return;                     /* currently waiting for bandwidth */
703   }
704   while (NULL != (mq = n->messages_head))
705   {
706     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
707     if (timeout.rel_value > 0)
708       break;
709     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
710     n->is_active = mq;
711     mq->n = n;
712     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
713   }
714   if (NULL == mq)
715     return;                     /* no more messages */
716
717   if (n->address == NULL)
718   {
719     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720                 "No address for peer `%s'\n",
721                 GNUNET_i2s (&n->id));
722     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
723     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
724     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
725     return;
726   }
727
728   if (GST_plugins_find (n->address->transport_name) == NULL)
729   {
730     GNUNET_break (0);
731     return;
732   }
733   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
734   n->is_active = mq;
735   mq->n = n;
736
737   if ((n->address->address_length == 0) && (n->session == NULL))
738   {
739     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
740                 "No address for peer `%s'\n",
741                 GNUNET_i2s (&n->id));
742     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
743     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
744     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
745     return;
746   }
747
748   ret =
749       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
750                         timeout, n->session, n->address, 
751                         GNUNET_YES, &transmit_send_continuation,
752                         mq);
753   if (ret == -1)
754   {
755     /* failure, but 'send' would not call continuation in this case,
756      * so we need to do it here! */
757     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
758   }
759
760 }
761
762
763 /**
764  * Task invoked to start a transmission to another peer.
765  *
766  * @param cls the 'struct NeighbourMapEntry'
767  * @param tc scheduler context
768  */
769 static void
770 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
771 {
772   struct NeighbourMapEntry *n = cls;
773
774   GNUNET_assert (NULL != lookup_neighbour (&n->id));
775   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
776   try_transmission_to_peer (n);
777 }
778
779
780 /**
781  * Initialize the neighbours subsystem.
782  *
783  * @param cls closure for callbacks
784  * @param connect_cb function to call if we connect to a peer
785  * @param disconnect_cb function to call if we disconnect from a peer
786  */
787 void
788 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
789                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
790 {
791   callback_cls = cls;
792   connect_notify_cb = connect_cb;
793   disconnect_notify_cb = disconnect_cb;
794   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
795 }
796
797
798 static void
799 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
800                       int result)
801 {
802 #if DEBUG_TRANSPORT
803   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804               "Sending DISCONNECT message to peer `%4s': %i\n",
805               GNUNET_i2s (target), result);
806 #endif
807 }
808
809
810 static int
811 send_disconnect (const struct GNUNET_PeerIdentity *target,
812                  const struct GNUNET_HELLO_Address *address,
813                  struct Session *session)
814 {
815   size_t ret;
816   struct SessionDisconnectMessage disconnect_msg;
817
818 #if DEBUG_TRANSPORT
819   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
820               "Sending DISCONNECT message to peer `%4s'\n",
821               GNUNET_i2s (target));
822 #endif
823
824   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
825   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
826   disconnect_msg.reserved = htonl (0);
827   disconnect_msg.purpose.size =
828       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
829              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
830              sizeof (struct GNUNET_TIME_AbsoluteNBO));
831   disconnect_msg.purpose.purpose =
832       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
833   disconnect_msg.timestamp =
834       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
835   disconnect_msg.public_key = GST_my_public_key;
836   GNUNET_assert (GNUNET_OK ==
837                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
838                                          &disconnect_msg.purpose,
839                                          &disconnect_msg.signature));
840
841   ret =
842       send_with_plugin (target, (const char *) &disconnect_msg,
843                         sizeof (disconnect_msg), UINT32_MAX,
844                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
845                         GNUNET_YES,
846                         &send_disconnect_cont, NULL);
847
848   if (ret == GNUNET_SYSERR)
849     return GNUNET_SYSERR;
850
851   GNUNET_STATISTICS_update (GST_stats,
852                             gettext_noop
853                             ("# peers disconnected due to external request"), 1,
854                             GNUNET_NO);
855   return GNUNET_OK;
856 }
857
858
859 /**
860  * Disconnect from the given neighbour, clean up the record.
861  *
862  * @param n neighbour to disconnect from
863  */
864 static void
865 disconnect_neighbour (struct NeighbourMapEntry *n)
866 {
867   struct MessageQueue *mq;
868   int previous_state;
869
870   previous_state = n->state;
871
872   if (is_disconnecting (n))
873     return;
874
875
876   /* send DISCONNECT MESSAGE */
877   if ((previous_state == S_CONNECTED) || is_connecting (n))
878   {
879     if (GNUNET_OK ==
880         send_disconnect (&n->id, n->address,
881                          n->session))
882       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
883                   GNUNET_i2s (&n->id));
884     else
885       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
886                   "Could not send DISCONNECT_MSG to `%s'\n",
887                   GNUNET_i2s (&n->id));
888   }
889
890   change_state (n, S_DISCONNECT);
891
892   if (previous_state == S_CONNECTED)
893   {
894     GNUNET_assert (NULL != n->address);
895     if (n->address_state == USED)
896     {
897       GST_validation_set_address_use (&n->id,
898                                       n->address,
899                                       n->session,
900                                       GNUNET_NO);
901
902       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
903       n->address_state = UNUSED;
904     }
905   }
906
907   if (n->address != NULL)
908   {
909     struct GNUNET_TRANSPORT_PluginFunctions *papi;
910     papi = GST_plugins_find (n->address->transport_name);
911     if (papi != NULL)
912       papi->disconnect (papi->cls, &n->id);
913   }
914   while (NULL != (mq = n->messages_head))
915   {
916     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
917     if (NULL != mq->cont)
918       mq->cont (mq->cont_cls, GNUNET_SYSERR);
919     GNUNET_free (mq);
920   }
921   if (NULL != n->is_active)
922   {
923     n->is_active->n = NULL;
924     n->is_active = NULL;
925   }
926
927   switch (previous_state) {
928     case S_CONNECTED:
929 //      GNUNET_assert (neighbours_connected > 0);
930       neighbours_connected--;
931       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
932       GNUNET_SCHEDULER_cancel (n->keepalive_task);
933       n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
934       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
935                                 GNUNET_NO);
936       disconnect_notify_cb (callback_cls, &n->id);
937       break;
938     case S_FAST_RECONNECT:
939       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
940                                 GNUNET_NO);
941       GNUNET_STATISTICS_update (GST_stats,
942                                 gettext_noop
943                                 ("# fast reconnects failed"),
944                                 1, GNUNET_NO);
945       disconnect_notify_cb (callback_cls, &n->id);
946     default:
947       break;
948   }
949
950   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
951
952   GNUNET_assert (GNUNET_YES ==
953                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
954                                                        &n->id.hashPubKey, n));
955   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
956   {
957     GNUNET_SCHEDULER_cancel (n->ats_suggest);
958     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
959   }
960   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
961   {
962     GNUNET_SCHEDULER_cancel (n->timeout_task);
963     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
964   }
965   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
966   {
967     GNUNET_SCHEDULER_cancel (n->transmission_task);
968     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
969   }
970   if (NULL != n->address)
971   {
972     GNUNET_HELLO_address_free (n->address);
973     n->address = NULL;
974   }
975   n->session = NULL;
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
977               GNUNET_i2s (&n->id), n);
978   GNUNET_free (n);
979 }
980
981
982 /**
983  * Peer has been idle for too long. Disconnect.
984  *
985  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
986  * @param tc scheduler context
987  */
988 static void
989 neighbour_timeout_task (void *cls,
990                         const struct GNUNET_SCHEDULER_TaskContext *tc)
991 {
992   struct NeighbourMapEntry *n = cls;
993
994   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
995
996   GNUNET_STATISTICS_update (GST_stats,
997                             gettext_noop
998                             ("# peers disconnected due to timeout"), 1,
999                             GNUNET_NO);
1000   disconnect_neighbour (n);
1001 }
1002
1003
1004 /**
1005  * Send another keepalive message.
1006  *
1007  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1008  * @param tc scheduler context
1009  */
1010 static void
1011 neighbour_keepalive_task (void *cls,
1012                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1013 {
1014   struct NeighbourMapEntry *n = cls;
1015   struct GNUNET_MessageHeader m;
1016   int ret;
1017
1018   n->keepalive_task =
1019       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1020                                     &neighbour_keepalive_task, n);
1021
1022   GNUNET_assert (S_CONNECTED == n->state);
1023   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1024                             GNUNET_NO);
1025   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1026   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1027
1028
1029   ret = send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1030                     UINT32_MAX /* priority */ ,
1031                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address, 
1032                     GNUNET_YES, NULL, NULL);
1033
1034   n->expect_latency_response = GNUNET_NO;
1035   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero();
1036   if (ret != GNUNET_SYSERR)
1037   {
1038     n->expect_latency_response = GNUNET_YES;
1039     n->keep_alive_sent = GNUNET_TIME_absolute_get();
1040   }
1041
1042 }
1043
1044
1045 /**
1046  * Disconnect from the given neighbour.
1047  *
1048  * @param cls unused
1049  * @param key hash of neighbour's public key (not used)
1050  * @param value the 'struct NeighbourMapEntry' of the neighbour
1051  */
1052 static int
1053 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1054 {
1055   struct NeighbourMapEntry *n = value;
1056
1057 #if DEBUG_TRANSPORT
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1059               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1060 #endif
1061   if (S_CONNECTED == n->state)
1062     GNUNET_STATISTICS_update (GST_stats,
1063                               gettext_noop
1064                               ("# peers disconnected due to global disconnect"),
1065                               1, GNUNET_NO);
1066   disconnect_neighbour (n);
1067   return GNUNET_OK;
1068 }
1069
1070
1071 static void
1072 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1073 {
1074   struct NeighbourMapEntry *n = cls;
1075
1076   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1077
1078   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079               " ATS did not suggested address to connect to peer `%s'\n",
1080               GNUNET_i2s (&n->id));
1081
1082   disconnect_neighbour (n);
1083 }
1084
1085 /**
1086  * Cleanup the neighbours subsystem.
1087  */
1088 void
1089 GST_neighbours_stop ()
1090 {
1091   // This can happen during shutdown
1092   if (neighbours == NULL)
1093   {
1094     return;
1095   }
1096
1097   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1098                                          NULL);
1099   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1100 //  GNUNET_assert (neighbours_connected == 0);
1101   neighbours = NULL;
1102   callback_cls = NULL;
1103   connect_notify_cb = NULL;
1104   disconnect_notify_cb = NULL;
1105 }
1106
1107 struct ContinutionContext
1108 {
1109   struct GNUNET_HELLO_Address *address;
1110
1111   struct Session *session;
1112 };
1113
1114 static void send_outbound_quota (const struct GNUNET_PeerIdentity *target, struct GNUNET_BANDWIDTH_Value32NBO quota)
1115 {
1116    struct QuotaSetMessage q_msg;
1117 #if DEBUG_TRANSPORT
1118     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1119                 "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1120                 ntohl (quota.value__), GNUNET_i2s (target));
1121 #endif
1122     q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1123     q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1124     q_msg.quota = quota;
1125     q_msg.peer = (*target);
1126     GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1127 }
1128
1129 /**
1130  * We tried to send a SESSION_CONNECT message to another peer.  If this
1131  * succeeded, we change the state.  If it failed, we should tell
1132  * ATS to not use this address anymore (until it is re-validated).
1133  *
1134  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1135  * @param success GNUNET_OK on success
1136  */
1137 static void
1138 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1139                            int success)
1140 {
1141   struct ContinutionContext * cc = cls;
1142   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1143   
1144   if (GNUNET_YES != success)
1145   {
1146     GNUNET_assert (strlen(cc->address->transport_name) > 0);
1147     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1148   }
1149   if ( (NULL == neighbours) ||
1150        (NULL == n) ||
1151        (n->state == S_DISCONNECT))
1152   {
1153     GNUNET_HELLO_address_free (cc->address);
1154     GNUNET_free (cc);
1155     return;
1156   }
1157
1158   if ((GNUNET_YES == success) &&
1159      ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1160   {
1161     change_state (n, S_CONNECT_SENT);
1162     GNUNET_HELLO_address_free (cc->address);
1163     GNUNET_free (cc);
1164     return;
1165   }
1166
1167   if ((GNUNET_NO == success) &&
1168      ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1169   {
1170 #if DEBUG_TRANSPORT
1171     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1172                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1173                 GNUNET_i2s (&n->id),
1174                 GST_plugins_a2s (n->address),
1175                 n->session);
1176 #endif
1177     change_state (n, S_NOT_CONNECTED);
1178     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1179       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1180     n->ats_suggest =
1181       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1182                                     n);
1183     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1184   }
1185   GNUNET_HELLO_address_free (cc->address);
1186   GNUNET_free (cc);
1187 }
1188
1189 /**
1190  * We tried to switch addresses with an peer already connected. If it failed,
1191  * we should tell ATS to not use this address anymore (until it is re-validated).
1192  *
1193  * @param cls the 'struct NeighbourMapEntry'
1194  * @param success GNUNET_OK on success
1195  */
1196 static void
1197 send_switch_address_continuation (void *cls,
1198                                   const struct GNUNET_PeerIdentity *target,
1199                                   int success)
1200 {
1201   struct ContinutionContext * cc = cls;
1202   struct NeighbourMapEntry *n;
1203
1204   if (neighbours == NULL)
1205   {
1206     GNUNET_HELLO_address_free (cc->address);
1207     GNUNET_free (cc);
1208     return;                     /* neighbour is going away */
1209   }
1210
1211   n = lookup_neighbour(&cc->address->peer);
1212   if ((n == NULL) || (is_disconnecting (n)))
1213   {
1214     GNUNET_HELLO_address_free (cc->address);
1215     GNUNET_free (cc);
1216     return;                     /* neighbour is going away */
1217   }
1218
1219   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1220   if (GNUNET_YES != success)
1221   {
1222 #if DEBUG_TRANSPORT
1223     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1225                 GNUNET_i2s (&n->id), 
1226                 GST_plugins_a2s (n->address), n->session);
1227 #endif
1228     GNUNET_assert (strlen(cc->address->transport_name) > 0);
1229     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1230
1231     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1232       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1233     n->ats_suggest =
1234         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1235                                       n);
1236     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1237     GNUNET_HELLO_address_free (cc->address);
1238     GNUNET_free (cc);
1239     return;
1240   }
1241   /* Tell ATS that switching addresses was successful */
1242   switch (n->state) {
1243     case S_CONNECTED:
1244       if (n->address_state == FRESH)
1245       {
1246           GST_validation_set_address_use (&n->id,
1247                     cc->address,
1248                     cc->session,
1249                     GNUNET_YES);
1250           GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1251           GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1252           n->address_state = USED;
1253       }
1254       break;
1255     case S_FAST_RECONNECT:
1256 #if DEBUG_TRANSPORT
1257       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258                   "Successful fast reconnect to peer `%s'\n", GNUNET_i2s (&n->id));
1259 #endif
1260       change_state (n, S_CONNECTED);
1261       neighbours_connected++;
1262       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1263                                 GNUNET_NO);
1264
1265       if (n->address_state == FRESH)
1266       {
1267           GST_validation_set_address_use (&n->id,
1268                     cc->address,
1269                     cc->session,
1270                     GNUNET_YES);
1271           GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1272           GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1273           n->address_state = USED;
1274       }
1275
1276       if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1277             n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1278
1279       /* Updating quotas */
1280       GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1281       send_outbound_quota(target, n->bandwidth_out);
1282
1283     default:
1284       break;
1285   }
1286   GNUNET_HELLO_address_free (cc->address);
1287   GNUNET_free (cc);
1288 }
1289
1290 /**
1291  * We tried to send a SESSION_CONNECT message to another peer.  If this
1292  * succeeded, we change the state.  If it failed, we should tell
1293  * ATS to not use this address anymore (until it is re-validated).
1294  *
1295  * @param cls the 'struct NeighbourMapEntry'
1296  * @param success GNUNET_OK on success
1297  */
1298 static void
1299 send_connect_ack_continuation (void *cls,
1300                                const struct GNUNET_PeerIdentity *target,
1301                                int success)
1302 {
1303   struct ContinutionContext * cc = cls;
1304   struct NeighbourMapEntry *n;
1305
1306   if (neighbours == NULL)
1307   {
1308     GNUNET_HELLO_address_free (cc->address);
1309     GNUNET_free (cc);
1310     return;                     /* neighbour is going away */
1311   }
1312
1313   n = lookup_neighbour(&cc->address->peer);
1314   if ((n == NULL) || (is_disconnecting (n)))
1315   {
1316     GNUNET_HELLO_address_free (cc->address);
1317     GNUNET_free (cc);
1318     return;                     /* neighbour is going away */
1319   }
1320
1321   if (GNUNET_YES == success)
1322   {
1323     GNUNET_HELLO_address_free (cc->address);
1324     GNUNET_free (cc);
1325     return;                     /* sending successful */
1326   }
1327
1328   /* sending failed, ask for next address  */
1329 #if DEBUG_TRANSPORT
1330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1332               GNUNET_i2s (&n->id), 
1333               GST_plugins_a2s (n->address),
1334               n->session);
1335 #endif
1336   change_state (n, S_NOT_CONNECTED);
1337   GNUNET_assert (strlen(cc->address->transport_name) > 0);
1338   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1339
1340   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1341     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1342   n->ats_suggest =
1343       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1344                                     n);
1345   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1346   GNUNET_HELLO_address_free (cc->address);
1347   GNUNET_free (cc);
1348 }
1349
1350 /**
1351  * For an existing neighbour record, set the active connection to
1352  * the given address.
1353  *
1354  * @param peer identity of the peer to switch the address for
1355  * @param address address of the other peer, NULL if other peer
1356  *                       connected to us
1357  * @param session session to use (or NULL)
1358  * @param ats performance data
1359  * @param ats_count number of entries in ats
1360  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1361  *         connection is not up (yet)
1362  */
1363 int
1364 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1365                                        const struct GNUNET_HELLO_Address *address,
1366                                        struct Session *session,
1367                                        const struct GNUNET_ATS_Information *ats,
1368                                        uint32_t ats_count,
1369                                        struct GNUNET_BANDWIDTH_Value32NBO
1370                                        bandwidth_in,
1371                                        struct GNUNET_BANDWIDTH_Value32NBO
1372                                        bandwidth_out)
1373 {
1374   struct NeighbourMapEntry *n;
1375   struct SessionConnectMessage connect_msg;
1376   struct ContinutionContext * cc;
1377   size_t msg_len;
1378   size_t ret;
1379
1380   if (neighbours == NULL)
1381   {
1382     /* This can happen during shutdown */
1383     return GNUNET_NO;
1384   }
1385   n = lookup_neighbour (peer);
1386   if (NULL == n)
1387     return GNUNET_NO;
1388   if (n->state == S_DISCONNECT)
1389   {
1390     /* We are disconnecting, nothing to do here */
1391     return GNUNET_NO;
1392   }
1393   GNUNET_assert (address->transport_name != NULL);
1394   if ( (session == NULL) && (0 == address->address_length) )
1395   {
1396     GNUNET_break_op (0);
1397     /* FIXME: is this actually possible? When does this happen? */
1398     if (strlen(address->transport_name) > 0)
1399       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1400     GNUNET_ATS_suggest_address (GST_ats, peer);
1401     return GNUNET_NO;
1402   }
1403
1404   /* checks successful and neighbour != NULL */
1405 #if DEBUG_TRANSPORT
1406   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1407               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1408               GST_plugins_a2s (address),
1409               session,
1410               GNUNET_i2s (peer),
1411               print_state(n->state));
1412 #endif
1413   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1414   {
1415     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1416     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1417   }
1418   /* do not switch addresses just update quotas */
1419   if ( (n->state == S_CONNECTED) && 
1420        (NULL != n->address) &&
1421        (0 == GNUNET_HELLO_address_cmp (address,
1422                                        n->address)) &&
1423        (n->session == session) )
1424   {
1425     n->bandwidth_in = bandwidth_in;
1426     n->bandwidth_out = bandwidth_out;
1427     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1428     send_outbound_quota(peer, n->bandwidth_out);
1429     return GNUNET_NO;    
1430   }
1431   if (n->state == S_CONNECTED)
1432   {
1433     /* mark old address as no longer used */
1434     GNUNET_assert (NULL != n->address);
1435     if (n->address_state == USED)
1436     {
1437       GST_validation_set_address_use (&n->id,
1438                                       n->address,
1439                                       n->session,
1440                                       GNUNET_NO);
1441       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1442       n->address_state = UNUSED;
1443     }
1444
1445   }
1446
1447   /* set new address */
1448   if (NULL != n->address)
1449     GNUNET_HELLO_address_free (n->address);
1450   n->address = GNUNET_HELLO_address_copy (address);
1451   n->address_state = FRESH;
1452   n->session = session;
1453   n->bandwidth_in = bandwidth_in;
1454   n->bandwidth_out = bandwidth_out;
1455   GNUNET_SCHEDULER_cancel (n->timeout_task);
1456   n->timeout_task =
1457       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1458                                     &neighbour_timeout_task, n);
1459   switch (n->state)
1460   {
1461   case S_NOT_CONNECTED:  
1462   case S_CONNECT_SENT:
1463     msg_len = sizeof (struct SessionConnectMessage);
1464     connect_msg.header.size = htons (msg_len);
1465     connect_msg.header.type =
1466         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1467     connect_msg.reserved = htonl (0);
1468     connect_msg.timestamp =
1469         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1470
1471     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1472     cc->session = session;
1473     cc->address = GNUNET_HELLO_address_copy (address);
1474     ret =
1475         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1476                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1477                           address, GNUNET_YES,
1478                           &send_connect_continuation, 
1479                           cc);
1480     return GNUNET_NO;
1481   case S_CONNECT_RECV:
1482     /* We received a CONNECT message and asked ATS for an address */
1483     msg_len = sizeof (struct SessionConnectMessage);
1484     connect_msg.header.size = htons (msg_len);
1485     connect_msg.header.type =
1486         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1487     connect_msg.reserved = htonl (0);
1488     connect_msg.timestamp =
1489         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1490     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1491     cc->session = session;
1492     cc->address = GNUNET_HELLO_address_copy (address);
1493     ret =
1494         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1495                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1496                           address, GNUNET_YES,
1497                           &send_connect_ack_continuation, cc);
1498     return GNUNET_NO;
1499   case S_CONNECTED:
1500   case S_FAST_RECONNECT:
1501     /* connected peer is switching addresses or tries fast reconnect*/
1502     msg_len = sizeof (struct SessionConnectMessage);
1503     connect_msg.header.size = htons (msg_len);
1504     connect_msg.header.type =
1505         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1506     connect_msg.reserved = htonl (0);
1507     connect_msg.timestamp =
1508         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1509     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1510     cc->session = session;
1511     cc->address = GNUNET_HELLO_address_copy (address);
1512     ret =
1513         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1514                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1515                           address, GNUNET_YES,
1516                           &send_switch_address_continuation, cc);
1517     if (ret == GNUNET_SYSERR)
1518     {
1519       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1520                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1521                   GNUNET_i2s (peer), 
1522                   GST_plugins_a2s (address), session);
1523     }
1524     return GNUNET_NO;
1525   default:
1526     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1527                 "Invalid connection state to switch addresses %u \n", n->state);
1528     GNUNET_break_op (0);
1529     return GNUNET_NO;
1530   }
1531 }
1532
1533
1534 /**
1535  * Obtain current latency information for the given neighbour.
1536  *
1537  * @param peer 
1538  * @return observed latency of the address, FOREVER if the address was
1539  *         never successfully validated
1540  */
1541 struct GNUNET_TIME_Relative
1542 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1543 {
1544   struct NeighbourMapEntry *n;
1545
1546   n = lookup_neighbour (peer);
1547   if ( (NULL == n) ||
1548        ( (n->address == NULL) && (n->session == NULL) ) )
1549     return GNUNET_TIME_UNIT_FOREVER_REL;
1550
1551   return n->latency;
1552 }
1553
1554
1555 /**
1556  * Create an entry in the neighbour map for the given peer
1557  *
1558  * @param peer peer to create an entry for
1559  * @return new neighbour map entry
1560  */
1561 static struct NeighbourMapEntry *
1562 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1563 {
1564   struct NeighbourMapEntry *n;
1565
1566 #if DEBUG_TRANSPORT
1567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1568               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1569 #endif
1570   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1571   n->id = *peer;
1572   n->state = S_NOT_CONNECTED;
1573   n->latency = GNUNET_TIME_relative_get_forever();
1574   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1575                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1576                                  MAX_BANDWIDTH_CARRY_S);
1577   n->timeout_task =
1578       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1579                                     &neighbour_timeout_task, n);
1580   GNUNET_assert (GNUNET_OK ==
1581                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1582                                                     &n->id.hashPubKey, n,
1583                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1584   return n;
1585 }
1586
1587
1588 /**
1589  * Try to create a connection to the given target (eventually).
1590  *
1591  * @param target peer to try to connect to
1592  */
1593 void
1594 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1595 {
1596   struct NeighbourMapEntry *n;
1597
1598   // This can happen during shutdown
1599   if (neighbours == NULL)
1600   {
1601     return;
1602   }
1603 #if DEBUG_TRANSPORT
1604   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1605               GNUNET_i2s (target));
1606 #endif
1607   if (0 ==
1608       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1609   {
1610     /* my own hello */
1611     return;
1612   }
1613   n = lookup_neighbour (target);
1614
1615   if (NULL != n)
1616   {
1617     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1618       return;                   /* already connecting or connected */
1619     if (is_disconnecting (n))
1620       change_state (n, S_NOT_CONNECTED);
1621   }
1622
1623
1624   if (n == NULL)
1625     n = setup_neighbour (target);
1626 #if DEBUG_TRANSPORT
1627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1628               "Asking ATS for suggested address to connect to peer `%s'\n",
1629               GNUNET_i2s (&n->id));
1630 #endif
1631
1632   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1633 }
1634
1635 /**
1636  * Test if we're connected to the given peer.
1637  *
1638  * @param target peer to test
1639  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1640  */
1641 int
1642 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1643 {
1644   struct NeighbourMapEntry *n;
1645
1646   // This can happen during shutdown
1647   if (neighbours == NULL)
1648   {
1649     return GNUNET_NO;
1650   }
1651
1652   n = lookup_neighbour (target);
1653
1654   if ((NULL == n) || (S_CONNECTED != n->state))
1655     return GNUNET_NO;           /* not connected */
1656   return GNUNET_YES;
1657 }
1658
1659 /**
1660  * A session was terminated. Take note.
1661  *
1662  * @param peer identity of the peer where the session died
1663  * @param session session that is gone
1664  */
1665 void
1666 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1667                                    struct Session *session)
1668 {
1669   struct NeighbourMapEntry *n;
1670
1671   if (neighbours == NULL)
1672   {
1673     /* This can happen during shutdown */
1674     return;
1675   }
1676
1677 #if DEBUG_TRANSPORT
1678   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Session %X to peer `%s' ended \n",
1679               session, GNUNET_i2s (peer));
1680 #endif
1681
1682   n = lookup_neighbour (peer);
1683   if (NULL == n)
1684     return;
1685   if (session != n->session)
1686     return;                     /* doesn't affect us */
1687   if (n->state == S_CONNECTED)
1688   {
1689     if (n->address_state == USED)
1690     {
1691       GST_validation_set_address_use (&n->id,
1692                                       n->address,
1693                                       n->session,
1694                                       GNUNET_NO);
1695       GNUNET_ATS_address_in_use (GST_ats,n->address, n->session, GNUNET_NO);
1696       n->address_state = UNUSED;
1697     }
1698   }
1699
1700   if (NULL != n->address)
1701   {
1702     GNUNET_HELLO_address_free (n->address);
1703     n->address = NULL;
1704   }
1705   n->session = NULL;
1706   
1707   /* not connected anymore anyway, shouldn't matter */
1708   if (S_CONNECTED != n->state)
1709     return;
1710
1711   /* connected, try fast reconnect */
1712 #if DEBUG_TRANSPORT
1713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1714               "Trying fast reconnect to peer `%s'\n", GNUNET_i2s (peer));
1715 #endif
1716   change_state (n, S_FAST_RECONNECT);
1717   GNUNET_assert (neighbours_connected > 0);
1718   neighbours_connected--;
1719
1720   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1721   {
1722     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1723     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1724   }
1725
1726   /* We are connected, so ask ATS to switch addresses */
1727   GNUNET_SCHEDULER_cancel (n->timeout_task);
1728   n->timeout_task =
1729       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1730                                     &neighbour_timeout_task, n);
1731   /* try QUICKLY to re-establish a connection, reduce timeout! */
1732   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1733     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1734   n->ats_suggest =
1735       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1736                                     n);
1737   GNUNET_ATS_suggest_address (GST_ats, peer);
1738 }
1739
1740
1741 /**
1742  * Transmit a message to the given target using the active connection.
1743  *
1744  * @param target destination
1745  * @param msg message to send
1746  * @param msg_size number of bytes in msg
1747  * @param timeout when to fail with timeout
1748  * @param cont function to call when done
1749  * @param cont_cls closure for 'cont'
1750  */
1751 void
1752 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1753                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1754                      GST_NeighbourSendContinuation cont, void *cont_cls)
1755 {
1756   struct NeighbourMapEntry *n;
1757   struct MessageQueue *mq;
1758
1759   // This can happen during shutdown
1760   if (neighbours == NULL)
1761   {
1762     return;
1763   }
1764
1765   n = lookup_neighbour (target);
1766   if ((n == NULL) || (!is_connected (n)))
1767   {
1768     GNUNET_STATISTICS_update (GST_stats,
1769                               gettext_noop
1770                               ("# messages not sent (no such peer or not connected)"),
1771                               1, GNUNET_NO);
1772 #if DEBUG_TRANSPORT
1773     if (n == NULL)
1774       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1775                   "Could not send message to peer `%s': unknown neighbour",
1776                   GNUNET_i2s (target));
1777     else if (!is_connected (n))
1778       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779                   "Could not send message to peer `%s': not connected\n",
1780                   GNUNET_i2s (target));
1781 #endif
1782     if (NULL != cont)
1783       cont (cont_cls, GNUNET_SYSERR);
1784     return;
1785   }
1786
1787   if ((n->session == NULL) && (n->address == NULL) )
1788   {
1789     GNUNET_STATISTICS_update (GST_stats,
1790                               gettext_noop
1791                               ("# messages not sent (no such peer or not connected)"),
1792                               1, GNUNET_NO);
1793 #if DEBUG_TRANSPORT
1794     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1795                 "Could not send message to peer `%s': no address available\n",
1796                 GNUNET_i2s (target));
1797 #endif
1798
1799     if (NULL != cont)
1800       cont (cont_cls, GNUNET_SYSERR);
1801     return;
1802   }
1803
1804   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1805   GNUNET_STATISTICS_update (GST_stats,
1806                             gettext_noop
1807                             ("# bytes in message queue for other peers"),
1808                             msg_size, GNUNET_NO);
1809   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1810   mq->cont = cont;
1811   mq->cont_cls = cont_cls;
1812   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1813   memcpy (&mq[1], msg, msg_size);
1814   mq->message_buf = (const char *) &mq[1];
1815   mq->message_buf_size = msg_size;
1816   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1817   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1818
1819   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1820       (NULL == n->is_active))
1821     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1822 }
1823
1824
1825 /**
1826  * We have received a message from the given sender.  How long should
1827  * we delay before receiving more?  (Also used to keep the peer marked
1828  * as live).
1829  *
1830  * @param sender sender of the message
1831  * @param size size of the message
1832  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1833  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1834  *                   GNUNET_SYSERR if the connection is not fully up yet
1835  * @return how long to wait before reading more from this sender
1836  */
1837 struct GNUNET_TIME_Relative
1838 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1839                                         *sender, ssize_t size, int *do_forward)
1840 {
1841   struct NeighbourMapEntry *n;
1842   struct GNUNET_TIME_Relative ret;
1843
1844   // This can happen during shutdown
1845   if (neighbours == NULL)
1846   {
1847     return GNUNET_TIME_UNIT_FOREVER_REL;
1848   }
1849
1850   n = lookup_neighbour (sender);
1851   if (n == NULL)
1852   {
1853     GST_neighbours_try_connect (sender);
1854     n = lookup_neighbour (sender);
1855     if (NULL == n)
1856     {
1857       GNUNET_STATISTICS_update (GST_stats,
1858                                 gettext_noop
1859                                 ("# messages discarded due to lack of neighbour record"),
1860                                 1, GNUNET_NO);
1861       *do_forward = GNUNET_NO;
1862       return GNUNET_TIME_UNIT_ZERO;
1863     }
1864   }
1865   if (!is_connected (n))
1866   {
1867     *do_forward = GNUNET_SYSERR;
1868     return GNUNET_TIME_UNIT_ZERO;
1869   }
1870   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1871   {
1872     n->quota_violation_count++;
1873 #if DEBUG_TRANSPORT
1874     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1875                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1876                 n->in_tracker.available_bytes_per_s__,
1877                 n->quota_violation_count);
1878 #endif
1879     /* Discount 32k per violation */
1880     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1881   }
1882   else
1883   {
1884     if (n->quota_violation_count > 0)
1885     {
1886       /* try to add 32k back */
1887       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1888       n->quota_violation_count--;
1889     }
1890   }
1891   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1892   {
1893     GNUNET_STATISTICS_update (GST_stats,
1894                               gettext_noop
1895                               ("# bandwidth quota violations by other peers"),
1896                               1, GNUNET_NO);
1897     *do_forward = GNUNET_NO;
1898     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1899   }
1900   *do_forward = GNUNET_YES;
1901   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1902   if (ret.rel_value > 0)
1903   {
1904 #if DEBUG_TRANSPORT
1905     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1906                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1907                 (unsigned long long) n->in_tracker.
1908                 consumption_since_last_update__,
1909                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1910                 (unsigned long long) ret.rel_value);
1911 #endif
1912     GNUNET_STATISTICS_update (GST_stats,
1913                               gettext_noop ("# ms throttling suggested"),
1914                               (int64_t) ret.rel_value, GNUNET_NO);
1915   }
1916   return ret;
1917 }
1918
1919
1920 /**
1921  * Keep the connection to the given neighbour alive longer,
1922  * we received a KEEPALIVE (or equivalent).
1923  *
1924  * @param neighbour neighbour to keep alive
1925  */
1926 void
1927 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1928 {
1929   struct NeighbourMapEntry *n;
1930
1931   // This can happen during shutdown
1932   if (neighbours == NULL)
1933   {
1934     return;
1935   }
1936
1937   n = lookup_neighbour (neighbour);
1938   if (NULL == n)
1939   {
1940     GNUNET_STATISTICS_update (GST_stats,
1941                               gettext_noop
1942                               ("# KEEPALIVE messages discarded (not connected)"),
1943                               1, GNUNET_NO);
1944     return;
1945   }
1946   GNUNET_SCHEDULER_cancel (n->timeout_task);
1947   n->timeout_task =
1948       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1949                                     &neighbour_timeout_task, n);
1950
1951   /* send reply to measure latency */
1952   if (S_CONNECTED != n->state)
1953     return;
1954
1955   struct GNUNET_MessageHeader m;
1956   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1957   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1958
1959   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1960                     UINT32_MAX /* priority */ ,
1961                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1962                     GNUNET_YES, NULL, NULL);
1963 }
1964
1965 /**
1966  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
1967  * to this peer
1968  *
1969  * @param neighbour neighbour to keep alive
1970  */
1971 void
1972 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1973                                    const struct GNUNET_ATS_Information * ats,
1974                                    uint32_t ats_count)
1975 {
1976   struct NeighbourMapEntry *n;
1977   struct GNUNET_ATS_Information * ats_new;
1978   uint32_t latency;
1979
1980   if (neighbours == NULL)
1981   {
1982     // This can happen during shutdown
1983     return;
1984   }
1985
1986   n = lookup_neighbour (neighbour);
1987   if (NULL == n)
1988   {
1989     GNUNET_STATISTICS_update (GST_stats,
1990                               gettext_noop
1991                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
1992                               1, GNUNET_NO);
1993     return;
1994   }
1995   if (n->expect_latency_response != GNUNET_YES)
1996   {
1997     GNUNET_STATISTICS_update (GST_stats,
1998                               gettext_noop
1999                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2000                               1, GNUNET_NO);
2001     return;
2002   }
2003   n->expect_latency_response = GNUNET_NO;
2004
2005   GNUNET_assert (n->keep_alive_sent.abs_value != GNUNET_TIME_absolute_get_zero().abs_value);
2006   n->latency = GNUNET_TIME_absolute_get_difference(n->keep_alive_sent, GNUNET_TIME_absolute_get());
2007 #if DEBUG_TRANSPORT
2008   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2009               "Latency for peer `%s' is %llu ms\n",
2010               GNUNET_i2s (&n->id), n->latency.rel_value);
2011 #endif
2012
2013
2014   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever().rel_value)
2015   {
2016     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2017   }
2018   else
2019   {
2020     ats_new = GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2021     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2022
2023     /* add latency */
2024     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2025     if (n->latency.rel_value > UINT32_MAX)
2026       latency = UINT32_MAX;
2027     else
2028       latency = n->latency.rel_value;
2029     ats_new[ats_count].value = htonl (latency);
2030
2031     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new, ats_count + 1);
2032     GNUNET_free (ats_new);
2033   }
2034 }
2035
2036
2037 /**
2038  * Change the incoming quota for the given peer.
2039  *
2040  * @param neighbour identity of peer to change qutoa for
2041  * @param quota new quota
2042  */
2043 void
2044 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2045                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2046 {
2047   struct NeighbourMapEntry *n;
2048
2049   // This can happen during shutdown
2050   if (neighbours == NULL)
2051   {
2052     return;
2053   }
2054
2055   n = lookup_neighbour (neighbour);
2056   if (n == NULL)
2057   {
2058     GNUNET_STATISTICS_update (GST_stats,
2059                               gettext_noop
2060                               ("# SET QUOTA messages ignored (no such peer)"),
2061                               1, GNUNET_NO);
2062     return;
2063   }
2064 #if DEBUG_TRANSPORT
2065   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2066               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2067               ntohl (quota.value__), GNUNET_i2s (&n->id));
2068 #endif
2069   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2070   if (0 != ntohl (quota.value__))
2071     return;
2072 #if DEBUG_TRANSPORT
2073   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2074               GNUNET_i2s (&n->id), "SET_QUOTA");
2075 #endif
2076   if (is_connected (n))
2077     GNUNET_STATISTICS_update (GST_stats,
2078                               gettext_noop ("# disconnects due to quota of 0"),
2079                               1, GNUNET_NO);
2080   disconnect_neighbour (n);
2081 }
2082
2083
2084 /**
2085  * Closure for the neighbours_iterate function.
2086  */
2087 struct IteratorContext
2088 {
2089   /**
2090    * Function to call on each connected neighbour.
2091    */
2092   GST_NeighbourIterator cb;
2093
2094   /**
2095    * Closure for 'cb'.
2096    */
2097   void *cb_cls;
2098 };
2099
2100
2101 /**
2102  * Call the callback from the closure for each connected neighbour.
2103  *
2104  * @param cls the 'struct IteratorContext'
2105  * @param key the hash of the public key of the neighbour
2106  * @param value the 'struct NeighbourMapEntry'
2107  * @return GNUNET_OK (continue to iterate)
2108  */
2109 static int
2110 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2111 {
2112   struct IteratorContext *ic = cls;
2113   struct NeighbourMapEntry *n = value;
2114
2115   if (!is_connected (n))
2116     return GNUNET_OK;
2117
2118   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2119   return GNUNET_OK;
2120 }
2121
2122
2123 /**
2124  * Iterate over all connected neighbours.
2125  *
2126  * @param cb function to call
2127  * @param cb_cls closure for cb
2128  */
2129 void
2130 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2131 {
2132   struct IteratorContext ic;
2133
2134   // This can happen during shutdown
2135   if (neighbours == NULL)
2136   {
2137     return;
2138   }
2139
2140   ic.cb = cb;
2141   ic.cb_cls = cb_cls;
2142   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2143 }
2144
2145 /**
2146  * If we have an active connection to the given target, it must be shutdown.
2147  *
2148  * @param target peer to disconnect from
2149  */
2150 void
2151 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2152 {
2153   struct NeighbourMapEntry *n;
2154
2155   // This can happen during shutdown
2156   if (neighbours == NULL)
2157   {
2158     return;
2159   }
2160
2161   n = lookup_neighbour (target);
2162   if (NULL == n)
2163     return;                     /* not active */
2164   disconnect_neighbour (n);
2165 }
2166
2167
2168 /**
2169  * We received a disconnect message from the given peer,
2170  * validate and process.
2171  *
2172  * @param peer sender of the message
2173  * @param msg the disconnect message
2174  */
2175 void
2176 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2177                                           *peer,
2178                                           const struct GNUNET_MessageHeader
2179                                           *msg)
2180 {
2181   struct NeighbourMapEntry *n;
2182   const struct SessionDisconnectMessage *sdm;
2183   GNUNET_HashCode hc;
2184
2185 #if DEBUG_TRANSPORT
2186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2187               "Received DISCONNECT message from peer `%s'\n",
2188               GNUNET_i2s (peer));
2189 #endif
2190
2191   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2192   {
2193     // GNUNET_break_op (0);
2194     GNUNET_STATISTICS_update (GST_stats,
2195                               gettext_noop
2196                               ("# disconnect messages ignored (old format)"), 1,
2197                               GNUNET_NO);
2198     return;
2199   }
2200   sdm = (const struct SessionDisconnectMessage *) msg;
2201   n = lookup_neighbour (peer);
2202   if (NULL == n)
2203     return;                     /* gone already */
2204   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2205       n->connect_ts.abs_value)
2206   {
2207     GNUNET_STATISTICS_update (GST_stats,
2208                               gettext_noop
2209                               ("# disconnect messages ignored (timestamp)"), 1,
2210                               GNUNET_NO);
2211     return;
2212   }
2213   GNUNET_CRYPTO_hash (&sdm->public_key,
2214                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2215                       &hc);
2216   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2217   {
2218     GNUNET_break_op (0);
2219     return;
2220   }
2221   if (ntohl (sdm->purpose.size) !=
2222       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2223       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2224       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2225   {
2226     GNUNET_break_op (0);
2227     return;
2228   }
2229   if (GNUNET_OK !=
2230       GNUNET_CRYPTO_rsa_verify
2231       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2232        &sdm->signature, &sdm->public_key))
2233   {
2234     GNUNET_break_op (0);
2235     return;
2236   }
2237   GST_neighbours_force_disconnect (peer);
2238 }
2239
2240
2241 /**
2242  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2243  * Consider switching to it.
2244  *
2245  * @param message possibly a 'struct SessionConnectMessage' (check format)
2246  * @param peer identity of the peer to switch the address for
2247  * @param address address of the other peer, NULL if other peer
2248  *                       connected to us
2249  * @param session session to use (or NULL)
2250  * @param ats performance data
2251  * @param ats_count number of entries in ats
2252  */
2253 void
2254 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2255                                    const struct GNUNET_PeerIdentity *peer,
2256                                    const struct GNUNET_HELLO_Address *address,
2257                                    struct Session *session,
2258                                    const struct GNUNET_ATS_Information *ats,
2259                                    uint32_t ats_count)
2260 {
2261   const struct SessionConnectMessage *scm;
2262   struct GNUNET_MessageHeader msg;
2263   struct NeighbourMapEntry *n;
2264   size_t msg_len;
2265   size_t ret;
2266
2267 #if DEBUG_TRANSPORT
2268   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2269               "Received CONNECT_ACK message from peer `%s'\n",
2270               GNUNET_i2s (peer));
2271 #endif
2272
2273   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2274   {
2275     GNUNET_break_op (0);
2276     return;
2277   }
2278   scm = (const struct SessionConnectMessage *) message;
2279   GNUNET_break_op (ntohl (scm->reserved) == 0);
2280   n = lookup_neighbour (peer);
2281   if (NULL == n)
2282   {
2283     /* we did not send 'CONNECT' (how could we? no record for this peer!) */
2284     GNUNET_break_op (0);
2285     return;
2286   }  
2287
2288   /* Additional check
2289    *
2290    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2291    *
2292    * We also received an CONNECT message, switched from SENDT to RECV and
2293    * ATS already suggested us an address after a successful blacklist check
2294    */
2295   if ((n->state != S_CONNECT_SENT) && ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2296   {
2297     GNUNET_STATISTICS_update (GST_stats,
2298                               gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
2299                               GNUNET_NO);
2300     return;
2301   }
2302
2303   change_state (n, S_CONNECTED);
2304
2305   if (NULL != session)
2306   {
2307     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2308                      "transport-ats",
2309                      "Giving ATS session %p of plugin %s for peer %s\n",
2310                      session, address->transport_name, GNUNET_i2s (peer));
2311   }
2312   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2313   GNUNET_assert (NULL != n->address);
2314   if (n->address_state == FRESH)
2315   {
2316     GST_validation_set_address_use (&n->id,
2317             n->address,
2318             n->session,
2319             GNUNET_YES);
2320     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2321     n->address_state = USED;
2322   }
2323
2324   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2325
2326   /* send ACK (ACK) */
2327   msg_len = sizeof (msg);
2328   msg.size = htons (msg_len);
2329   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2330
2331   ret =
2332       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2333                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2334                         n->address, GNUNET_YES, NULL,
2335                         NULL);
2336
2337   if (ret == GNUNET_SYSERR)
2338     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2339                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2340                 GNUNET_i2s (&n->id), 
2341                 GST_plugins_a2s (n->address), n->session);
2342
2343
2344   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2345     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2346   
2347   neighbours_connected++;
2348   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2349                             GNUNET_NO);
2350 #if DEBUG_TRANSPORT
2351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2353               GNUNET_i2s (&n->id),
2354               GST_plugins_a2s (n->address), n->session,
2355               __LINE__);
2356 #endif
2357   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2358   send_outbound_quota(peer, n->bandwidth_out);
2359
2360 }
2361
2362
2363 void
2364 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2365                            const struct GNUNET_PeerIdentity *peer,
2366                            const struct GNUNET_HELLO_Address *address,
2367                            struct Session *session,
2368                            const struct GNUNET_ATS_Information *ats,
2369                            uint32_t ats_count)
2370 {
2371   struct NeighbourMapEntry *n;
2372
2373 #if DEBUG_TRANSPORT
2374   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2375               "Received ACK message from peer `%s'\n",
2376               GNUNET_i2s (peer));
2377 #endif
2378
2379   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2380   {
2381     GNUNET_break_op (0);
2382     return;
2383   }
2384   n = lookup_neighbour (peer);
2385   if (NULL == n)
2386   {
2387     send_disconnect (peer, address,
2388                      session);
2389     GNUNET_break (0);
2390     return;
2391   }
2392   if (S_CONNECTED == n->state)
2393     return;
2394   if (!is_connecting(n))
2395   {
2396     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2397                               GNUNET_NO);
2398     return;
2399   }
2400   change_state (n, S_CONNECTED);
2401   if (NULL != session)
2402     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2403                      "transport-ats",
2404                      "Giving ATS session %p of plugin %s for peer %s\n",
2405                      session, address->transport_name, GNUNET_i2s (peer));
2406   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2407   GNUNET_assert (n->address != NULL);
2408   if (n->address_state == FRESH)
2409   {
2410     GST_validation_set_address_use (&n->id,
2411                                   n->address,
2412                                   n->session,
2413                                   GNUNET_YES);
2414     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2415     n->address_state = USED;
2416   }
2417
2418   neighbours_connected++;
2419   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2420                             GNUNET_NO);
2421   
2422   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2423   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2424         n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2425 #if DEBUG_TRANSPORT
2426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2427               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2428               GNUNET_i2s (&n->id),
2429               GST_plugins_a2s (n->address), n->session,
2430               __LINE__);
2431 #endif
2432   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2433   send_outbound_quota(peer, n->bandwidth_out);
2434 }
2435
2436 struct BlackListCheckContext
2437 {
2438   struct GNUNET_ATS_Information *ats;
2439
2440   uint32_t ats_count;
2441
2442   struct Session *session;
2443
2444   struct GNUNET_HELLO_Address *address;
2445
2446   struct GNUNET_TIME_Absolute ts;
2447 };
2448
2449
2450 static void
2451 handle_connect_blacklist_cont (void *cls,
2452                                const struct GNUNET_PeerIdentity *peer,
2453                                int result)
2454 {
2455   struct NeighbourMapEntry *n;
2456   struct BlackListCheckContext *bcc = cls;
2457
2458 #if DEBUG_TRANSPORT
2459   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2460               "Blacklist check due to CONNECT message: `%s'\n",
2461               GNUNET_i2s (peer),
2462               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2463 #endif
2464
2465   /* not allowed */
2466   if (GNUNET_OK != result)
2467   {
2468     GNUNET_HELLO_address_free (bcc->address);
2469     GNUNET_free (bcc);
2470     return;
2471   }
2472
2473   n = lookup_neighbour (peer);
2474   if (NULL == n)
2475     n = setup_neighbour (peer);
2476
2477   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2478   {
2479     if (NULL != bcc->session)
2480       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2481                        "transport-ats",
2482                        "Giving ATS session %p of address `%s' for peer %s\n",
2483                        bcc->session, 
2484                        GST_plugins_a2s (bcc->address),
2485                        GNUNET_i2s (peer));
2486     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2487
2488     GNUNET_ATS_address_update (GST_ats,
2489                                bcc->address,
2490                                bcc->session,
2491                                bcc->ats,
2492                                bcc->ats_count);
2493     n->connect_ts = bcc->ts;
2494   }
2495
2496   GNUNET_HELLO_address_free (bcc->address);
2497   GNUNET_free (bcc);
2498
2499   if (n->state != S_CONNECT_RECV)
2500     change_state (n, S_CONNECT_RECV);
2501
2502   /* Ask ATS for an address to connect via that address */
2503   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2504     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2505   n->ats_suggest =
2506       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2507                                     n);
2508   GNUNET_ATS_suggest_address (GST_ats, peer);
2509 }
2510
2511 /**
2512  * We received a 'SESSION_CONNECT' message from the other peer.
2513  * Consider switching to it.
2514  *
2515  * @param message possibly a 'struct SessionConnectMessage' (check format)
2516  * @param peer identity of the peer to switch the address for
2517  * @param address address of the other peer, NULL if other peer
2518  *                       connected to us
2519  * @param session session to use (or NULL)
2520  * @param ats performance data
2521  * @param ats_count number of entries in ats (excluding 0-termination)
2522  */
2523 void
2524 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2525                                const struct GNUNET_PeerIdentity *peer,
2526                                const struct GNUNET_HELLO_Address *address,
2527                                struct Session *session,
2528                                const struct GNUNET_ATS_Information *ats,
2529                                uint32_t ats_count)
2530 {
2531   const struct SessionConnectMessage *scm;
2532   struct NeighbourMapEntry *n;
2533   struct BlackListCheckContext *bcc = NULL;
2534
2535 #if DEBUG_TRANSPORT
2536   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2537               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2538 #endif
2539
2540   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2541   {
2542     GNUNET_break_op (0);
2543     return;
2544   }
2545
2546   scm = (const struct SessionConnectMessage *) message;
2547   GNUNET_break_op (ntohl (scm->reserved) == 0);
2548
2549   n = lookup_neighbour (peer);
2550   if ( (n != NULL) &&
2551        (S_CONNECTED == n->state) )
2552   {
2553     /* connected peer switches addresses */
2554     GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2555     return;
2556   }
2557
2558   /* we are not connected to this peer */
2559   /* do blacklist check */
2560   bcc =
2561       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2562                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2563   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2564   bcc->ats_count = ats_count + 1;
2565   bcc->address = GNUNET_HELLO_address_copy (address);
2566   bcc->session = session;
2567   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2568   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2569   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2570   bcc->ats[ats_count].value = htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2571   GST_blacklist_test_allowed (peer, address->transport_name, handle_connect_blacklist_cont,
2572                               bcc);
2573 }
2574
2575
2576 /* end of file gnunet-service-transport_neighbours.c */