(no commit message)
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
/**
 * Size of the neighbour hash map (the value handed to
 * GNUNET_CONTAINER_multihashmap_create when the subsystem starts).
 */
#define NEIGHBOUR_TABLE_SIZE 256

/**
 * How often must a peer violate bandwidth quotas before we start
 * to simply drop its messages?
 */
#define QUOTA_VIOLATION_DROP_THRESHOLD 10

/**
 * How often do we send KEEPALIVE messages to each of our neighbours and measure
 * the latency with this neighbour?
 * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
 * send 10 keepalives in each interval, so 10 messages would need to be
 * lost in a row for a disconnect).
 */
#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)


/**
 * How long do we wait for ATS to suggest an address after asking for one?
 * When this delay expires, the 'ats_suggest_cancel' task runs and the
 * neighbour is disconnected.
 */
#define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)

/**
 * Timeout for a fast reconnect attempt.
 * NOTE(review): not referenced in the part of the file reviewed here;
 * presumably bounds the time spent in S_FAST_RECONNECT -- confirm.
 */
#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)

/**
 * How long may a neighbour stay in one of the connection setup states
 * (S_CONNECT_SENT, S_CONNECT_RECV, S_FAST_RECONNECT) before 'reset_task'
 * puts it back into S_NOT_CONNECTED and asks ATS for a new address?
 */
#define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
65
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
/**
 * Message a peer sends to another to indicate its
 * preference for communicating via a particular
 * session (and the desire to establish a real
 * connection).  All fields are in network byte order.
 */
struct SessionConnectMessage
{
  /**
   * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
   */
  struct GNUNET_MessageHeader header;

  /**
   * Always zero.
   */
  uint32_t reserved GNUNET_PACKED;

  /**
   * Absolute time at the sender.  Only the most recent connect
   * message implies which session is preferred by the sender.
   */
  struct GNUNET_TIME_AbsoluteNBO timestamp;

};
97
98
/**
 * Signed message a peer sends to another to announce that it is
 * tearing down the connection.  Built and signed in 'send_disconnect'.
 */
struct SessionDisconnectMessage
{
  /**
   * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
   * NOTE(review): 'send_disconnect' currently writes
   * GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT here -- confirm which
   * constant is intended.
   */
  struct GNUNET_MessageHeader header;

  /**
   * Always zero.
   */
  uint32_t reserved GNUNET_PACKED;

  /**
   * Purpose of the signature.  As filled in by 'send_disconnect', the
   * signed region covers this purpose header, the timestamp and the
   * public key.
   * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
   * NOTE(review): 'send_disconnect' currently stores
   * GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT as the purpose --
   * confirm against the receiving side before changing either.
   */
  struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;

  /**
   * Absolute time at the sender when the disconnect message was
   * created (network byte order).
   */
  struct GNUNET_TIME_AbsoluteNBO timestamp;

  /**
   * Public key of the sender.
   */
  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;

  /**
   * Signature of the peer that sends us the disconnect.  Only
   * valid if the timestamp is AFTER the timestamp from the
   * corresponding 'CONNECT' message.
   */
  struct GNUNET_CRYPTO_RsaSignature signature;

};
136
137
/**
 * For each neighbour we keep a list of messages
 * that we still want to transmit to the neighbour.
 * Entries are released with GNUNET_free once their continuation
 * has been invoked.
 */
struct MessageQueue
{

  /**
   * This is a doubly linked list.
   */
  struct MessageQueue *next;

  /**
   * This is a doubly linked list.
   */
  struct MessageQueue *prev;

  /**
   * Once this message is actively being transmitted, which
   * neighbour is it associated with?  Set when the entry is taken
   * off the queue for transmission; reset to NULL by
   * 'disconnect_neighbour' if the neighbour goes away while the
   * transmission is still pending.
   */
  struct NeighbourMapEntry *n;

  /**
   * Function to call once we're done (may be NULL).
   */
  GST_NeighbourSendContinuation cont;

  /**
   * Closure for 'cont'
   */
  void *cont_cls;

  /**
   * The message(s) we want to transmit, GNUNET_MessageHeader(s)
   * stuck together in memory.  Allocated at the end of this struct.
   */
  const char *message_buf;

  /**
   * Size of the message buf
   */
  size_t message_buf_size;

  /**
   * At what time should we fail?  Entries whose timeout has passed
   * are failed with GNUNET_SYSERR instead of being transmitted.
   */
  struct GNUNET_TIME_Absolute timeout;

};
188
189
/**
 * Connection state of a neighbour.
 *
 * The numeric order of the enumerators is significant: 'is_connecting'
 * treats every value strictly between S_NOT_CONNECTED and S_CONNECTED
 * as "connection setup in progress".
 */
enum State
{
  /**
   * fresh peer or completely disconnected
   */
  S_NOT_CONNECTED,

  /**
   * sent CONNECT message to other peer, waiting for CONNECT_ACK
   */
  S_CONNECT_SENT,

  /**
   * received CONNECT message from other peer, sending CONNECT_ACK
   */
  S_CONNECT_RECV,

  /**
   * received ACK or payload
   */
  S_CONNECTED,

  /**
   * connection ended, fast reconnect
   */
  S_FAST_RECONNECT,

  /**
   * Disconnect in progress
   */
  S_DISCONNECT
};
222
/**
 * Usage state of the address stored in a 'struct NeighbourMapEntry'
 * (kept in its 'address_state' field).
 */
enum Address_State
{
  /* address has been reported as "in use" to validation and ATS;
   * 'disconnect_neighbour' reports it as no longer in use and moves
   * it to UNUSED */
  USED,
  /* address is not (or no longer) reported as in use */
  UNUSED,
  /* NOTE(review): presumably a newly assigned address that has not
   * been reported either way yet -- confirm against the code that
   * sets it */
  FRESH,
};
229
/**
 * Entry in neighbours.  One such record exists per peer we are
 * connected to or are trying to connect to; it is stored in the
 * 'neighbours' map and freed by 'disconnect_neighbour'.
 */
struct NeighbourMapEntry
{

  /**
   * Head of list of messages we would like to send to this peer;
   * must contain at most one message per client.
   */
  struct MessageQueue *messages_head;

  /**
   * Tail of list of messages we would like to send to this peer; must
   * contain at most one message per client.
   */
  struct MessageQueue *messages_tail;

  /**
   * Performance data for the peer (field currently disabled).
   */
  //struct GNUNET_ATS_Information *ats;

  /**
   * Are we currently trying to send a message? If so, which one?
   * NULL while no transmission is pending.
   */
  struct MessageQueue *is_active;

  /**
   * Active session for communicating with the peer.
   */
  struct Session *session;

  /**
   * Address we currently use (NULL if we have none); released with
   * GNUNET_HELLO_address_free when the neighbour is disconnected.
   */
  struct GNUNET_HELLO_Address *address;

  /**
   * Identity of this neighbour.
   */
  struct GNUNET_PeerIdentity id;

  /**
   * ID of task scheduled to run when this peer is about to
   * time out (will free resources associated with the peer).
   */
  GNUNET_SCHEDULER_TaskIdentifier timeout_task;

  /**
   * ID of task scheduled to send keepalives.
   */
  GNUNET_SCHEDULER_TaskIdentifier keepalive_task;

  /**
   * ID of task scheduled to run when we should try transmitting
   * the head of the message queue.
   */
  GNUNET_SCHEDULER_TaskIdentifier transmission_task;

  /**
   * Tracker for inbound bandwidth.
   */
  struct GNUNET_BANDWIDTH_Tracker in_tracker;

  /**
   * Inbound bandwidth from ATS, activated when connection is up
   */
  struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;

  /**
   * Outbound bandwidth from ATS, activated when connection is up
   */
  struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;

  /**
   * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
   */
  struct GNUNET_TIME_Absolute connect_ts;

  /**
   * When did we send the last keep-alive message?  Reset to the zero
   * time when sending the keep-alive failed.
   */
  struct GNUNET_TIME_Absolute keep_alive_sent;

  /**
   * Latest calculated latency value
   */
  struct GNUNET_TIME_Relative latency;

  /**
   * Timeout for ATS: we asked ATS for a new address for this peer.
   * If the task fires ('ats_suggest_cancel'), the neighbour is
   * disconnected.
   */
  GNUNET_SCHEDULER_TaskIdentifier ats_suggest;

  /**
   * Task that resets the peer state after a pending, unsuccessful
   * connection setup ('reset_task').
   */
  GNUNET_SCHEDULER_TaskIdentifier state_reset;

  /**
   * How often has the other peer (recently) violated the inbound
   * traffic limit?  Incremented by 10 per violation, decremented by 1
   * per non-violation (for each time interval).
   */
  unsigned int quota_violation_count;


  /**
   * The current state of the peer
   * Element of enum State
   */
  int state;

  /**
   * Did we send a KEEP_ALIVE message and are we expecting a response?
   * (GNUNET_YES or GNUNET_NO)
   */
  int expect_latency_response;

  /**
   * Usage state of 'address'.
   * Element of enum Address_State
   */
  int address_state;
};
352
353
/**
 * All known neighbours: maps the hash of a peer's public key to its
 * 'struct NeighbourMapEntry'.  Created by GST_neighbours_start and
 * destroyed (and reset to NULL) by GST_neighbours_stop.
 */
static struct GNUNET_CONTAINER_MultiHashMap *neighbours;

/**
 * Closure for connect_notify_cb and disconnect_notify_cb
 */
static void *callback_cls;

/**
 * Function to call when we connected to a neighbour.
 */
static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;

/**
 * Function to call when we disconnected from a neighbour.
 */
static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;

/**
 * Counter for connected neighbours; decremented by
 * 'disconnect_neighbour' when a neighbour leaves S_CONNECTED.
 */
static int neighbours_connected;
378
379 /**
380  * Lookup a neighbour entry in the neighbours hash map.
381  *
382  * @param pid identity of the peer to look up
383  * @return the entry, NULL if there is no existing record
384  */
385 static struct NeighbourMapEntry *
386 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
387 {
388   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
389 }
390
/**
 * Request a state transition for neighbour 'n' via change(), passing
 * along the source line of the call site so that illegal transitions
 * can be traced in the log.  Additional arguments are accepted and
 * ignored.
 */
#define change_state(n, state, ...) change (n, state, __LINE__)
392
393 static int
394 is_connecting (struct NeighbourMapEntry *n)
395 {
396   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
397     return GNUNET_YES;
398   return GNUNET_NO;
399 }
400
401 static int
402 is_connected (struct NeighbourMapEntry *n)
403 {
404   if (n->state == S_CONNECTED)
405     return GNUNET_YES;
406   return GNUNET_NO;
407 }
408
409 static int
410 is_disconnecting (struct NeighbourMapEntry *n)
411 {
412   if (n->state == S_DISCONNECT)
413     return GNUNET_YES;
414   return GNUNET_NO;
415 }
416
417 static const char *
418 print_state (int state)
419 {
420   switch (state)
421   {
422   case S_CONNECTED:
423     return "S_CONNECTED";
424     break;
425   case S_CONNECT_RECV:
426     return "S_CONNECT_RECV";
427     break;
428   case S_CONNECT_SENT:
429     return "S_CONNECT_SENT";
430     break;
431   case S_DISCONNECT:
432     return "S_DISCONNECT";
433     break;
434   case S_NOT_CONNECTED:
435     return "S_NOT_CONNECTED";
436     break;
437   case S_FAST_RECONNECT:
438     return "S_FAST_RECONNECT";
439     break;
440   default:
441     GNUNET_break (0);
442     break;
443   }
444   return NULL;
445 }
446
447 static int
448 change (struct NeighbourMapEntry *n, int state, int line);
449
450 static void
451 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
452
453 static void
454 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
455 {
456   struct NeighbourMapEntry *n = cls;
457
458   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
459
460 #if DEBUG_TRANSPORT
461 #endif
462
463   GNUNET_STATISTICS_update (GST_stats,
464                             gettext_noop
465                             ("# failed connection attempts due to timeout"), 1,
466                             GNUNET_NO);
467
468   /* resetting state */
469   n->state = S_NOT_CONNECTED;
470
471   /* destroying address */
472   GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
473
474   /* request new address */
475   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
476     GNUNET_SCHEDULER_cancel (n->ats_suggest);
477   n->ats_suggest =
478       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
479                                     n);
480   GNUNET_ATS_suggest_address (GST_ats, &n->id);
481 }
482
483 static int
484 change (struct NeighbourMapEntry *n, int state, int line)
485 {
486   /* allowed transitions */
487   int allowed = GNUNET_NO;
488
489   switch (n->state)
490   {
491   case S_NOT_CONNECTED:
492     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
493         (state == S_DISCONNECT))
494       allowed = GNUNET_YES;
495     break;
496   case S_CONNECT_RECV:
497     allowed = GNUNET_YES;
498     break;
499   case S_CONNECT_SENT:
500     allowed = GNUNET_YES;
501     break;
502   case S_CONNECTED:
503     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
504       allowed = GNUNET_YES;
505     break;
506   case S_DISCONNECT:
507     break;
508   case S_FAST_RECONNECT:
509     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
510       allowed = GNUNET_YES;
511     break;
512   default:
513     GNUNET_break (0);
514     break;
515   }
516   if (allowed == GNUNET_NO)
517   {
518     char *old = GNUNET_strdup (print_state (n->state));
519     char *new = GNUNET_strdup (print_state (state));
520     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
521                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
522                 new, line);
523     GNUNET_break (0);
524     GNUNET_free (old);
525     GNUNET_free (new);
526     return GNUNET_SYSERR;
527   }
528 #if DEBUG_TRANSPORT
529
530   {
531     char *old = GNUNET_strdup (print_state (n->state));
532     char *new = GNUNET_strdup (print_state (state));
533     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
534                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
535                 GNUNET_i2s (&n->id), n, old, new, line);
536     GNUNET_free (old);
537     GNUNET_free (new);
538   }
539 #endif
540   n->state = state;
541
542   switch (n->state)
543   {
544   case S_FAST_RECONNECT:
545   case S_CONNECT_RECV:
546   case S_CONNECT_SENT:
547     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
548       GNUNET_SCHEDULER_cancel (n->state_reset);
549     n->state_reset =
550       GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
551                                     n);   
552     break;
553   case S_CONNECTED:
554   case S_NOT_CONNECTED:
555   case S_DISCONNECT:
556     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
557     {
558 #if DEBUG_TRANSPORT
559       char *old = GNUNET_strdup (print_state (n->state));
560       char *new = GNUNET_strdup (print_state (state));
561       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
562                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
563                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
564                   old, new);
565       GNUNET_free (old);
566       GNUNET_free (new);
567 #endif
568       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
569       GNUNET_SCHEDULER_cancel (n->state_reset);
570       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
571     }
572     break;
573
574   default:
575     GNUNET_assert (0);
576   }
577   
578
579
580   return GNUNET_OK;
581 }
582
583 static ssize_t
584 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
585                   size_t msgbuf_size, uint32_t priority,
586                   struct GNUNET_TIME_Relative timeout, struct Session *session,
587                   const struct GNUNET_HELLO_Address *address,
588                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
589                   void *cont_cls)
590 {
591   struct GNUNET_TRANSPORT_PluginFunctions *papi;
592   size_t ret = GNUNET_SYSERR;
593
594   /* FIXME : ats returns an address with all values 0 */
595   if (address == NULL)
596   {
597     if (cont != NULL)
598       cont (cont_cls, target, GNUNET_SYSERR);
599     return GNUNET_SYSERR;
600   }
601
602   if ((session == NULL) && (address->address_length == 0))
603   {
604     if (cont != NULL)
605       cont (cont_cls, target, GNUNET_SYSERR);
606     return GNUNET_SYSERR;
607   }
608
609   papi = GST_plugins_find (address->transport_name);
610   if (papi == NULL)
611   {
612     if (cont != NULL)
613       cont (cont_cls, target, GNUNET_SYSERR);
614     return GNUNET_SYSERR;
615   }
616
617   ret =
618       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
619                   address->address, 
620                   address->address_length, GNUNET_YES, cont, cont_cls);
621
622   if (ret == -1)
623   {
624     if (cont != NULL)
625       cont (cont_cls, target, GNUNET_SYSERR);
626   }
627   return ret;
628 }
629
630 /**
631  * Task invoked to start a transmission to another peer.
632  *
633  * @param cls the 'struct NeighbourMapEntry'
634  * @param tc scheduler context
635  */
636 static void
637 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
638
639
640 /**
641  * We're done with our transmission attempt, continue processing.
642  *
643  * @param cls the 'struct MessageQueue' of the message
644  * @param receiver intended receiver
645  * @param success whether it worked or not
646  */
647 static void
648 transmit_send_continuation (void *cls,
649                             const struct GNUNET_PeerIdentity *receiver,
650                             int success)
651 {
652   struct MessageQueue *mq;
653   struct NeighbourMapEntry *n;
654
655   mq = cls;
656   n = mq->n;
657   if (NULL != n)
658   {
659     GNUNET_assert (n->is_active == mq);
660     n->is_active = NULL;
661     if (success == GNUNET_YES)
662     {
663       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
664       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
665     }
666   }
667 #if DEBUG_TRANSPORT
668   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
669               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
670               (success == GNUNET_OK) ? "successful" : "FAILED");
671 #endif
672   if (NULL != mq->cont)
673     mq->cont (mq->cont_cls, success);
674   GNUNET_free (mq);
675 }
676
677
678 /**
679  * Check the ready list for the given neighbour and if a plugin is
680  * ready for transmission (and if we have a message), do so!
681  *
682  * @param n target peer for which to transmit
683  */
684 static void
685 try_transmission_to_peer (struct NeighbourMapEntry *n)
686 {
687   struct MessageQueue *mq;
688   struct GNUNET_TIME_Relative timeout;
689   ssize_t ret;
690
691   if (n->is_active != NULL)
692   {
693     GNUNET_break (0);
694     return;                     /* transmission already pending */
695   }
696   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
697   {
698     GNUNET_break (0);
699     return;                     /* currently waiting for bandwidth */
700   }
701   while (NULL != (mq = n->messages_head))
702   {
703     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
704     if (timeout.rel_value > 0)
705       break;
706     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
707     n->is_active = mq;
708     mq->n = n;
709     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
710   }
711   if (NULL == mq)
712     return;                     /* no more messages */
713
714   if (n->address == NULL)
715   {
716     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
717                 "No address for peer `%s'\n",
718                 GNUNET_i2s (&n->id));
719     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
720     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
721     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
722     return;
723   }
724
725   if (GST_plugins_find (n->address->transport_name) == NULL)
726   {
727     GNUNET_break (0);
728     return;
729   }
730   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
731   n->is_active = mq;
732   mq->n = n;
733
734   if ((n->address->address_length == 0) && (n->session == NULL))
735   {
736     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
737                 "No address for peer `%s'\n",
738                 GNUNET_i2s (&n->id));
739     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
740     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
741     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
742     return;
743   }
744
745   ret =
746       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
747                         timeout, n->session, n->address, 
748                         GNUNET_YES, &transmit_send_continuation,
749                         mq);
750   if (ret == -1)
751   {
752     /* failure, but 'send' would not call continuation in this case,
753      * so we need to do it here! */
754     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
755   }
756
757 }
758
759
760 /**
761  * Task invoked to start a transmission to another peer.
762  *
763  * @param cls the 'struct NeighbourMapEntry'
764  * @param tc scheduler context
765  */
766 static void
767 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
768 {
769   struct NeighbourMapEntry *n = cls;
770
771   GNUNET_assert (NULL != lookup_neighbour (&n->id));
772   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
773   try_transmission_to_peer (n);
774 }
775
776
777 /**
778  * Initialize the neighbours subsystem.
779  *
780  * @param cls closure for callbacks
781  * @param connect_cb function to call if we connect to a peer
782  * @param disconnect_cb function to call if we disconnect from a peer
783  */
784 void
785 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
786                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
787 {
788   callback_cls = cls;
789   connect_notify_cb = connect_cb;
790   disconnect_notify_cb = disconnect_cb;
791   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
792 }
793
794
/**
 * Continuation invoked once the DISCONNECT message has been handled by
 * the plugin.  Only logs the outcome (and only in debug builds).
 *
 * @param cls closure (unused)
 * @param target peer the DISCONNECT message was for
 * @param result result code reported for the transmission
 */
static void
send_disconnect_cont (void *cls,
                      const struct GNUNET_PeerIdentity *target,
                      int result)
{
#if DEBUG_TRANSPORT
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Sending DISCONNECT message to peer `%4s': %i\n",
              GNUNET_i2s (target),
              result);
#endif
}
805
806
807 static int
808 send_disconnect (const struct GNUNET_PeerIdentity *target,
809                  const struct GNUNET_HELLO_Address *address,
810                  struct Session *session)
811 {
812   size_t ret;
813   struct SessionDisconnectMessage disconnect_msg;
814
815 #if DEBUG_TRANSPORT
816   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817               "Sending DISCONNECT message to peer `%4s'\n",
818               GNUNET_i2s (target));
819 #endif
820
821   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
822   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
823   disconnect_msg.reserved = htonl (0);
824   disconnect_msg.purpose.size =
825       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
826              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
827              sizeof (struct GNUNET_TIME_AbsoluteNBO));
828   disconnect_msg.purpose.purpose =
829       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
830   disconnect_msg.timestamp =
831       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
832   disconnect_msg.public_key = GST_my_public_key;
833   GNUNET_assert (GNUNET_OK ==
834                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
835                                          &disconnect_msg.purpose,
836                                          &disconnect_msg.signature));
837
838   ret =
839       send_with_plugin (target, (const char *) &disconnect_msg,
840                         sizeof (disconnect_msg), UINT32_MAX,
841                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
842                         GNUNET_YES,
843                         &send_disconnect_cont, NULL);
844
845   if (ret == GNUNET_SYSERR)
846     return GNUNET_SYSERR;
847
848   GNUNET_STATISTICS_update (GST_stats,
849                             gettext_noop
850                             ("# peers disconnected due to external request"), 1,
851                             GNUNET_NO);
852   return GNUNET_OK;
853 }
854
855
856 /**
857  * Disconnect from the given neighbour, clean up the record.
858  *
859  * @param n neighbour to disconnect from
860  */
861 static void
862 disconnect_neighbour (struct NeighbourMapEntry *n)
863 {
864   struct MessageQueue *mq;
865   int previous_state;
866
867   previous_state = n->state;
868
869   if (is_disconnecting (n))
870     return;
871
872
873   /* send DISCONNECT MESSAGE */
874   if ((previous_state == S_CONNECTED) || is_connecting (n))
875   {
876     if (GNUNET_OK ==
877         send_disconnect (&n->id, n->address,
878                          n->session))
879       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
880                   GNUNET_i2s (&n->id));
881     else
882       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
883                   "Could not send DISCONNECT_MSG to `%s'\n",
884                   GNUNET_i2s (&n->id));
885   }
886
887   change_state (n, S_DISCONNECT);
888
889   if (previous_state == S_CONNECTED)
890   {
891     GNUNET_assert (NULL != n->address);
892     if (n->address_state == USED)
893     {
894       GST_validation_set_address_use (&n->id,
895                                       n->address,
896                                       n->session,
897                                       GNUNET_NO);
898
899       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
900       n->address_state = UNUSED;
901     }
902   }
903
904   if (n->address != NULL)
905   {
906     struct GNUNET_TRANSPORT_PluginFunctions *papi;
907     papi = GST_plugins_find (n->address->transport_name);
908     if (papi != NULL)
909       papi->disconnect (papi->cls, &n->id);
910   }
911   while (NULL != (mq = n->messages_head))
912   {
913     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
914     if (NULL != mq->cont)
915       mq->cont (mq->cont_cls, GNUNET_SYSERR);
916     GNUNET_free (mq);
917   }
918   if (NULL != n->is_active)
919   {
920     n->is_active->n = NULL;
921     n->is_active = NULL;
922   }
923
924   switch (previous_state) {
925     case S_CONNECTED:
926 //      GNUNET_assert (neighbours_connected > 0);
927       neighbours_connected--;
928       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
929       GNUNET_SCHEDULER_cancel (n->keepalive_task);
930       n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
931       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
932                                 GNUNET_NO);
933       disconnect_notify_cb (callback_cls, &n->id);
934       break;
935     case S_FAST_RECONNECT:
936       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
937                                 GNUNET_NO);
938       GNUNET_STATISTICS_update (GST_stats,
939                                 gettext_noop
940                                 ("# fast reconnects failed"),
941                                 1, GNUNET_NO);
942       disconnect_notify_cb (callback_cls, &n->id);
943     default:
944       break;
945   }
946
947   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
948
949   GNUNET_assert (GNUNET_YES ==
950                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
951                                                        &n->id.hashPubKey, n));
952   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
953   {
954     GNUNET_SCHEDULER_cancel (n->ats_suggest);
955     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
956   }
957   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
958   {
959     GNUNET_SCHEDULER_cancel (n->timeout_task);
960     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
961   }
962   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
963   {
964     GNUNET_SCHEDULER_cancel (n->transmission_task);
965     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
966   }
967   if (NULL != n->address)
968   {
969     GNUNET_HELLO_address_free (n->address);
970     n->address = NULL;
971   }
972   n->session = NULL;
973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
974               GNUNET_i2s (&n->id), n);
975   GNUNET_free (n);
976 }
977
978
979 /**
980  * Peer has been idle for too long. Disconnect.
981  *
982  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
983  * @param tc scheduler context
984  */
985 static void
986 neighbour_timeout_task (void *cls,
987                         const struct GNUNET_SCHEDULER_TaskContext *tc)
988 {
989   struct NeighbourMapEntry *n = cls;
990
991   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
992
993   GNUNET_STATISTICS_update (GST_stats,
994                             gettext_noop
995                             ("# peers disconnected due to timeout"), 1,
996                             GNUNET_NO);
997   disconnect_neighbour (n);
998 }
999
1000
1001 /**
1002  * Send another keepalive message.
1003  *
1004  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1005  * @param tc scheduler context
1006  */
1007 static void
1008 neighbour_keepalive_task (void *cls,
1009                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1010 {
1011   struct NeighbourMapEntry *n = cls;
1012   struct GNUNET_MessageHeader m;
1013   int ret;
1014
1015   n->keepalive_task =
1016       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1017                                     &neighbour_keepalive_task, n);
1018
1019   GNUNET_assert (S_CONNECTED == n->state);
1020   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1021                             GNUNET_NO);
1022   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1023   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1024
1025
1026   ret = send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1027                     UINT32_MAX /* priority */ ,
1028                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address, 
1029                     GNUNET_YES, NULL, NULL);
1030
1031   n->expect_latency_response = GNUNET_NO;
1032   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero();
1033   if (ret != GNUNET_SYSERR)
1034   {
1035     n->expect_latency_response = GNUNET_YES;
1036     n->keep_alive_sent = GNUNET_TIME_absolute_get();
1037   }
1038
1039 }
1040
1041
1042 /**
1043  * Disconnect from the given neighbour.
1044  *
1045  * @param cls unused
1046  * @param key hash of neighbour's public key (not used)
1047  * @param value the 'struct NeighbourMapEntry' of the neighbour
1048  */
1049 static int
1050 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1051 {
1052   struct NeighbourMapEntry *n = value;
1053
1054 #if DEBUG_TRANSPORT
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1056               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1057 #endif
1058   if (S_CONNECTED == n->state)
1059     GNUNET_STATISTICS_update (GST_stats,
1060                               gettext_noop
1061                               ("# peers disconnected due to global disconnect"),
1062                               1, GNUNET_NO);
1063   disconnect_neighbour (n);
1064   return GNUNET_OK;
1065 }
1066
1067
1068 static void
1069 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1070 {
1071   struct NeighbourMapEntry *n = cls;
1072
1073   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1074
1075   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076               " ATS did not suggested address to connect to peer `%s'\n",
1077               GNUNET_i2s (&n->id));
1078
1079   disconnect_neighbour (n);
1080 }
1081
1082 /**
1083  * Cleanup the neighbours subsystem.
1084  */
1085 void
1086 GST_neighbours_stop ()
1087 {
1088   // This can happen during shutdown
1089   if (neighbours == NULL)
1090   {
1091     return;
1092   }
1093
1094   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1095                                          NULL);
1096   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1097 //  GNUNET_assert (neighbours_connected == 0);
1098   neighbours = NULL;
1099   callback_cls = NULL;
1100   connect_notify_cb = NULL;
1101   disconnect_notify_cb = NULL;
1102 }
1103
/**
 * Closure handed to the transmission continuations of the connect
 * handshake (send_connect_continuation, send_switch_address_continuation
 * and send_connect_ack_continuation).  Allocated by the sender; each
 * continuation releases it together with the address it holds.
 */
struct ContinutionContext
{
  /**
   * Private copy of the address the message was sent to; the continuations
   * free it with GNUNET_HELLO_address_free.
   */
  struct GNUNET_HELLO_Address *address;

  /**
   * Session that was used for the transmission (as passed by the caller).
   */
  struct Session *session;
};
1110
1111 static void send_outbound_quota (const struct GNUNET_PeerIdentity *target, struct GNUNET_BANDWIDTH_Value32NBO quota)
1112 {
1113    struct QuotaSetMessage q_msg;
1114 #if DEBUG_TRANSPORT
1115     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1116                 "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1117                 ntohl (quota.value__), GNUNET_i2s (target));
1118 #endif
1119     q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1120     q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1121     q_msg.quota = quota;
1122     q_msg.peer = (*target);
1123     GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1124 }
1125
1126 /**
1127  * We tried to send a SESSION_CONNECT message to another peer.  If this
1128  * succeeded, we change the state.  If it failed, we should tell
1129  * ATS to not use this address anymore (until it is re-validated).
1130  *
1131  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1132  * @param success GNUNET_OK on success
1133  */
1134 static void
1135 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1136                            int success)
1137 {
1138   struct ContinutionContext * cc = cls;
1139   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1140   
1141   if (GNUNET_YES != success)
1142     GNUNET_ATS_address_destroyed (GST_ats, cc->address, NULL);
1143     //GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1144   if ( (NULL == neighbours) ||
1145        (NULL == n) ||
1146        (n->state == S_DISCONNECT))
1147   {
1148     GNUNET_HELLO_address_free (cc->address);
1149     GNUNET_free (cc);
1150     return;
1151   }
1152
1153   if ((GNUNET_YES == success) &&
1154      ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1155   {
1156     change_state (n, S_CONNECT_SENT);
1157     GNUNET_HELLO_address_free (cc->address);
1158     GNUNET_free (cc);
1159     return;
1160   }
1161
1162   if ((GNUNET_NO == success) &&
1163      ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1164   {
1165 #if DEBUG_TRANSPORT
1166     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1168                 GNUNET_i2s (&n->id),
1169                 GST_plugins_a2s (n->address),
1170                 n->session);
1171 #endif
1172     change_state (n, S_NOT_CONNECTED);
1173     GNUNET_ATS_address_destroyed (GST_ats, cc->address, NULL);
1174     //GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1175
1176     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1177       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1178     n->ats_suggest =
1179       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1180                                     n);
1181     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1182   }
1183   GNUNET_HELLO_address_free (cc->address);
1184   GNUNET_free (cc);
1185 }
1186
1187 /**
1188  * We tried to switch addresses with an peer already connected. If it failed,
1189  * we should tell ATS to not use this address anymore (until it is re-validated).
1190  *
1191  * @param cls the 'struct NeighbourMapEntry'
1192  * @param success GNUNET_OK on success
1193  */
1194 static void
1195 send_switch_address_continuation (void *cls,
1196                                   const struct GNUNET_PeerIdentity *target,
1197                                   int success)
1198 {
1199   struct ContinutionContext * cc = cls;
1200   struct NeighbourMapEntry *n;
1201
1202   if (neighbours == NULL)
1203   {
1204     GNUNET_HELLO_address_free (cc->address);
1205     GNUNET_free (cc);
1206     return;                     /* neighbour is going away */
1207   }
1208
1209   n = lookup_neighbour(&cc->address->peer);
1210   if ((n == NULL) || (is_disconnecting (n)))
1211   {
1212     GNUNET_HELLO_address_free (cc->address);
1213     GNUNET_free (cc);
1214     return;                     /* neighbour is going away */
1215   }
1216
1217   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1218   if (GNUNET_YES != success)
1219   {
1220 #if DEBUG_TRANSPORT
1221     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1223                 GNUNET_i2s (&n->id), 
1224                 GST_plugins_a2s (n->address), n->session);
1225 #endif
1226     GNUNET_ATS_address_destroyed (GST_ats, cc->address, NULL);
1227     //GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1228
1229     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1230       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1231     n->ats_suggest =
1232         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1233                                       n);
1234     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1235     GNUNET_HELLO_address_free (cc->address);
1236     GNUNET_free (cc);
1237     return;
1238   }
1239   /* Tell ATS that switching addresses was successful */
1240   switch (n->state) {
1241     case S_CONNECTED:
1242       if (n->address_state == FRESH)
1243       {
1244           GST_validation_set_address_use (&n->id,
1245                     cc->address,
1246                     cc->session,
1247                     GNUNET_YES);
1248           GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1249           n->address_state = USED;
1250       }
1251       break;
1252     case S_FAST_RECONNECT:
1253 #if DEBUG_TRANSPORT
1254       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255                   "Successful fast reconnect to peer `%s'\n", GNUNET_i2s (&n->id));
1256 #endif
1257       change_state (n, S_CONNECTED);
1258       neighbours_connected++;
1259       GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1260                                 GNUNET_NO);
1261
1262       if (n->address_state == FRESH)
1263       {
1264           GST_validation_set_address_use (&n->id,
1265                     cc->address,
1266                     cc->session,
1267                     GNUNET_YES);
1268           GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1269           n->address_state = USED;
1270       }
1271
1272       if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1273             n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1274
1275       /* Updating quotas */
1276       GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1277       send_outbound_quota(target, n->bandwidth_out);
1278
1279     default:
1280       break;
1281   }
1282   GNUNET_HELLO_address_free (cc->address);
1283   GNUNET_free (cc);
1284 }
1285
1286 /**
1287  * We tried to send a SESSION_CONNECT message to another peer.  If this
1288  * succeeded, we change the state.  If it failed, we should tell
1289  * ATS to not use this address anymore (until it is re-validated).
1290  *
1291  * @param cls the 'struct NeighbourMapEntry'
1292  * @param success GNUNET_OK on success
1293  */
1294 static void
1295 send_connect_ack_continuation (void *cls,
1296                                const struct GNUNET_PeerIdentity *target,
1297                                int success)
1298 {
1299   struct ContinutionContext * cc = cls;
1300   struct NeighbourMapEntry *n;
1301
1302   if (neighbours == NULL)
1303   {
1304     GNUNET_HELLO_address_free (cc->address);
1305     GNUNET_free (cc);
1306     return;                     /* neighbour is going away */
1307   }
1308
1309   n = lookup_neighbour(&cc->address->peer);
1310   if ((n == NULL) || (is_disconnecting (n)))
1311   {
1312     GNUNET_HELLO_address_free (cc->address);
1313     GNUNET_free (cc);
1314     return;                     /* neighbour is going away */
1315   }
1316
1317   if (GNUNET_YES == success)
1318   {
1319     GNUNET_HELLO_address_free (cc->address);
1320     GNUNET_free (cc);
1321     return;                     /* sending successful */
1322   }
1323
1324   /* sending failed, ask for next address  */
1325 #if DEBUG_TRANSPORT
1326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1327               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1328               GNUNET_i2s (&n->id), 
1329               GST_plugins_a2s (n->address),
1330               n->session);
1331 #endif
1332   change_state (n, S_NOT_CONNECTED);
1333
1334   GNUNET_ATS_address_destroyed (GST_ats, cc->address, NULL);
1335   //GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1336
1337   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1338     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1339   n->ats_suggest =
1340       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1341                                     n);
1342   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1343
1344   GNUNET_HELLO_address_free (cc->address);
1345   GNUNET_free (cc);
1346 }
1347
1348 /**
1349  * For an existing neighbour record, set the active connection to
1350  * the given address.
1351  *
1352  * @param peer identity of the peer to switch the address for
1353  * @param address address of the other peer, NULL if other peer
1354  *                       connected to us
1355  * @param session session to use (or NULL)
1356  * @param ats performance data
1357  * @param ats_count number of entries in ats
1358  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1359  *         connection is not up (yet)
1360  */
1361 int
1362 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1363                                        const struct GNUNET_HELLO_Address *address,
1364                                        struct Session *session,
1365                                        const struct GNUNET_ATS_Information *ats,
1366                                        uint32_t ats_count,
1367                                        struct GNUNET_BANDWIDTH_Value32NBO
1368                                        bandwidth_in,
1369                                        struct GNUNET_BANDWIDTH_Value32NBO
1370                                        bandwidth_out)
1371 {
1372   struct NeighbourMapEntry *n;
1373   struct SessionConnectMessage connect_msg;
1374   struct ContinutionContext * cc;
1375   size_t msg_len;
1376   size_t ret;
1377
1378   if (neighbours == NULL)
1379   {
1380     /* This can happen during shutdown */
1381     return GNUNET_NO;
1382   }
1383   n = lookup_neighbour (peer);
1384   if (NULL == n)
1385     return GNUNET_NO;
1386   if (n->state == S_DISCONNECT)
1387   {
1388     /* We are disconnecting, nothing to do here */
1389     return GNUNET_NO;
1390   }
1391   GNUNET_assert (address->transport_name != NULL);
1392   if ( (session == NULL) && (0 == address->address_length) )
1393   {
1394     GNUNET_break_op (0);
1395     GNUNET_ATS_address_destroyed (GST_ats, address, session);
1396     GNUNET_ATS_suggest_address (GST_ats, peer);
1397     return GNUNET_NO;
1398   }
1399
1400   /* checks successful and neighbour != NULL */
1401 #if DEBUG_TRANSPORT
1402   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1403               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1404               GST_plugins_a2s (address),
1405               session,
1406               GNUNET_i2s (peer),
1407               print_state(n->state));
1408 #endif
1409   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1410   {
1411     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1412     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1413   }
1414   /* do not switch addresses just update quotas */
1415   if ( (n->state == S_CONNECTED) && 
1416        (NULL != n->address) &&
1417        (0 == GNUNET_HELLO_address_cmp (address,
1418                                        n->address)) &&
1419        (n->session == session) )
1420   {
1421     n->bandwidth_in = bandwidth_in;
1422     n->bandwidth_out = bandwidth_out;
1423     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1424     send_outbound_quota(peer, n->bandwidth_out);
1425     return GNUNET_NO;    
1426   }
1427   if (n->state == S_CONNECTED)
1428   {
1429     /* mark old address as no longer used */
1430     GNUNET_assert (NULL != n->address);
1431     if (n->address_state == USED)
1432     {
1433       GST_validation_set_address_use (&n->id,
1434                                       n->address,
1435                                       n->session,
1436                                       GNUNET_NO);
1437       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1438       n->address_state = UNUSED;
1439     }
1440
1441   }
1442
1443   /* set new address */
1444   if (NULL != n->address)
1445     GNUNET_HELLO_address_free (n->address);
1446   n->address = GNUNET_HELLO_address_copy (address);
1447   n->address_state = FRESH;
1448   n->session = session;
1449   n->bandwidth_in = bandwidth_in;
1450   n->bandwidth_out = bandwidth_out;
1451   GNUNET_SCHEDULER_cancel (n->timeout_task);
1452   n->timeout_task =
1453       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1454                                     &neighbour_timeout_task, n);
1455   switch (n->state)
1456   {
1457   case S_NOT_CONNECTED:  
1458   case S_CONNECT_SENT:
1459     msg_len = sizeof (struct SessionConnectMessage);
1460     connect_msg.header.size = htons (msg_len);
1461     connect_msg.header.type =
1462         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1463     connect_msg.reserved = htonl (0);
1464     connect_msg.timestamp =
1465         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1466
1467     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1468     cc->session = session;
1469     cc->address = GNUNET_HELLO_address_copy (address);
1470     ret =
1471         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1472                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1473                           address, GNUNET_YES,
1474                           &send_connect_continuation, 
1475                           cc);
1476     return GNUNET_NO;
1477   case S_CONNECT_RECV:
1478     /* We received a CONNECT message and asked ATS for an address */
1479     msg_len = sizeof (struct SessionConnectMessage);
1480     connect_msg.header.size = htons (msg_len);
1481     connect_msg.header.type =
1482         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1483     connect_msg.reserved = htonl (0);
1484     connect_msg.timestamp =
1485         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1486     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1487     cc->session = session;
1488     cc->address = GNUNET_HELLO_address_copy (address);
1489     ret =
1490         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1491                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1492                           address, GNUNET_YES,
1493                           &send_connect_ack_continuation, cc);
1494     return GNUNET_NO;
1495   case S_CONNECTED:
1496   case S_FAST_RECONNECT:
1497     /* connected peer is switching addresses or tries fast reconnect*/
1498     msg_len = sizeof (struct SessionConnectMessage);
1499     connect_msg.header.size = htons (msg_len);
1500     connect_msg.header.type =
1501         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1502     connect_msg.reserved = htonl (0);
1503     connect_msg.timestamp =
1504         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1505     cc = GNUNET_malloc(sizeof (struct ContinutionContext));
1506     cc->session = session;
1507     cc->address = GNUNET_HELLO_address_copy (address);
1508     ret =
1509         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1510                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1511                           address, GNUNET_YES,
1512                           &send_switch_address_continuation, cc);
1513     if (ret == GNUNET_SYSERR)
1514     {
1515       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1517                   GNUNET_i2s (peer), 
1518                   GST_plugins_a2s (address), session);
1519     }
1520     return GNUNET_NO;
1521   default:
1522     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1523                 "Invalid connection state to switch addresses %u \n", n->state);
1524     GNUNET_break_op (0);
1525     return GNUNET_NO;
1526   }
1527 }
1528
1529
1530 /**
1531  * Obtain current latency information for the given neighbour.
1532  *
1533  * @param peer 
1534  * @return observed latency of the address, FOREVER if the address was
1535  *         never successfully validated
1536  */
1537 struct GNUNET_TIME_Relative
1538 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1539 {
1540   struct NeighbourMapEntry *n;
1541
1542   n = lookup_neighbour (peer);
1543   if ( (NULL == n) ||
1544        ( (n->address == NULL) && (n->session == NULL) ) )
1545     return GNUNET_TIME_UNIT_FOREVER_REL;
1546
1547   return n->latency;
1548 }
1549
1550
1551 /**
1552  * Create an entry in the neighbour map for the given peer
1553  *
1554  * @param peer peer to create an entry for
1555  * @return new neighbour map entry
1556  */
1557 static struct NeighbourMapEntry *
1558 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1559 {
1560   struct NeighbourMapEntry *n;
1561
1562 #if DEBUG_TRANSPORT
1563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1564               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1565 #endif
1566   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1567   n->id = *peer;
1568   n->state = S_NOT_CONNECTED;
1569   n->latency = GNUNET_TIME_relative_get_forever();
1570   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1571                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1572                                  MAX_BANDWIDTH_CARRY_S);
1573   n->timeout_task =
1574       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1575                                     &neighbour_timeout_task, n);
1576   GNUNET_assert (GNUNET_OK ==
1577                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1578                                                     &n->id.hashPubKey, n,
1579                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1580   return n;
1581 }
1582
1583
1584 /**
1585  * Try to create a connection to the given target (eventually).
1586  *
1587  * @param target peer to try to connect to
1588  */
1589 void
1590 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1591 {
1592   struct NeighbourMapEntry *n;
1593
1594   // This can happen during shutdown
1595   if (neighbours == NULL)
1596   {
1597     return;
1598   }
1599 #if DEBUG_TRANSPORT
1600   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1601               GNUNET_i2s (target));
1602 #endif
1603   if (0 ==
1604       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1605   {
1606     /* my own hello */
1607     return;
1608   }
1609   n = lookup_neighbour (target);
1610
1611   if (NULL != n)
1612   {
1613     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1614       return;                   /* already connecting or connected */
1615     if (is_disconnecting (n))
1616       change_state (n, S_NOT_CONNECTED);
1617   }
1618
1619
1620   if (n == NULL)
1621     n = setup_neighbour (target);
1622 #if DEBUG_TRANSPORT
1623   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1624               "Asking ATS for suggested address to connect to peer `%s'\n",
1625               GNUNET_i2s (&n->id));
1626 #endif
1627
1628   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1629 }
1630
1631 /**
1632  * Test if we're connected to the given peer.
1633  *
1634  * @param target peer to test
1635  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1636  */
1637 int
1638 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1639 {
1640   struct NeighbourMapEntry *n;
1641
1642   // This can happen during shutdown
1643   if (neighbours == NULL)
1644   {
1645     return GNUNET_NO;
1646   }
1647
1648   n = lookup_neighbour (target);
1649
1650   if ((NULL == n) || (S_CONNECTED != n->state))
1651     return GNUNET_NO;           /* not connected */
1652   return GNUNET_YES;
1653 }
1654
1655 /**
1656  * A session was terminated. Take note.
1657  *
1658  * @param peer identity of the peer where the session died
1659  * @param session session that is gone
1660  */
1661 void
1662 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1663                                    struct Session *session)
1664 {
1665   struct NeighbourMapEntry *n;
1666
1667   if (neighbours == NULL)
1668   {
1669     /* This can happen during shutdown */
1670     return;
1671   }
1672
1673 #if DEBUG_TRANSPORT
1674   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Session %X to peer `%s' ended \n",
1675               session, GNUNET_i2s (peer));
1676 #endif
1677
1678   n = lookup_neighbour (peer);
1679   if (NULL == n)
1680     return;
1681   if (session != n->session)
1682     return;                     /* doesn't affect us */
1683   if (n->state == S_CONNECTED)
1684   {
1685     if (n->address_state == USED)
1686     {
1687       GST_validation_set_address_use (&n->id,
1688                                       n->address,
1689                                       n->session,
1690                                       GNUNET_NO);
1691       GNUNET_ATS_address_in_use (GST_ats,n->address, n->session, GNUNET_NO);
1692       n->address_state = UNUSED;
1693     }
1694   }
1695
1696
1697   //GNUNET_ATS_address_destroyed(GST_ats, n->address, n->session);
1698
1699   if (NULL != n->address)
1700   {
1701     GNUNET_HELLO_address_free (n->address);
1702     n->address = NULL;
1703   }
1704   n->session = NULL;
1705   
1706   /* not connected anymore anyway, shouldn't matter */
1707   if ((S_CONNECTED != n->state) && (!is_connecting (n)))
1708     return;
1709
1710   /* connected, try fast reconnect */
1711 #if DEBUG_TRANSPORT
1712   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1713               "Trying fast reconnect to peer `%s'\n", GNUNET_i2s (peer));
1714 #endif
1715   change_state (n, S_FAST_RECONNECT);
1716   GNUNET_assert (neighbours_connected > 0);
1717   neighbours_connected--;
1718
1719   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1720   {
1721     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1722     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1723   }
1724
1725   /* We are connected, so ask ATS to switch addresses */
1726   GNUNET_SCHEDULER_cancel (n->timeout_task);
1727   n->timeout_task =
1728       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1729                                     &neighbour_timeout_task, n);
1730   /* try QUICKLY to re-establish a connection, reduce timeout! */
1731   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1732     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1733   n->ats_suggest =
1734       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1735                                     n);
1736   GNUNET_ATS_suggest_address (GST_ats, peer);
1737 }
1738
1739
1740 /**
1741  * Transmit a message to the given target using the active connection.
1742  *
1743  * @param target destination
1744  * @param msg message to send
1745  * @param msg_size number of bytes in msg
1746  * @param timeout when to fail with timeout
1747  * @param cont function to call when done
1748  * @param cont_cls closure for 'cont'
1749  */
1750 void
1751 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1752                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1753                      GST_NeighbourSendContinuation cont, void *cont_cls)
1754 {
1755   struct NeighbourMapEntry *n;
1756   struct MessageQueue *mq;
1757
1758   // This can happen during shutdown
1759   if (neighbours == NULL)
1760   {
1761     return;
1762   }
1763
1764   n = lookup_neighbour (target);
1765   if ((n == NULL) || (!is_connected (n)))
1766   {
1767     GNUNET_STATISTICS_update (GST_stats,
1768                               gettext_noop
1769                               ("# messages not sent (no such peer or not connected)"),
1770                               1, GNUNET_NO);
1771 #if DEBUG_TRANSPORT
1772     if (n == NULL)
1773       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1774                   "Could not send message to peer `%s': unknown neighbour",
1775                   GNUNET_i2s (target));
1776     else if (!is_connected (n))
1777       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1778                   "Could not send message to peer `%s': not connected\n",
1779                   GNUNET_i2s (target));
1780 #endif
1781     if (NULL != cont)
1782       cont (cont_cls, GNUNET_SYSERR);
1783     return;
1784   }
1785
1786   if ((n->session == NULL) && (n->address == NULL) )
1787   {
1788     GNUNET_STATISTICS_update (GST_stats,
1789                               gettext_noop
1790                               ("# messages not sent (no such peer or not connected)"),
1791                               1, GNUNET_NO);
1792 #if DEBUG_TRANSPORT
1793     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794                 "Could not send message to peer `%s': no address available\n",
1795                 GNUNET_i2s (target));
1796 #endif
1797
1798     if (NULL != cont)
1799       cont (cont_cls, GNUNET_SYSERR);
1800     return;
1801   }
1802
1803   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1804   GNUNET_STATISTICS_update (GST_stats,
1805                             gettext_noop
1806                             ("# bytes in message queue for other peers"),
1807                             msg_size, GNUNET_NO);
1808   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1809   mq->cont = cont;
1810   mq->cont_cls = cont_cls;
1811   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1812   memcpy (&mq[1], msg, msg_size);
1813   mq->message_buf = (const char *) &mq[1];
1814   mq->message_buf_size = msg_size;
1815   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1816   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1817
1818   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1819       (NULL == n->is_active))
1820     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1821 }
1822
1823
1824 /**
1825  * We have received a message from the given sender.  How long should
1826  * we delay before receiving more?  (Also used to keep the peer marked
1827  * as live).
1828  *
1829  * @param sender sender of the message
1830  * @param size size of the message
1831  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1832  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1833  *                   GNUNET_SYSERR if the connection is not fully up yet
1834  * @return how long to wait before reading more from this sender
1835  */
1836 struct GNUNET_TIME_Relative
1837 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1838                                         *sender, ssize_t size, int *do_forward)
1839 {
1840   struct NeighbourMapEntry *n;
1841   struct GNUNET_TIME_Relative ret;
1842
1843   // This can happen during shutdown
1844   if (neighbours == NULL)
1845   {
1846     return GNUNET_TIME_UNIT_FOREVER_REL;
1847   }
1848
1849   n = lookup_neighbour (sender);
1850   if (n == NULL)
1851   {
1852     GST_neighbours_try_connect (sender);
1853     n = lookup_neighbour (sender);
1854     if (NULL == n)
1855     {
1856       GNUNET_STATISTICS_update (GST_stats,
1857                                 gettext_noop
1858                                 ("# messages discarded due to lack of neighbour record"),
1859                                 1, GNUNET_NO);
1860       *do_forward = GNUNET_NO;
1861       return GNUNET_TIME_UNIT_ZERO;
1862     }
1863   }
1864   if (!is_connected (n))
1865   {
1866     *do_forward = GNUNET_SYSERR;
1867     return GNUNET_TIME_UNIT_ZERO;
1868   }
1869   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1870   {
1871     n->quota_violation_count++;
1872 #if DEBUG_TRANSPORT
1873     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1874                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1875                 n->in_tracker.available_bytes_per_s__,
1876                 n->quota_violation_count);
1877 #endif
1878     /* Discount 32k per violation */
1879     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1880   }
1881   else
1882   {
1883     if (n->quota_violation_count > 0)
1884     {
1885       /* try to add 32k back */
1886       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1887       n->quota_violation_count--;
1888     }
1889   }
1890   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1891   {
1892     GNUNET_STATISTICS_update (GST_stats,
1893                               gettext_noop
1894                               ("# bandwidth quota violations by other peers"),
1895                               1, GNUNET_NO);
1896     *do_forward = GNUNET_NO;
1897     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1898   }
1899   *do_forward = GNUNET_YES;
1900   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1901   if (ret.rel_value > 0)
1902   {
1903 #if DEBUG_TRANSPORT
1904     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1905                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1906                 (unsigned long long) n->in_tracker.
1907                 consumption_since_last_update__,
1908                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1909                 (unsigned long long) ret.rel_value);
1910 #endif
1911     GNUNET_STATISTICS_update (GST_stats,
1912                               gettext_noop ("# ms throttling suggested"),
1913                               (int64_t) ret.rel_value, GNUNET_NO);
1914   }
1915   return ret;
1916 }
1917
1918
1919 /**
1920  * Keep the connection to the given neighbour alive longer,
1921  * we received a KEEPALIVE (or equivalent).
1922  *
1923  * @param neighbour neighbour to keep alive
1924  */
1925 void
1926 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1927 {
1928   struct NeighbourMapEntry *n;
1929
1930   // This can happen during shutdown
1931   if (neighbours == NULL)
1932   {
1933     return;
1934   }
1935
1936   n = lookup_neighbour (neighbour);
1937   if (NULL == n)
1938   {
1939     GNUNET_STATISTICS_update (GST_stats,
1940                               gettext_noop
1941                               ("# KEEPALIVE messages discarded (not connected)"),
1942                               1, GNUNET_NO);
1943     return;
1944   }
1945   GNUNET_SCHEDULER_cancel (n->timeout_task);
1946   n->timeout_task =
1947       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1948                                     &neighbour_timeout_task, n);
1949
1950   /* send reply to measure latency */
1951   if (S_CONNECTED != n->state)
1952     return;
1953
1954   struct GNUNET_MessageHeader m;
1955   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1956   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1957
1958   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1959                     UINT32_MAX /* priority */ ,
1960                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1961                     GNUNET_YES, NULL, NULL);
1962 }
1963
1964 /**
1965  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
1966  * to this peer
1967  *
1968  * @param neighbour neighbour to keep alive
1969  */
1970 void
1971 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1972                                    const struct GNUNET_ATS_Information * ats,
1973                                    uint32_t ats_count)
1974 {
1975   struct NeighbourMapEntry *n;
1976   struct GNUNET_ATS_Information * ats_new;
1977   uint32_t latency;
1978
1979   if (neighbours == NULL)
1980   {
1981     // This can happen during shutdown
1982     return;
1983   }
1984
1985   n = lookup_neighbour (neighbour);
1986   if (NULL == n)
1987   {
1988     GNUNET_STATISTICS_update (GST_stats,
1989                               gettext_noop
1990                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
1991                               1, GNUNET_NO);
1992     return;
1993   }
1994   if (n->expect_latency_response != GNUNET_YES)
1995   {
1996     GNUNET_STATISTICS_update (GST_stats,
1997                               gettext_noop
1998                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
1999                               1, GNUNET_NO);
2000     return;
2001   }
2002   n->expect_latency_response = GNUNET_NO;
2003
2004   GNUNET_assert (n->keep_alive_sent.abs_value != GNUNET_TIME_absolute_get_zero().abs_value);
2005   n->latency = GNUNET_TIME_absolute_get_difference(n->keep_alive_sent, GNUNET_TIME_absolute_get());
2006 #if DEBUG_TRANSPORT
2007   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2008               "Latency for peer `%s' is %llu ms\n",
2009               GNUNET_i2s (&n->id), n->latency.rel_value);
2010 #endif
2011
2012
2013   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever().rel_value)
2014     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2015   else
2016   {
2017     ats_new = GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2018     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2019
2020     /* add latency */
2021     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2022     if (n->latency.rel_value > UINT32_MAX)
2023       latency = UINT32_MAX;
2024     else
2025       latency = n->latency.rel_value;
2026     ats_new[ats_count].value = htonl (latency);
2027
2028     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new, ats_count + 1);
2029     GNUNET_free (ats_new);
2030   }
2031 }
2032
2033
2034 /**
2035  * Change the incoming quota for the given peer.
2036  *
2037  * @param neighbour identity of peer to change qutoa for
2038  * @param quota new quota
2039  */
2040 void
2041 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2042                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2043 {
2044   struct NeighbourMapEntry *n;
2045
2046   // This can happen during shutdown
2047   if (neighbours == NULL)
2048   {
2049     return;
2050   }
2051
2052   n = lookup_neighbour (neighbour);
2053   if (n == NULL)
2054   {
2055     GNUNET_STATISTICS_update (GST_stats,
2056                               gettext_noop
2057                               ("# SET QUOTA messages ignored (no such peer)"),
2058                               1, GNUNET_NO);
2059     return;
2060   }
2061 #if DEBUG_TRANSPORT
2062   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2063               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2064               ntohl (quota.value__), GNUNET_i2s (&n->id));
2065 #endif
2066   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2067   if (0 != ntohl (quota.value__))
2068     return;
2069 #if DEBUG_TRANSPORT
2070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2071               GNUNET_i2s (&n->id), "SET_QUOTA");
2072 #endif
2073   if (is_connected (n))
2074     GNUNET_STATISTICS_update (GST_stats,
2075                               gettext_noop ("# disconnects due to quota of 0"),
2076                               1, GNUNET_NO);
2077   disconnect_neighbour (n);
2078 }
2079
2080
/**
 * Closure for the neighbours_iterate function.
 *
 * Filled in by GST_neighbours_iterate and handed to the hash map
 * iteration as the 'cls' argument of neighbours_iterate.
 */
struct IteratorContext
{
  /**
   * Function to call on each connected neighbour.
   */
  GST_NeighbourIterator cb;

  /**
   * Closure for 'cb'; passed as the first argument of every call to 'cb'.
   */
  void *cb_cls;
};
2096
2097
2098 /**
2099  * Call the callback from the closure for each connected neighbour.
2100  *
2101  * @param cls the 'struct IteratorContext'
2102  * @param key the hash of the public key of the neighbour
2103  * @param value the 'struct NeighbourMapEntry'
2104  * @return GNUNET_OK (continue to iterate)
2105  */
2106 static int
2107 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2108 {
2109   struct IteratorContext *ic = cls;
2110   struct NeighbourMapEntry *n = value;
2111
2112   if (!is_connected (n))
2113     return GNUNET_OK;
2114
2115   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2116   return GNUNET_OK;
2117 }
2118
2119
2120 /**
2121  * Iterate over all connected neighbours.
2122  *
2123  * @param cb function to call
2124  * @param cb_cls closure for cb
2125  */
2126 void
2127 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2128 {
2129   struct IteratorContext ic;
2130
2131   // This can happen during shutdown
2132   if (neighbours == NULL)
2133   {
2134     return;
2135   }
2136
2137   ic.cb = cb;
2138   ic.cb_cls = cb_cls;
2139   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2140 }
2141
2142 /**
2143  * If we have an active connection to the given target, it must be shutdown.
2144  *
2145  * @param target peer to disconnect from
2146  */
2147 void
2148 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2149 {
2150   struct NeighbourMapEntry *n;
2151
2152   // This can happen during shutdown
2153   if (neighbours == NULL)
2154   {
2155     return;
2156   }
2157
2158   n = lookup_neighbour (target);
2159   if (NULL == n)
2160     return;                     /* not active */
2161   disconnect_neighbour (n);
2162 }
2163
2164
2165 /**
2166  * We received a disconnect message from the given peer,
2167  * validate and process.
2168  *
2169  * @param peer sender of the message
2170  * @param msg the disconnect message
2171  */
2172 void
2173 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2174                                           *peer,
2175                                           const struct GNUNET_MessageHeader
2176                                           *msg)
2177 {
2178   struct NeighbourMapEntry *n;
2179   const struct SessionDisconnectMessage *sdm;
2180   GNUNET_HashCode hc;
2181
2182 #if DEBUG_TRANSPORT
2183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2184               "Received DISCONNECT message from peer `%s'\n",
2185               GNUNET_i2s (peer));
2186 #endif
2187
2188   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2189   {
2190     // GNUNET_break_op (0);
2191     GNUNET_STATISTICS_update (GST_stats,
2192                               gettext_noop
2193                               ("# disconnect messages ignored (old format)"), 1,
2194                               GNUNET_NO);
2195     return;
2196   }
2197   sdm = (const struct SessionDisconnectMessage *) msg;
2198   n = lookup_neighbour (peer);
2199   if (NULL == n)
2200     return;                     /* gone already */
2201   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2202       n->connect_ts.abs_value)
2203   {
2204     GNUNET_STATISTICS_update (GST_stats,
2205                               gettext_noop
2206                               ("# disconnect messages ignored (timestamp)"), 1,
2207                               GNUNET_NO);
2208     return;
2209   }
2210   GNUNET_CRYPTO_hash (&sdm->public_key,
2211                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2212                       &hc);
2213   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2214   {
2215     GNUNET_break_op (0);
2216     return;
2217   }
2218   if (ntohl (sdm->purpose.size) !=
2219       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2220       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2221       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2222   {
2223     GNUNET_break_op (0);
2224     return;
2225   }
2226   if (GNUNET_OK !=
2227       GNUNET_CRYPTO_rsa_verify
2228       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2229        &sdm->signature, &sdm->public_key))
2230   {
2231     GNUNET_break_op (0);
2232     return;
2233   }
2234   GST_neighbours_force_disconnect (peer);
2235 }
2236
2237
2238 /**
2239  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2240  * Consider switching to it.
2241  *
2242  * @param message possibly a 'struct SessionConnectMessage' (check format)
2243  * @param peer identity of the peer to switch the address for
2244  * @param address address of the other peer, NULL if other peer
2245  *                       connected to us
2246  * @param session session to use (or NULL)
2247  * @param ats performance data
2248  * @param ats_count number of entries in ats
2249  */
2250 void
2251 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2252                                    const struct GNUNET_PeerIdentity *peer,
2253                                    const struct GNUNET_HELLO_Address *address,
2254                                    struct Session *session,
2255                                    const struct GNUNET_ATS_Information *ats,
2256                                    uint32_t ats_count)
2257 {
2258   const struct SessionConnectMessage *scm;
2259   struct GNUNET_MessageHeader msg;
2260   struct NeighbourMapEntry *n;
2261   size_t msg_len;
2262   size_t ret;
2263
2264 #if DEBUG_TRANSPORT
2265   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2266               "Received CONNECT_ACK message from peer `%s'\n",
2267               GNUNET_i2s (peer));
2268 #endif
2269
2270   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2271   {
2272     GNUNET_break_op (0);
2273     return;
2274   }
2275   scm = (const struct SessionConnectMessage *) message;
2276   GNUNET_break_op (ntohl (scm->reserved) == 0);
2277   n = lookup_neighbour (peer);
2278   if (NULL == n)
2279   {
2280     /* we did not send 'CONNECT' (how could we? no record for this peer!) */
2281     GNUNET_break_op (0);
2282     return;
2283   }  
2284
2285   /* Additional check
2286    *
2287    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2288    *
2289    * We also received an CONNECT message, switched from SENDT to RECV and
2290    * ATS already suggested us an address after a successful blacklist check
2291    */
2292   if ((n->state != S_CONNECT_SENT) && ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2293   {
2294     GNUNET_STATISTICS_update (GST_stats,
2295                               gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
2296                               GNUNET_NO);
2297     return;
2298   }
2299
2300   if (NULL != session)
2301     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2302                      "transport-ats",
2303                      "Giving ATS session %p of plugin %s for peer %s\n",
2304                      session, address->transport_name, GNUNET_i2s (peer));
2305   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2306   GNUNET_assert (NULL != n->address);
2307
2308   change_state (n, S_CONNECTED);
2309   if (n->address_state == FRESH)
2310   {
2311     GST_validation_set_address_use (&n->id,
2312             n->address,
2313             n->session,
2314             GNUNET_YES);
2315     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2316     n->address_state = USED;
2317   }
2318
2319   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2320
2321   /* send ACK (ACK) */
2322   msg_len = sizeof (msg);
2323   msg.size = htons (msg_len);
2324   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2325
2326   ret =
2327       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2328                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2329                         n->address, GNUNET_YES, NULL,
2330                         NULL);
2331
2332   if (ret == GNUNET_SYSERR)
2333     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2334                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2335                 GNUNET_i2s (&n->id), 
2336                 GST_plugins_a2s (n->address), n->session);
2337
2338
2339   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2340     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2341   
2342   neighbours_connected++;
2343   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2344                             GNUNET_NO);
2345 #if DEBUG_TRANSPORT
2346   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2347               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2348               GNUNET_i2s (&n->id),
2349               GST_plugins_a2s (n->address), n->session,
2350               __LINE__);
2351 #endif
2352   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2353   send_outbound_quota(peer, n->bandwidth_out);
2354
2355 }
2356
2357
2358 void
2359 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2360                            const struct GNUNET_PeerIdentity *peer,
2361                            const struct GNUNET_HELLO_Address *address,
2362                            struct Session *session,
2363                            const struct GNUNET_ATS_Information *ats,
2364                            uint32_t ats_count)
2365 {
2366   struct NeighbourMapEntry *n;
2367
2368 #if DEBUG_TRANSPORT
2369   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2370               "Received ACK message from peer `%s'\n",
2371               GNUNET_i2s (peer));
2372 #endif
2373
2374   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2375   {
2376     GNUNET_break_op (0);
2377     return;
2378   }
2379   n = lookup_neighbour (peer);
2380   if (NULL == n)
2381   {
2382     send_disconnect (peer, address,
2383                      session);
2384     GNUNET_break (0);
2385     return;
2386   }
2387   if (S_CONNECTED == n->state)
2388     return;
2389   if (!is_connecting(n))
2390   {
2391     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2392                               GNUNET_NO);
2393     return;
2394   }
2395   if (NULL != session)
2396     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2397                      "transport-ats",
2398                      "Giving ATS session %p of plugin %s for peer %s\n",
2399                      session, address->transport_name, GNUNET_i2s (peer));
2400   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2401   GNUNET_assert (n->address != NULL);
2402   change_state (n, S_CONNECTED);
2403
2404   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2405   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2406         n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2407   if (n->address_state == FRESH)
2408   {
2409     GST_validation_set_address_use (&n->id,
2410                                   n->address,
2411                                   n->session,
2412                                   GNUNET_YES);
2413     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2414     n->address_state = USED;
2415   }
2416   neighbours_connected++;
2417   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2418                             GNUNET_NO);
2419   
2420 #if DEBUG_TRANSPORT
2421   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2422               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2423               GNUNET_i2s (&n->id),
2424               GST_plugins_a2s (n->address), n->session,
2425               __LINE__);
2426 #endif
2427   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2428   send_outbound_quota(peer, n->bandwidth_out);
2429 }
2430
/**
 * Context kept while the blacklist check for a received CONNECT
 * message is pending.  Allocated in GST_neighbours_handle_connect and
 * released in handle_connect_blacklist_cont.
 */
struct BlackListCheckContext
{
  /**
   * Performance data; points to the memory allocated directly
   * after this struct.
   */
  struct GNUNET_ATS_Information *ats;

  /**
   * Number of entries in 'ats' (the entries received with the
   * message plus one latency entry added by us).
   */
  uint32_t ats_count;

  /**
   * Session the CONNECT message was received with.
   */
  struct Session *session;

  /**
   * Copy of the address from the CONNECT message; freed by
   * the continuation.
   */
  struct GNUNET_HELLO_Address *address;

  /**
   * Timestamp taken from the CONNECT message.
   */
  struct GNUNET_TIME_Absolute ts;
};
2443
2444
2445 static void
2446 handle_connect_blacklist_cont (void *cls,
2447                                const struct GNUNET_PeerIdentity *peer,
2448                                int result)
2449 {
2450   struct NeighbourMapEntry *n;
2451   struct BlackListCheckContext *bcc = cls;
2452
2453 #if DEBUG_TRANSPORT
2454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2455               "Blacklist check due to CONNECT message: `%s'\n",
2456               GNUNET_i2s (peer),
2457               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2458 #endif
2459
2460   /* not allowed */
2461   if (GNUNET_OK != result)
2462   {
2463     GNUNET_HELLO_address_free (bcc->address);
2464     GNUNET_free (bcc);
2465     return;
2466   }
2467
2468   n = lookup_neighbour (peer);
2469   if (NULL == n)
2470     n = setup_neighbour (peer);
2471
2472   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2473   {
2474     if (NULL != bcc->session)
2475       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2476                        "transport-ats",
2477                        "Giving ATS session %p of address `%s' for peer %s\n",
2478                        bcc->session, 
2479                        GST_plugins_a2s (bcc->address),
2480                        GNUNET_i2s (peer));
2481     GNUNET_ATS_address_update (GST_ats, bcc->address,
2482                                bcc->session, bcc->ats, bcc->ats_count);
2483     n->connect_ts = bcc->ts;
2484   }
2485
2486   GNUNET_HELLO_address_free (bcc->address);
2487   GNUNET_free (bcc);
2488
2489   if (n->state != S_CONNECT_RECV)
2490     change_state (n, S_CONNECT_RECV);
2491
2492   /* Ask ATS for an address to connect via that address */
2493   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2494     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2495   n->ats_suggest =
2496       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2497                                     n);
2498   GNUNET_ATS_suggest_address (GST_ats, peer);
2499 }
2500
2501 /**
2502  * We received a 'SESSION_CONNECT' message from the other peer.
2503  * Consider switching to it.
2504  *
2505  * @param message possibly a 'struct SessionConnectMessage' (check format)
2506  * @param peer identity of the peer to switch the address for
2507  * @param address address of the other peer, NULL if other peer
2508  *                       connected to us
2509  * @param session session to use (or NULL)
2510  * @param ats performance data
2511  * @param ats_count number of entries in ats (excluding 0-termination)
2512  */
2513 void
2514 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2515                                const struct GNUNET_PeerIdentity *peer,
2516                                const struct GNUNET_HELLO_Address *address,
2517                                struct Session *session,
2518                                const struct GNUNET_ATS_Information *ats,
2519                                uint32_t ats_count)
2520 {
2521   const struct SessionConnectMessage *scm;
2522   struct NeighbourMapEntry *n;
2523   struct BlackListCheckContext *bcc = NULL;
2524
2525 #if DEBUG_TRANSPORT
2526   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2527               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2528 #endif
2529
2530   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2531   {
2532     GNUNET_break_op (0);
2533     return;
2534   }
2535
2536   scm = (const struct SessionConnectMessage *) message;
2537   GNUNET_break_op (ntohl (scm->reserved) == 0);
2538
2539   n = lookup_neighbour (peer);
2540   if ( (n != NULL) &&
2541        (S_CONNECTED == n->state) )
2542   {
2543     /* connected peer switches addresses */
2544     GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2545     return;
2546   }
2547
2548   /* we are not connected to this peer */
2549   /* do blacklist check */
2550   bcc =
2551       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2552                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2553   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2554   bcc->ats_count = ats_count + 1;
2555   bcc->address = GNUNET_HELLO_address_copy (address);
2556   bcc->session = session;
2557   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2558   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2559   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2560   bcc->ats[ats_count].value = htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2561   GST_blacklist_test_allowed (peer, address->transport_name, handle_connect_blacklist_cont,
2562                               bcc);
2563 }
2564
2565
2566 /* end of file gnunet-service-transport_neighbours.c */