-makefile for new test_stream_local (commented)
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
61
62 #define FAST_RECONNECT_RATE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100)
63
64 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
65
66 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
67
68 #define  TEST_NEW_CODE GNUNET_NO
69
70 /**
71  * Entry in neighbours.
72  */
73 struct NeighbourMapEntry;
74
75 GNUNET_NETWORK_STRUCT_BEGIN
76
77 /**
78  * Message a peer sends to another to indicate its
79  * preference for communicating via a particular
80  * session (and the desire to establish a real
81  * connection).
82  */
83 struct SessionConnectMessage
84 {
85   /**
86    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
87    */
88   struct GNUNET_MessageHeader header;
89
90   /**
91    * Always zero.
92    */
93   uint32_t reserved GNUNET_PACKED;
94
95   /**
96    * Absolute time at the sender.  Only the most recent connect
97    * message implies which session is preferred by the sender.
98    */
99   struct GNUNET_TIME_AbsoluteNBO timestamp;
100
101 };
102
103
104 struct SessionDisconnectMessage
105 {
106   /**
107    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
108    */
109   struct GNUNET_MessageHeader header;
110
111   /**
112    * Always zero.
113    */
114   uint32_t reserved GNUNET_PACKED;
115
116   /**
117    * Purpose of the signature.  Extends over the timestamp.
118    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
119    */
120   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
121
122   /**
123    * Absolute time at the sender.  Only the most recent connect
124    * message implies which session is preferred by the sender.
125    */
126   struct GNUNET_TIME_AbsoluteNBO timestamp;
127
128   /**
129    * Public key of the sender.
130    */
131   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
132
133   /**
134    * Signature of the peer that sends us the disconnect.  Only
135    * valid if the timestamp is AFTER the timestamp from the
136    * corresponding 'CONNECT' message.
137    */
138   struct GNUNET_CRYPTO_RsaSignature signature;
139
140 };
141 GNUNET_NETWORK_STRUCT_END
142
143 /**
144  * For each neighbour we keep a list of messages
145  * that we still want to transmit to the neighbour.
146  */
147 struct MessageQueue
148 {
149
150   /**
151    * This is a doubly linked list.
152    */
153   struct MessageQueue *next;
154
155   /**
156    * This is a doubly linked list.
157    */
158   struct MessageQueue *prev;
159
160   /**
161    * Once this message is actively being transmitted, which
162    * neighbour is it associated with?
163    */
164   struct NeighbourMapEntry *n;
165
166   /**
167    * Function to call once we're done.
168    */
169   GST_NeighbourSendContinuation cont;
170
171   /**
172    * Closure for 'cont'
173    */
174   void *cont_cls;
175
176   /**
177    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
178    * stuck together in memory.  Allocated at the end of this struct.
179    */
180   const char *message_buf;
181
182   /**
183    * Size of the message buf
184    */
185   size_t message_buf_size;
186
187   /**
188    * At what time should we fail?
189    */
190   struct GNUNET_TIME_Absolute timeout;
191
192 };
193
194
195 enum State
196 {
197   /**
198    * fresh peer or completely disconnected
199    */
200   S_NOT_CONNECTED,
201
202   /**
203    * sent CONNECT message to other peer, waiting for CONNECT_ACK
204    */
205   S_CONNECT_SENT,
206
207   /**
208    * received CONNECT message to other peer, sending CONNECT_ACK
209    */
210   S_CONNECT_RECV,
211
212   /**
213    * received ACK or payload
214    */
215   S_CONNECTED,
216
217   /**
218    * connection ended, fast reconnect
219    */
220   S_FAST_RECONNECT,
221
222   /**
223    * Disconnect in progress
224    */
225   S_DISCONNECT
226 };
227
228 enum Address_State
229 {
230   USED,
231   UNUSED,
232   FRESH,
233 };
234
235
236 /**
237  * Entry in neighbours.
238  */
239 struct NeighbourMapEntry
240 {
241
242   /**
243    * Head of list of messages we would like to send to this peer;
244    * must contain at most one message per client.
245    */
246   struct MessageQueue *messages_head;
247
248   /**
249    * Tail of list of messages we would like to send to this peer; must
250    * contain at most one message per client.
251    */
252   struct MessageQueue *messages_tail;
253
254   /**
255    * Are we currently trying to send a message? If so, which one?
256    */
257   struct MessageQueue *is_active;
258
259   /**
260    * Active session for communicating with the peer.
261    */
262   struct Session *session;
263
264   /**
265    * Address we currently use.
266    */
267   struct GNUNET_HELLO_Address *address;
268
269   /**
270    * Address we currently use.
271    */
272   struct GNUNET_HELLO_Address *fast_reconnect_address;
273
274
275   /**
276    * Identity of this neighbour.
277    */
278   struct GNUNET_PeerIdentity id;
279
280   /**
281    * ID of task scheduled to run when this peer is about to
282    * time out (will free resources associated with the peer).
283    */
284   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
285
286   /**
287    * ID of task scheduled to send keepalives.
288    */
289   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
290
291   /**
292    * ID of task scheduled to run when we should try transmitting
293    * the head of the message queue.
294    */
295   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
296
297   /**
298    * Tracker for inbound bandwidth.
299    */
300   struct GNUNET_BANDWIDTH_Tracker in_tracker;
301
302   /**
303    * Inbound bandwidth from ATS, activated when connection is up
304    */
305   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
306
307   /**
308    * Inbound bandwidth from ATS, activated when connection is up
309    */
310   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
311
312   /**
313    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
314    */
315   struct GNUNET_TIME_Absolute connect_ts;
316
317   /**
318    * When did we sent the last keep-alive message?
319    */
320   struct GNUNET_TIME_Absolute keep_alive_sent;
321
322   /**
323    * Latest calculated latency value
324    */
325   struct GNUNET_TIME_Relative latency;
326
327   /**
328    * Timeout for ATS
329    * We asked ATS for a new address for this peer
330    */
331   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
332
333   /**
334    * Delay for ATS request
335    */
336   GNUNET_SCHEDULER_TaskIdentifier ats_request;
337
338   /**
339    * Task the resets the peer state after due to an pending
340    * unsuccessful connection setup
341    */
342   GNUNET_SCHEDULER_TaskIdentifier state_reset;
343
344
345   /**
346    * How often has the other peer (recently) violated the inbound
347    * traffic limit?  Incremented by 10 per violation, decremented by 1
348    * per non-violation (for each time interval).
349    */
350   unsigned int quota_violation_count;
351
352
353   /**
354    * The current state of the peer
355    * Element of enum State
356    */
357   int state;
358
359   /**
360    * Did we sent an KEEP_ALIVE message and are we expecting a response?
361    */
362   int expect_latency_response;
363
364   /**
365    * Was the address used successfully
366    */
367   int address_state;
368
369   /**
370    * Fast reconnect attempts for identical address
371    */
372   unsigned int fast_reconnect_attempts;
373 };
374
375
376 /**
377  * All known neighbours and their HELLOs.
378  */
379 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
380
381 /**
382  * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb
383  */
384 static void *callback_cls;
385
386 /**
387  * Function to call when we connected to a neighbour.
388  */
389 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
390
391 /**
392  * Function to call when we disconnected from a neighbour.
393  */
394 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
395
396 /**
397  * Function to call when we changed an active address of a neighbour.
398  */
399 static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb;
400
401 /**
402  * counter for connected neighbours
403  */
404 static int neighbours_connected;
405
406 /**
407  * Lookup a neighbour entry in the neighbours hash map.
408  *
409  * @param pid identity of the peer to look up
410  * @return the entry, NULL if there is no existing record
411  */
412 static struct NeighbourMapEntry *
413 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
414 {
415   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
416 }
417
418 /**
419  * Disconnect from the given neighbour, clean up the record.
420  *
421  * @param n neighbour to disconnect from
422  */
423 static void
424 disconnect_neighbour (struct NeighbourMapEntry *n);
425
426 #define change_state(n, state, ...) change (n, state, __LINE__)
427
428 static int
429 is_connecting (struct NeighbourMapEntry *n)
430 {
431   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
432     return GNUNET_YES;
433   return GNUNET_NO;
434 }
435
436 static int
437 is_connected (struct NeighbourMapEntry *n)
438 {
439   if (n->state == S_CONNECTED)
440     return GNUNET_YES;
441   return GNUNET_NO;
442 }
443
444 static int
445 is_disconnecting (struct NeighbourMapEntry *n)
446 {
447   if (n->state == S_DISCONNECT)
448     return GNUNET_YES;
449   return GNUNET_NO;
450 }
451
452 static const char *
453 print_state (int state)
454 {
455   switch (state)
456   {
457   case S_CONNECTED:
458     return "S_CONNECTED";
459     break;
460   case S_CONNECT_RECV:
461     return "S_CONNECT_RECV";
462     break;
463   case S_CONNECT_SENT:
464     return "S_CONNECT_SENT";
465     break;
466   case S_DISCONNECT:
467     return "S_DISCONNECT";
468     break;
469   case S_NOT_CONNECTED:
470     return "S_NOT_CONNECTED";
471     break;
472   case S_FAST_RECONNECT:
473     return "S_FAST_RECONNECT";
474     break;
475   default:
476     GNUNET_break (0);
477     break;
478   }
479   return NULL;
480 }
481
482 static int
483 change (struct NeighbourMapEntry *n, int state, int line);
484
485 static void
486 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
487
488
489 static void
490 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
491 {
492   struct NeighbourMapEntry *n = cls;
493
494   if (n == NULL)
495     return;
496
497   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
498   if (n->state == S_CONNECTED)
499     return;
500
501 #if DEBUG_TRANSPORT
502   GNUNET_STATISTICS_update (GST_stats,
503                             gettext_noop
504                             ("# failed connection attempts due to timeout"), 1,
505                             GNUNET_NO);
506 #endif
507
508   /* resetting state */
509
510   if (n->state == S_FAST_RECONNECT)
511   {
512     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513                 "Fast reconnect time out, disconnecting peer `%s'\n",
514                 GNUNET_i2s (&n->id));
515     disconnect_neighbour(n);
516     return;
517   }
518
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
520               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
521               GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__);
522
523   n->state = S_NOT_CONNECTED;
524
525   /* destroying address */
526   if (n->address != NULL)
527   {
528     GNUNET_assert (strlen (n->address->transport_name) > 0);
529     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
530   }
531
532   /* request new address */
533   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
534     GNUNET_SCHEDULER_cancel (n->ats_suggest);
535   n->ats_suggest =
536       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
537                                     n);
538   GNUNET_ATS_suggest_address (GST_ats, &n->id);
539 }
540
541 static int
542 change (struct NeighbourMapEntry *n, int state, int line)
543 {
544   int previous_state;
545   /* allowed transitions */
546   int allowed = GNUNET_NO;
547
548   previous_state = n->state;
549
550   switch (n->state)
551   {
552   case S_NOT_CONNECTED:
553     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
554         (state == S_DISCONNECT))
555       allowed = GNUNET_YES;
556     break;
557   case S_CONNECT_RECV:
558     allowed = GNUNET_YES;
559     break;
560   case S_CONNECT_SENT:
561     allowed = GNUNET_YES;
562     break;
563   case S_CONNECTED:
564     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
565       allowed = GNUNET_YES;
566     break;
567   case S_DISCONNECT:
568     break;
569   case S_FAST_RECONNECT:
570     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
571       allowed = GNUNET_YES;
572     break;
573   default:
574     GNUNET_break (0);
575     break;
576   }
577   if (allowed == GNUNET_NO)
578   {
579     char *old = GNUNET_strdup (print_state (n->state));
580     char *new = GNUNET_strdup (print_state (state));
581
582     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
583                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
584                 new, line);
585     GNUNET_break (0);
586     GNUNET_free (old);
587     GNUNET_free (new);
588     return GNUNET_SYSERR;
589   }
590   {
591     char *old = GNUNET_strdup (print_state (n->state));
592     char *new = GNUNET_strdup (print_state (state));
593
594     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
596                 GNUNET_i2s (&n->id), n, old, new, line);
597     GNUNET_free (old);
598     GNUNET_free (new);
599   }
600   n->state = state;
601
602   switch (n->state)
603   {
604   case S_FAST_RECONNECT:
605   case S_CONNECT_RECV:
606   case S_CONNECT_SENT:
607     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
608       GNUNET_SCHEDULER_cancel (n->state_reset);
609     n->state_reset =
610         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
611     break;
612   case S_CONNECTED:
613   case S_NOT_CONNECTED:
614   case S_DISCONNECT:
615     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
616     {
617 #if DEBUG_TRANSPORT
618       char *old = GNUNET_strdup (print_state (n->state));
619       char *new = GNUNET_strdup (print_state (state));
620
621       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
623                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
624       GNUNET_free (old);
625       GNUNET_free (new);
626 #endif
627       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
628       GNUNET_SCHEDULER_cancel (n->state_reset);
629       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
630     }
631     break;
632
633   default:
634     GNUNET_assert (0);
635   }
636
637   if (NULL != address_change_cb)
638   {
639     if (n->state == S_CONNECTED)
640       address_change_cb (callback_cls, &n->id, n->address);
641     else if (previous_state == S_CONNECTED)
642       address_change_cb (callback_cls, &n->id, NULL);
643   }
644
645   return GNUNET_OK;
646 }
647
648 static ssize_t
649 send_with_session (struct NeighbourMapEntry *n,
650                    const char *msgbuf, size_t msgbuf_size,
651                    uint32_t priority,
652                    struct GNUNET_TIME_Relative timeout,
653                    GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
654 {
655   struct GNUNET_TRANSPORT_PluginFunctions *papi;
656   size_t ret = GNUNET_SYSERR;
657
658   GNUNET_assert (n != NULL);
659   GNUNET_assert (n->session != NULL);
660
661   papi = GST_plugins_find (n->address->transport_name);
662   if (papi == NULL)
663   {
664     if (cont != NULL)
665       cont (cont_cls, &n->id, GNUNET_SYSERR);
666     return GNUNET_SYSERR;
667   }
668
669   ret = papi->send (papi->cls,
670                    n->session,
671                    msgbuf, msgbuf_size,
672                    0,
673                    timeout,
674                    cont, cont_cls);
675
676   if ((ret == -1) && (cont != NULL))
677       cont (cont_cls, &n->id, GNUNET_SYSERR);
678   return ret;
679 }
680
681 /**
682  * Task invoked to start a transmission to another peer.
683  *
684  * @param cls the 'struct NeighbourMapEntry'
685  * @param tc scheduler context
686  */
687 static void
688 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
689
690
691 /**
692  * We're done with our transmission attempt, continue processing.
693  *
694  * @param cls the 'struct MessageQueue' of the message
695  * @param receiver intended receiver
696  * @param success whether it worked or not
697  */
698 static void
699 transmit_send_continuation (void *cls,
700                             const struct GNUNET_PeerIdentity *receiver,
701                             int success)
702 {
703   struct MessageQueue *mq = cls;
704   struct NeighbourMapEntry *n;
705   struct NeighbourMapEntry *tmp;
706
707   tmp = lookup_neighbour (receiver);
708   n = mq->n;
709   if ((NULL != n) && (tmp != NULL) && (tmp == n))
710   {
711     GNUNET_assert (n->is_active == mq);
712     n->is_active = NULL;
713     if (success == GNUNET_YES)
714     {
715       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
716       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
717     }
718   }
719 #if DEBUG_TRANSPORT
720   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
721               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
722               (success == GNUNET_OK) ? "successful" : "FAILED");
723 #endif
724   if (NULL != mq->cont)
725     mq->cont (mq->cont_cls, success);
726   GNUNET_free (mq);
727 }
728
729
730 /**
731  * Check the ready list for the given neighbour and if a plugin is
732  * ready for transmission (and if we have a message), do so!
733  *
734  * @param n target peer for which to transmit
735  */
736 static void
737 try_transmission_to_peer (struct NeighbourMapEntry *n)
738 {
739   struct MessageQueue *mq;
740   struct GNUNET_TIME_Relative timeout;
741   ssize_t ret;
742
743   if (n->is_active != NULL)
744   {
745     GNUNET_break (0);
746     return;                     /* transmission already pending */
747   }
748   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
749   {
750     GNUNET_break (0);
751     return;                     /* currently waiting for bandwidth */
752   }
753   while (NULL != (mq = n->messages_head))
754   {
755     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
756     if (timeout.rel_value > 0)
757       break;
758     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
759     n->is_active = mq;
760     mq->n = n;
761     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
762   }
763   if (NULL == mq)
764     return;                     /* no more messages */
765
766   if (n->address == NULL)
767   {
768 #if DEBUG_TRANSPORT
769     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
770                 GNUNET_i2s (&n->id));
771 #endif
772     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
773     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
774     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
775     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
776     return;
777   }
778
779   if (GST_plugins_find (n->address->transport_name) == NULL)
780   {
781     GNUNET_break (0);
782     return;
783   }
784   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
785   n->is_active = mq;
786   mq->n = n;
787
788   if ((n->address->address_length == 0) && (n->session == NULL))
789   {
790     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
791                 GNUNET_i2s (&n->id));
792     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
793     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
794     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
795     return;
796   }
797
798   ret = send_with_session(n,
799               mq->message_buf, mq->message_buf_size,
800               0, timeout,
801               &transmit_send_continuation, mq);
802
803   if (ret == -1)
804   {
805     /* failure, but 'send' would not call continuation in this case,
806      * so we need to do it here! */
807     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
808   }
809
810 }
811
812
813 /**
814  * Task invoked to start a transmission to another peer.
815  *
816  * @param cls the 'struct NeighbourMapEntry'
817  * @param tc scheduler context
818  */
819 static void
820 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
821 {
822   struct NeighbourMapEntry *n = cls;
823
824   GNUNET_assert (NULL != lookup_neighbour (&n->id));
825   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
826   try_transmission_to_peer (n);
827 }
828
829
830 /**
831  * Initialize the neighbours subsystem.
832  *
833  * @param cls closure for callbacks
834  * @param connect_cb function to call if we connect to a peer
835  * @param disconnect_cb function to call if we disconnect from a peer
836  * @param peer_address_cb function to call if we change an active address
837  *                   of a neighbour
838  */
839 void
840 GST_neighbours_start (void *cls,
841                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
842                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
843                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
844 {
845   callback_cls = cls;
846   connect_notify_cb = connect_cb;
847   disconnect_notify_cb = disconnect_cb;
848   address_change_cb = peer_address_cb;
849   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
850 }
851
852
853 static void
854 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
855                       int result)
856 {
857 #if DEBUG_TRANSPORT
858   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859               "Sending DISCONNECT message to peer `%4s': %i\n",
860               GNUNET_i2s (target), result);
861 #endif
862 }
863
864
865 static int
866 send_disconnect (struct NeighbourMapEntry * n)
867 {
868   size_t ret;
869   struct SessionDisconnectMessage disconnect_msg;
870
871 #if DEBUG_TRANSPORT
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873               "Sending DISCONNECT message to peer `%4s'\n",
874               GNUNET_i2s (&n->id));
875 #endif
876
877   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
878   disconnect_msg.header.type =
879       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
880   disconnect_msg.reserved = htonl (0);
881   disconnect_msg.purpose.size =
882       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
883              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
884              sizeof (struct GNUNET_TIME_AbsoluteNBO));
885   disconnect_msg.purpose.purpose =
886       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
887   disconnect_msg.timestamp =
888       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
889   disconnect_msg.public_key = GST_my_public_key;
890   GNUNET_assert (GNUNET_OK ==
891                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
892                                          &disconnect_msg.purpose,
893                                          &disconnect_msg.signature));
894
895   ret = send_with_session (n,
896             (const char *) &disconnect_msg, sizeof (disconnect_msg),
897             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
898             &send_disconnect_cont, NULL);
899
900   if (ret == GNUNET_SYSERR)
901     return GNUNET_SYSERR;
902
903   GNUNET_STATISTICS_update (GST_stats,
904                             gettext_noop
905                             ("# peers disconnected due to external request"), 1,
906                             GNUNET_NO);
907   return GNUNET_OK;
908 }
909
910
911 /**
912  * Disconnect from the given neighbour, clean up the record.
913  *
914  * @param n neighbour to disconnect from
915  */
916 static void
917 disconnect_neighbour (struct NeighbourMapEntry *n)
918 {
919   struct MessageQueue *mq;
920   int previous_state;
921
922   previous_state = n->state;
923
924   if (is_disconnecting (n))
925     return;
926
927   /* send DISCONNECT MESSAGE */
928   if (previous_state == S_CONNECTED)
929   {
930     if (GNUNET_OK == send_disconnect (n))
931       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
932                   GNUNET_i2s (&n->id));
933     else
934       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935                   "Could not send DISCONNECT_MSG to `%s'\n",
936                   GNUNET_i2s (&n->id));
937   }
938
939   change_state (n, S_DISCONNECT);
940
941   if (previous_state == S_CONNECTED)
942   {
943     GNUNET_assert (NULL != n->address);
944     if (n->address_state == USED)
945     {
946       GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__);
947       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
948       n->address_state = UNUSED;
949     }
950   }
951
952   if (n->address != NULL)
953   {
954     struct GNUNET_TRANSPORT_PluginFunctions *papi;
955
956     papi = GST_plugins_find (n->address->transport_name);
957     if (papi != NULL)
958       papi->disconnect (papi->cls, &n->id);
959   }
960   while (NULL != (mq = n->messages_head))
961   {
962     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
963     if (NULL != mq->cont)
964       mq->cont (mq->cont_cls, GNUNET_SYSERR);
965     GNUNET_free (mq);
966   }
967   if (NULL != n->is_active)
968   {
969     n->is_active->n = NULL;
970     n->is_active = NULL;
971   }
972
973   switch (previous_state)
974   {
975   case S_CONNECTED:
976     GNUNET_assert (neighbours_connected > 0);
977     neighbours_connected--;
978     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
979     GNUNET_SCHEDULER_cancel (n->keepalive_task);
980     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
981     n->expect_latency_response = GNUNET_NO;
982     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
983                               GNUNET_NO);
984     disconnect_notify_cb (callback_cls, &n->id);
985     break;
986   case S_FAST_RECONNECT:
987     GNUNET_STATISTICS_update (GST_stats,
988                               gettext_noop ("# fast reconnects failed"), 1,
989                               GNUNET_NO);
990     disconnect_notify_cb (callback_cls, &n->id);
991     break;
992   default:
993     break;
994   }
995
996   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
997
998   GNUNET_assert (GNUNET_YES ==
999                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
1000                                                        &n->id.hashPubKey, n));
1001   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
1002   {
1003     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1004     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1005   }
1006   if (n->ats_request != GNUNET_SCHEDULER_NO_TASK)
1007   {
1008     GNUNET_SCHEDULER_cancel (n->ats_request);
1009     n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1010   }
1011
1012   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
1013   {
1014     GNUNET_SCHEDULER_cancel (n->timeout_task);
1015     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1016   }
1017   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
1018   {
1019     GNUNET_SCHEDULER_cancel (n->transmission_task);
1020     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
1021   }
1022   if (NULL != n->fast_reconnect_address)
1023   {
1024     GNUNET_HELLO_address_free (n->fast_reconnect_address);
1025     n->fast_reconnect_address = NULL;
1026   }
1027   if (NULL != n->address)
1028   {
1029     GNUNET_HELLO_address_free (n->address);
1030     n->address = NULL;
1031   }
1032   n->session = NULL;
1033   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
1034               GNUNET_i2s (&n->id), n);
1035   GNUNET_free (n);
1036 }
1037
1038
1039 /**
1040  * Peer has been idle for too long. Disconnect.
1041  *
1042  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1043  * @param tc scheduler context
1044  */
1045 static void
1046 neighbour_timeout_task (void *cls,
1047                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1048 {
1049   struct NeighbourMapEntry *n = cls;
1050
1051   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1052   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer`%4s' disconnected due to timeout\n",
1053               GNUNET_i2s (&n->id));
1054
1055   GNUNET_STATISTICS_update (GST_stats,
1056                             gettext_noop
1057                             ("# peers disconnected due to timeout"), 1,
1058                             GNUNET_NO);
1059   disconnect_neighbour (n);
1060 }
1061
1062
1063 /**
1064  * Send another keepalive message.
1065  *
1066  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1067  * @param tc scheduler context
1068  */
1069 static void
1070 neighbour_keepalive_task (void *cls,
1071                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1072 {
1073   struct NeighbourMapEntry *n = cls;
1074   struct GNUNET_MessageHeader m;
1075   int ret;
1076
1077   GNUNET_assert (S_CONNECTED == n->state);
1078   n->keepalive_task =
1079       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1080                                     &neighbour_keepalive_task, n);
1081
1082   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1083                             GNUNET_NO);
1084   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1085   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1086
1087   ret = send_with_session (n,
1088             (const void *) &m, sizeof (m),
1089             UINT32_MAX /* priority */ ,
1090             GNUNET_TIME_UNIT_FOREVER_REL,
1091             NULL, NULL);
1092
1093   n->expect_latency_response = GNUNET_NO;
1094   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1095   if (ret != GNUNET_SYSERR)
1096   {
1097     n->expect_latency_response = GNUNET_YES;
1098     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1099   }
1100
1101 }
1102
1103
1104 /**
1105  * Disconnect from the given neighbour.
1106  *
1107  * @param cls unused
1108  * @param key hash of neighbour's public key (not used)
1109  * @param value the 'struct NeighbourMapEntry' of the neighbour
1110  */
1111 static int
1112 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1113 {
1114   struct NeighbourMapEntry *n = value;
1115
1116 #if DEBUG_TRANSPORT
1117   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1118               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1119 #endif
1120   if (S_CONNECTED == n->state)
1121     GNUNET_STATISTICS_update (GST_stats,
1122                               gettext_noop
1123                               ("# peers disconnected due to global disconnect"),
1124                               1, GNUNET_NO);
1125   disconnect_neighbour (n);
1126   return GNUNET_OK;
1127 }
1128
1129
1130 static void
1131 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1132 {
1133   struct NeighbourMapEntry *n = cls;
1134
1135   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1136
1137   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138               "ATS did not suggested address to connect to peer `%s'\n",
1139               GNUNET_i2s (&n->id));
1140
1141   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# ATS address suggestions failed"), 1,
1142                             GNUNET_NO);
1143
1144
1145   disconnect_neighbour (n);
1146 }
1147
1148 /**
1149  * Cleanup the neighbours subsystem.
1150  */
1151 void
1152 GST_neighbours_stop ()
1153 {
1154   // This can happen during shutdown
1155   if (neighbours == NULL)
1156   {
1157     return;
1158   }
1159
1160   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1161                                          NULL);
1162   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1163 //  GNUNET_assert (neighbours_connected == 0);
1164   neighbours = NULL;
1165   callback_cls = NULL;
1166   connect_notify_cb = NULL;
1167   disconnect_notify_cb = NULL;
1168   address_change_cb = NULL;
1169 }
1170
1171 struct ContinutionContext
1172 {
1173   struct GNUNET_HELLO_Address *address;
1174
1175   struct Session *session;
1176 };
1177
1178 static void
1179 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1180                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1181 {
1182   struct QuotaSetMessage q_msg;
1183
1184 #if DEBUG_TRANSPORT
1185   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1187               ntohl (quota.value__), GNUNET_i2s (target));
1188 #endif
1189   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1190   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1191   q_msg.quota = quota;
1192   q_msg.peer = (*target);
1193   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1194 }
1195
1196 /**
1197  * We tried to send a SESSION_CONNECT message to another peer.  If this
1198  * succeeded, we change the state.  If it failed, we should tell
1199  * ATS to not use this address anymore (until it is re-validated).
1200  *
1201  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1202  * @param target peer to send the message to
1203  * @param success GNUNET_OK on success
1204  */
1205 static void
1206 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1207                            int success)
1208 {
1209   struct ContinutionContext *cc = cls;
1210   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1211
1212   if (GNUNET_YES != success)
1213   {
1214     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1215     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1216   }
1217   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1218   {
1219     GNUNET_HELLO_address_free (cc->address);
1220     GNUNET_free (cc);
1221     return;
1222   }
1223
1224   if ((GNUNET_YES == success) &&
1225       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1226   {
1227     change_state (n, S_CONNECT_SENT);
1228     GNUNET_HELLO_address_free (cc->address);
1229     GNUNET_free (cc);
1230     return;
1231   }
1232
1233   if ((GNUNET_NO == success) &&
1234       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1235   {
1236 #if DEBUG_TRANSPORT
1237     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1238                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1239                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1240 #endif
1241     change_state (n, S_NOT_CONNECTED);
1242     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1243       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1244     n->ats_suggest =
1245         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1246                                       n);
1247     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1248   }
1249   GNUNET_HELLO_address_free (cc->address);
1250   GNUNET_free (cc);
1251 }
1252
1253
1254 static void
1255 ats_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1256 {
1257   struct NeighbourMapEntry *n = cls;
1258   n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1259
1260   n->ats_suggest =
1261     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1262                                   n);
1263 }
1264
1265
1266 /**
1267  * We tried to switch addresses with an peer already connected. If it failed,
1268  * we should tell ATS to not use this address anymore (until it is re-validated).
1269  *
1270  * @param cls the 'struct NeighbourMapEntry'
1271  * @param target peer to send the message to
1272  * @param success GNUNET_OK on success
1273  */
1274 static void
1275 send_switch_address_continuation (void *cls,
1276                                   const struct GNUNET_PeerIdentity *target,
1277                                   int success)
1278 {
1279   struct ContinutionContext *cc = cls;
1280   struct NeighbourMapEntry *n;
1281
1282   if (neighbours == NULL)
1283   {
1284     GNUNET_HELLO_address_free (cc->address);
1285     GNUNET_free (cc);
1286     return;                     /* neighbour is going away */
1287   }
1288
1289   n = lookup_neighbour (&cc->address->peer);
1290   if ((n == NULL) || (is_disconnecting (n)))
1291   {
1292     GNUNET_HELLO_address_free (cc->address);
1293     GNUNET_free (cc);
1294     return;                     /* neighbour is going away */
1295   }
1296
1297   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1298   if (GNUNET_YES != success)
1299   {
1300
1301     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1302                 "Failed to switch connected peer `%s' in state %s to address '%s' session %X, asking ATS for new address \n",
1303                 GNUNET_i2s (&n->id), print_state(n->state), GST_plugins_a2s (cc->address), cc->session);
1304
1305     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1306     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1307
1308     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1309     {
1310       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1311       n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1312     }
1313
1314     if (n->state == S_FAST_RECONNECT)
1315     {
1316       if (GNUNET_SCHEDULER_NO_TASK == n->ats_request)
1317       {
1318         /* Throtteled ATS request for fast reconnect */
1319         struct GNUNET_TIME_Relative delay = GNUNET_TIME_relative_multiply(FAST_RECONNECT_RATE, n->fast_reconnect_attempts);
1320         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321                     "Fast reconnect attempt %u failed, delay ATS request for %llu ms\n", n->fast_reconnect_attempts, delay.rel_value);
1322         n->ats_request =
1323             GNUNET_SCHEDULER_add_delayed (delay,
1324                                           ats_request,
1325                                           n);
1326       }
1327     }
1328     else
1329     {
1330       /* Immediate ATS request for connected peers */
1331       n->ats_suggest =
1332         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1333                                       n);
1334       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1335     }
1336     GNUNET_HELLO_address_free (cc->address);
1337     GNUNET_free (cc);
1338     return;
1339   }
1340   /* Tell ATS that switching addresses was successful */
1341   switch (n->state)
1342   {
1343   case S_CONNECTED:
1344     if (n->address_state == FRESH)
1345     {
1346       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1347       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1348       if (cc->session != n->session)
1349         GNUNET_break (0);
1350       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1351       n->address_state = USED;
1352     }
1353     break;
1354   case S_FAST_RECONNECT:
1355 #if DEBUG_TRANSPORT
1356     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1357                 "Successful fast reconnect to peer `%s'\n",
1358                 GNUNET_i2s (&n->id));
1359 #endif
1360     change_state (n, S_CONNECTED);
1361     neighbours_connected++;
1362     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1363                               GNUNET_NO);
1364
1365     if (n->address_state == FRESH)
1366     {
1367       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1368       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1369       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1370       n->address_state = USED;
1371     }
1372
1373     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1374       n->keepalive_task =
1375           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1376
1377     /* Updating quotas */
1378     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1379     send_outbound_quota (target, n->bandwidth_out);
1380
1381   default:
1382     break;
1383   }
1384   GNUNET_HELLO_address_free (cc->address);
1385   GNUNET_free (cc);
1386 }
1387
1388
1389 /**
1390  * We tried to send a SESSION_CONNECT message to another peer.  If this
1391  * succeeded, we change the state.  If it failed, we should tell
1392  * ATS to not use this address anymore (until it is re-validated).
1393  *
1394  * @param cls the 'struct NeighbourMapEntry'
1395  * @param target peer to send the message to
1396  * @param success GNUNET_OK on success
1397  */
1398 static void
1399 send_connect_ack_continuation (void *cls,
1400                                const struct GNUNET_PeerIdentity *target,
1401                                int success)
1402 {
1403   struct ContinutionContext *cc = cls;
1404   struct NeighbourMapEntry *n;
1405
1406   if (neighbours == NULL)
1407   {
1408     GNUNET_HELLO_address_free (cc->address);
1409     GNUNET_free (cc);
1410     return;                     /* neighbour is going away */
1411   }
1412
1413   n = lookup_neighbour (&cc->address->peer);
1414   if ((n == NULL) || (is_disconnecting (n)))
1415   {
1416     GNUNET_HELLO_address_free (cc->address);
1417     GNUNET_free (cc);
1418     return;                     /* neighbour is going away */
1419   }
1420
1421   if (GNUNET_YES == success)
1422   {
1423     GNUNET_HELLO_address_free (cc->address);
1424     GNUNET_free (cc);
1425     return;                     /* sending successful */
1426   }
1427
1428   /* sending failed, ask for next address  */
1429 #if DEBUG_TRANSPORT
1430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1431               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1432               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1433 #endif
1434   change_state (n, S_NOT_CONNECTED);
1435   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1436   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1437
1438   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1439     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1440   n->ats_suggest =
1441       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1442                                     n);
1443   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1444   GNUNET_HELLO_address_free (cc->address);
1445   GNUNET_free (cc);
1446 }
1447
1448
1449 /**
1450  * For an existing neighbour record, set the active connection to
1451  * use the given address.
1452  *
1453  * @param peer identity of the peer to switch the address for
1454  * @param address address of the other peer, NULL if other peer
1455  *                       connected to us
1456  * @param session session to use (or NULL)
1457  * @param ats performance data
1458  * @param ats_count number of entries in ats
1459  * @param bandwidth_in inbound quota to be used when connection is up
1460  * @param bandwidth_out outbound quota to be used when connection is up
1461  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1462  *         connection is not up (yet)
1463  */
1464 int
1465 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
1466                                        const struct GNUNET_HELLO_Address
1467                                        *address,
1468                                        struct Session *session,
1469                                        const struct GNUNET_ATS_Information *ats,
1470                                        uint32_t ats_count,
1471                                        struct GNUNET_BANDWIDTH_Value32NBO
1472                                        bandwidth_in,
1473                                        struct GNUNET_BANDWIDTH_Value32NBO
1474                                        bandwidth_out)
1475 {
1476   struct NeighbourMapEntry *n;
1477   struct SessionConnectMessage connect_msg;
1478   struct ContinutionContext *cc;
1479   size_t msg_len;
1480   size_t ret;
1481
1482   if (neighbours == NULL)
1483   {
1484     /* This can happen during shutdown */
1485     return GNUNET_NO;
1486   }
1487   n = lookup_neighbour (peer);
1488   if (NULL == n)
1489     return GNUNET_NO;
1490   if (n->state == S_DISCONNECT)
1491   {
1492     /* We are disconnecting, nothing to do here */
1493     return GNUNET_NO;
1494   }
1495   GNUNET_assert (address->transport_name != NULL);
1496   if ((session == NULL) && (0 == address->address_length))
1497   {
1498     GNUNET_break_op (0);
1499     /* FIXME: is this actually possible? When does this happen? */
1500     if (strlen (address->transport_name) > 0)
1501       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1502     GNUNET_ATS_suggest_address (GST_ats, peer);
1503     return GNUNET_NO;
1504   }
1505
1506   /* checks successful and neighbour != NULL */
1507   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1508               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1509               (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1510               session,
1511               GNUNET_i2s (peer),
1512               print_state (n->state));
1513
1514   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1515   {
1516     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1517     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1518   }
1519
1520
1521   if (n->state == S_FAST_RECONNECT)
1522   {
1523     /* Throtteled fast reconnect */
1524
1525     if (NULL == n->fast_reconnect_address)
1526     {
1527       n->fast_reconnect_address = GNUNET_HELLO_address_copy (address);
1528
1529     }
1530     else if (0 == GNUNET_HELLO_address_cmp(address, n->fast_reconnect_address))
1531     {
1532       n->fast_reconnect_attempts ++;
1533
1534       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1535                 "FAST RECONNECT to peer `%s' and address '%s' with identical address attempt %u\n",
1536                 GNUNET_i2s (&n->id), GST_plugins_a2s (address), n->fast_reconnect_attempts);
1537     }
1538   }
1539   else
1540   {
1541     n->fast_reconnect_attempts = 0;
1542   }
1543
1544   /* do not switch addresses just update quotas */
1545   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1546       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1547       (n->session == session))
1548   {
1549     n->bandwidth_in = bandwidth_in;
1550     n->bandwidth_out = bandwidth_out;
1551     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1552     send_outbound_quota (peer, n->bandwidth_out);
1553     return GNUNET_NO;
1554   }
1555   if (n->state == S_CONNECTED)
1556   {
1557     /* mark old address as no longer used */
1558     GNUNET_assert (NULL != n->address);
1559     if (n->address_state == USED)
1560     {
1561       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1562       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1563       n->address_state = UNUSED;
1564     }
1565   }
1566
1567   /* set new address */
1568   if (NULL != n->address)
1569     GNUNET_HELLO_address_free (n->address);
1570   n->address = GNUNET_HELLO_address_copy (address);
1571   n->address_state = FRESH;
1572   n->bandwidth_in = bandwidth_in;
1573   n->bandwidth_out = bandwidth_out;
1574   GNUNET_SCHEDULER_cancel (n->timeout_task);
1575   n->timeout_task =
1576       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1577                                     &neighbour_timeout_task, n);
1578
1579   if (NULL != address_change_cb && n->state == S_CONNECTED)
1580     address_change_cb (callback_cls, &n->id, n->address); 
1581
1582   /* Obtain an session for this address from plugin */
1583   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1584   papi = GST_plugins_find (address->transport_name);
1585
1586   if (papi == NULL)
1587   {
1588     /* we don't have the plugin for this address */
1589     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1590
1591     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1592       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1593     n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1594                                       ats_suggest_cancel,
1595                                       n);
1596     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1597     GNUNET_HELLO_address_free (n->address);
1598     n->address = NULL;
1599     n->session = NULL;
1600     return GNUNET_NO;
1601   }
1602
1603   if (session == NULL)
1604   {
1605     n->session = papi->get_session (papi->cls, address);
1606     /* Session could not be initiated */
1607     if (n->session == NULL)
1608     {
1609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1610                   "Failed to obtain new session %p for peer `%s' and  address '%s'\n",
1611                   n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1612
1613       GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1614
1615       if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1616         GNUNET_SCHEDULER_cancel (n->ats_suggest);
1617       n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1618                                         ats_suggest_cancel,
1619                                         n);
1620       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1621       GNUNET_HELLO_address_free (n->address);
1622       n->address = NULL;
1623       n->session = NULL;
1624       return GNUNET_NO;
1625     }
1626
1627     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1628                 "Obtained new session %p for peer `%s' and  address '%s'\n",
1629                  n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1630     /* Telling ATS about new session */
1631     GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0);
1632   }
1633   else
1634   {
1635     n->session = session;
1636     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1637                 "Using existing session %p for peer `%s' and  address '%s'\n",
1638                 n->session,
1639                 GNUNET_i2s (&n->id),
1640                 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>");
1641   }
1642
1643   switch (n->state)
1644   {
1645   case S_NOT_CONNECTED:
1646   case S_CONNECT_SENT:
1647     msg_len = sizeof (struct SessionConnectMessage);
1648     connect_msg.header.size = htons (msg_len);
1649     connect_msg.header.type =
1650         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1651     connect_msg.reserved = htonl (0);
1652     connect_msg.timestamp =
1653         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1654
1655     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1656     cc->session = n->session;
1657     cc->address = GNUNET_HELLO_address_copy (address);
1658
1659     ret = send_with_session (n,
1660       (const char *) &connect_msg, msg_len,
1661       UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1662       &send_connect_continuation, cc);
1663
1664     return GNUNET_NO;
1665   case S_CONNECT_RECV:
1666     /* We received a CONNECT message and asked ATS for an address */
1667     msg_len = sizeof (struct SessionConnectMessage);
1668     connect_msg.header.size = htons (msg_len);
1669     connect_msg.header.type =
1670         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1671     connect_msg.reserved = htonl (0);
1672     connect_msg.timestamp =
1673         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1674     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1675     cc->session = n->session;
1676     cc->address = GNUNET_HELLO_address_copy (address);
1677
1678     ret = send_with_session(n,
1679                             (const void *) &connect_msg, msg_len,
1680                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1681                             &send_connect_ack_continuation,
1682                             cc);
1683     return GNUNET_NO;
1684   case S_CONNECTED:
1685   case S_FAST_RECONNECT:
1686     /* connected peer is switching addresses or tries fast reconnect */
1687     msg_len = sizeof (struct SessionConnectMessage);
1688     connect_msg.header.size = htons (msg_len);
1689     connect_msg.header.type =
1690         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1691     connect_msg.reserved = htonl (0);
1692     connect_msg.timestamp =
1693         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1694     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1695     cc->session = n->session;
1696     cc->address = GNUNET_HELLO_address_copy (address);
1697     ret = send_with_session(n,
1698                             (const void *) &connect_msg, msg_len,
1699                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1700                             &send_switch_address_continuation, cc);
1701     if (ret == GNUNET_SYSERR)
1702     {
1703       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1704                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1705                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1706     }
1707     return GNUNET_NO;
1708   default:
1709     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1710                 "Invalid connection state to switch addresses %u \n", n->state);
1711     GNUNET_break_op (0);
1712     return GNUNET_NO;
1713   }
1714 }
1715
1716
1717 /**
1718  * Obtain current latency information for the given neighbour.
1719  *
1720  * @param peer
1721  * @return observed latency of the address, FOREVER if the address was
1722  *         never successfully validated
1723  */
1724 struct GNUNET_TIME_Relative
1725 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1726 {
1727   struct NeighbourMapEntry *n;
1728
1729   n = lookup_neighbour (peer);
1730   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1731     return GNUNET_TIME_UNIT_FOREVER_REL;
1732
1733   return n->latency;
1734 }
1735
1736 /**
1737  * Obtain current address information for the given neighbour.
1738  *
1739  * @param peer
1740  * @return address currently used
1741  */
1742 struct GNUNET_HELLO_Address *
1743 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1744 {
1745   struct NeighbourMapEntry *n;
1746
1747   n = lookup_neighbour (peer);
1748   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1749     return NULL;
1750
1751   return n->address;
1752 }
1753
1754
1755
1756 /**
1757  * Create an entry in the neighbour map for the given peer
1758  *
1759  * @param peer peer to create an entry for
1760  * @return new neighbour map entry
1761  */
1762 static struct NeighbourMapEntry *
1763 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1764 {
1765   struct NeighbourMapEntry *n;
1766
1767 #if DEBUG_TRANSPORT
1768   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1769               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1770 #endif
1771   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1772   n->id = *peer;
1773   n->state = S_NOT_CONNECTED;
1774   n->latency = GNUNET_TIME_relative_get_forever ();
1775   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1776                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1777                                  MAX_BANDWIDTH_CARRY_S);
1778   n->timeout_task =
1779       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1780                                     &neighbour_timeout_task, n);
1781   GNUNET_assert (GNUNET_OK ==
1782                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1783                                                     &n->id.hashPubKey, n,
1784                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1785   return n;
1786 }
1787
1788
1789 /**
1790  * Try to create a connection to the given target (eventually).
1791  *
1792  * @param target peer to try to connect to
1793  */
1794 void
1795 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1796 {
1797   struct NeighbourMapEntry *n;
1798
1799   // This can happen during shutdown
1800   if (neighbours == NULL)
1801   {
1802     return;
1803   }
1804 #if DEBUG_TRANSPORT
1805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1806               GNUNET_i2s (target));
1807 #endif
1808   if (0 ==
1809       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1810   {
1811     /* my own hello */
1812     return;
1813   }
1814   n = lookup_neighbour (target);
1815
1816   if (NULL != n)
1817   {
1818     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1819       return;                   /* already connecting or connected */
1820     if (is_disconnecting (n))
1821       change_state (n, S_NOT_CONNECTED);
1822   }
1823
1824
1825   if (n == NULL)
1826     n = setup_neighbour (target);
1827 #if DEBUG_TRANSPORT
1828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1829               "Asking ATS for suggested address to connect to peer `%s'\n",
1830               GNUNET_i2s (&n->id));
1831 #endif
1832
1833   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1834 }
1835
1836 /**
1837  * Test if we're connected to the given peer.
1838  *
1839  * @param target peer to test
1840  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1841  */
1842 int
1843 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1844 {
1845   struct NeighbourMapEntry *n;
1846
1847   // This can happen during shutdown
1848   if (neighbours == NULL)
1849   {
1850     return GNUNET_NO;
1851   }
1852
1853   n = lookup_neighbour (target);
1854
1855   if ((NULL == n) || (S_CONNECTED != n->state))
1856     return GNUNET_NO;           /* not connected */
1857   return GNUNET_YES;
1858 }
1859
1860 /**
1861  * A session was terminated. Take note.
1862  *
1863  * @param peer identity of the peer where the session died
1864  * @param session session that is gone
1865  */
1866 void
1867 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1868                                    struct Session *session)
1869 {
1870   struct NeighbourMapEntry *n;
1871
1872   if (neighbours == NULL)
1873   {
1874     /* This can happen during shutdown */
1875     return;
1876   }
1877
1878 #if DEBUG_TRANSPORT
1879   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1880               session, GNUNET_i2s (peer));
1881 #endif
1882
1883   n = lookup_neighbour (peer);
1884   if (NULL == n)
1885     return;
1886   if (session != n->session)
1887     return;                     /* doesn't affect us */
1888   if (n->state == S_CONNECTED)
1889   {
1890     if (n->address_state == USED)
1891     {
1892       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1893       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1894       n->address_state = UNUSED;
1895
1896     }
1897   }
1898
1899   if (NULL != n->address)
1900   {
1901     GNUNET_HELLO_address_free (n->address);
1902     n->address = NULL;
1903   }
1904   n->session = NULL;
1905
1906   /* not connected anymore anyway, shouldn't matter */
1907   if (S_CONNECTED != n->state)
1908     return;
1909
1910   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1911   {
1912     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1913     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1914     n->expect_latency_response = GNUNET_NO;
1915   }
1916
1917   /* connected, try fast reconnect */
1918   /* statistics "transport" : "# peers connected" -= 1
1919    * neighbours_connected -= 1
1920    * BUT: no disconnect_cb to notify clients about disconnect
1921    */
1922
1923   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1924               GNUNET_i2s (peer));
1925
1926   GNUNET_assert (neighbours_connected > 0);
1927   change_state (n, S_FAST_RECONNECT);
1928   neighbours_connected--;
1929   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
1930                             GNUNET_NO);
1931
1932
1933   /* We are connected, so ask ATS to switch addresses */
1934   GNUNET_SCHEDULER_cancel (n->timeout_task);
1935   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1936                                     &neighbour_timeout_task, n);
1937   /* try QUICKLY to re-establish a connection, reduce timeout! */
1938   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1939     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1940   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1941                                     &ats_suggest_cancel,
1942                                     n);
1943   GNUNET_ATS_suggest_address (GST_ats, peer);
1944 }
1945
1946
1947 /**
1948  * Transmit a message to the given target using the active connection.
1949  *
1950  * @param target destination
1951  * @param msg message to send
1952  * @param msg_size number of bytes in msg
1953  * @param timeout when to fail with timeout
1954  * @param cont function to call when done
1955  * @param cont_cls closure for 'cont'
1956  */
1957 void
1958 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1959                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1960                      GST_NeighbourSendContinuation cont, void *cont_cls)
1961 {
1962   struct NeighbourMapEntry *n;
1963   struct MessageQueue *mq;
1964
1965   // This can happen during shutdown
1966   if (neighbours == NULL)
1967   {
1968     return;
1969   }
1970
1971   n = lookup_neighbour (target);
1972   if ((n == NULL) || (!is_connected (n)))
1973   {
1974     GNUNET_STATISTICS_update (GST_stats,
1975                               gettext_noop
1976                               ("# messages not sent (no such peer or not connected)"),
1977                               1, GNUNET_NO);
1978 #if DEBUG_TRANSPORT
1979     if (n == NULL)
1980       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981                   "Could not send message to peer `%s': unknown neighbour",
1982                   GNUNET_i2s (target));
1983     else if (!is_connected (n))
1984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1985                   "Could not send message to peer `%s': not connected\n",
1986                   GNUNET_i2s (target));
1987 #endif
1988     if (NULL != cont)
1989       cont (cont_cls, GNUNET_SYSERR);
1990     return;
1991   }
1992
1993   if ((n->session == NULL) && (n->address == NULL))
1994   {
1995     GNUNET_STATISTICS_update (GST_stats,
1996                               gettext_noop
1997                               ("# messages not sent (no such peer or not connected)"),
1998                               1, GNUNET_NO);
1999 #if DEBUG_TRANSPORT
2000     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2001                 "Could not send message to peer `%s': no address available\n",
2002                 GNUNET_i2s (target));
2003 #endif
2004
2005     if (NULL != cont)
2006       cont (cont_cls, GNUNET_SYSERR);
2007     return;
2008   }
2009
2010   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
2011   GNUNET_STATISTICS_update (GST_stats,
2012                             gettext_noop
2013                             ("# bytes in message queue for other peers"),
2014                             msg_size, GNUNET_NO);
2015   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
2016   mq->cont = cont;
2017   mq->cont_cls = cont_cls;
2018   /* FIXME: this memcpy can be up to 7% of our total runtime! */
2019   memcpy (&mq[1], msg, msg_size);
2020   mq->message_buf = (const char *) &mq[1];
2021   mq->message_buf_size = msg_size;
2022   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
2023   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
2024
2025   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
2026       (NULL == n->is_active))
2027     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
2028 }
2029
2030
2031 /**
2032  * We have received a message from the given sender.  How long should
2033  * we delay before receiving more?  (Also used to keep the peer marked
2034  * as live).
2035  *
2036  * @param sender sender of the message
2037  * @param size size of the message
2038  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
2039  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
2040  *                   GNUNET_SYSERR if the connection is not fully up yet
2041  * @return how long to wait before reading more from this sender
2042  */
2043 struct GNUNET_TIME_Relative
2044 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
2045                                         *sender, ssize_t size, int *do_forward)
2046 {
2047   struct NeighbourMapEntry *n;
2048   struct GNUNET_TIME_Relative ret;
2049
2050   // This can happen during shutdown
2051   if (neighbours == NULL)
2052   {
2053     return GNUNET_TIME_UNIT_FOREVER_REL;
2054   }
2055
2056   n = lookup_neighbour (sender);
2057   if (n == NULL)
2058   {
2059     GST_neighbours_try_connect (sender);
2060     n = lookup_neighbour (sender);
2061     if (NULL == n)
2062     {
2063       GNUNET_STATISTICS_update (GST_stats,
2064                                 gettext_noop
2065                                 ("# messages discarded due to lack of neighbour record"),
2066                                 1, GNUNET_NO);
2067       *do_forward = GNUNET_NO;
2068       return GNUNET_TIME_UNIT_ZERO;
2069     }
2070   }
2071   if (!is_connected (n))
2072   {
2073     *do_forward = GNUNET_SYSERR;
2074     return GNUNET_TIME_UNIT_ZERO;
2075   }
2076   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
2077   {
2078     n->quota_violation_count++;
2079 #if DEBUG_TRANSPORT
2080     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2081                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
2082                 n->in_tracker.available_bytes_per_s__,
2083                 n->quota_violation_count);
2084 #endif
2085     /* Discount 32k per violation */
2086     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
2087   }
2088   else
2089   {
2090     if (n->quota_violation_count > 0)
2091     {
2092       /* try to add 32k back */
2093       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
2094       n->quota_violation_count--;
2095     }
2096   }
2097   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2098   {
2099     GNUNET_STATISTICS_update (GST_stats,
2100                               gettext_noop
2101                               ("# bandwidth quota violations by other peers"),
2102                               1, GNUNET_NO);
2103     *do_forward = GNUNET_NO;
2104     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
2105   }
2106   *do_forward = GNUNET_YES;
2107   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
2108   if (ret.rel_value > 0)
2109   {
2110 #if DEBUG_TRANSPORT
2111     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2112                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
2113                 (unsigned long long) n->in_tracker.
2114                 consumption_since_last_update__,
2115                 (unsigned int) n->in_tracker.available_bytes_per_s__,
2116                 (unsigned long long) ret.rel_value);
2117 #endif
2118     GNUNET_STATISTICS_update (GST_stats,
2119                               gettext_noop ("# ms throttling suggested"),
2120                               (int64_t) ret.rel_value, GNUNET_NO);
2121   }
2122   return ret;
2123 }
2124
2125
2126 /**
2127  * Keep the connection to the given neighbour alive longer,
2128  * we received a KEEPALIVE (or equivalent).
2129  *
2130  * @param neighbour neighbour to keep alive
2131  */
2132 void
2133 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
2134 {
2135   struct NeighbourMapEntry *n;
2136
2137   // This can happen during shutdown
2138   if (neighbours == NULL)
2139   {
2140     return;
2141   }
2142
2143   n = lookup_neighbour (neighbour);
2144   if (NULL == n)
2145   {
2146     GNUNET_STATISTICS_update (GST_stats,
2147                               gettext_noop
2148                               ("# KEEPALIVE messages discarded (not connected)"),
2149                               1, GNUNET_NO);
2150     return;
2151   }
2152   GNUNET_SCHEDULER_cancel (n->timeout_task);
2153   n->timeout_task =
2154       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2155                                     &neighbour_timeout_task, n);
2156
2157   /* send reply to measure latency */
2158   if (S_CONNECTED != n->state)
2159     return;
2160
2161   struct GNUNET_MessageHeader m;
2162
2163   m.size = htons (sizeof (struct GNUNET_MessageHeader));
2164   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
2165
2166   send_with_session(n,
2167       (const void *) &m, sizeof (m),
2168       UINT32_MAX,
2169       GNUNET_TIME_UNIT_FOREVER_REL,
2170       NULL, NULL);
2171 }
2172
2173 /**
2174  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
2175  * to this peer
2176  *
2177  * @param neighbour neighbour to keep alive
2178  * @param ats performance data
2179  * @param ats_count number of entries in ats
2180  */
2181 void
2182 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
2183                                    const struct GNUNET_ATS_Information *ats,
2184                                    uint32_t ats_count)
2185 {
2186   struct NeighbourMapEntry *n;
2187   struct GNUNET_ATS_Information *ats_new;
2188   uint32_t latency;
2189
2190   if (neighbours == NULL)
2191   {
2192     // This can happen during shutdown
2193     return;
2194   }
2195
2196   n = lookup_neighbour (neighbour);
2197   if ((NULL == n) || (n->state != S_CONNECTED))
2198   {
2199     GNUNET_STATISTICS_update (GST_stats,
2200                               gettext_noop
2201                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2202                               1, GNUNET_NO);
2203     return;
2204   }
2205   if (n->expect_latency_response != GNUNET_YES)
2206   {
2207     GNUNET_STATISTICS_update (GST_stats,
2208                               gettext_noop
2209                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2210                               1, GNUNET_NO);
2211     return;
2212   }
2213   n->expect_latency_response = GNUNET_NO;
2214
2215   GNUNET_assert (n->keep_alive_sent.abs_value !=
2216                  GNUNET_TIME_absolute_get_zero ().abs_value);
2217   n->latency =
2218       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2219                                            GNUNET_TIME_absolute_get ());
2220 #if DEBUG_TRANSPORT
2221   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2222               GNUNET_i2s (&n->id), n->latency.rel_value);
2223 #endif
2224
2225
2226   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2227   {
2228     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2229   }
2230   else
2231   {
2232     ats_new =
2233         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2234                        (ats_count + 1));
2235     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2236
2237     /* add latency */
2238     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2239     if (n->latency.rel_value > UINT32_MAX)
2240       latency = UINT32_MAX;
2241     else
2242       latency = n->latency.rel_value;
2243     ats_new[ats_count].value = htonl (latency);
2244
2245     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2246                                ats_count + 1);
2247     GNUNET_free (ats_new);
2248   }
2249 }
2250
2251
2252 /**
2253  * Change the incoming quota for the given peer.
2254  *
2255  * @param neighbour identity of peer to change qutoa for
2256  * @param quota new quota
2257  */
2258 void
2259 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2260                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2261 {
2262   struct NeighbourMapEntry *n;
2263
2264   // This can happen during shutdown
2265   if (neighbours == NULL)
2266   {
2267     return;
2268   }
2269
2270   n = lookup_neighbour (neighbour);
2271   if (n == NULL)
2272   {
2273     GNUNET_STATISTICS_update (GST_stats,
2274                               gettext_noop
2275                               ("# SET QUOTA messages ignored (no such peer)"),
2276                               1, GNUNET_NO);
2277     return;
2278   }
2279 #if DEBUG_TRANSPORT
2280   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2281               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2282               ntohl (quota.value__), GNUNET_i2s (&n->id));
2283 #endif
2284   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2285   if (0 != ntohl (quota.value__))
2286     return;
2287 #if DEBUG_TRANSPORT
2288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2289               GNUNET_i2s (&n->id), "SET_QUOTA");
2290 #endif
2291   if (is_connected (n))
2292     GNUNET_STATISTICS_update (GST_stats,
2293                               gettext_noop ("# disconnects due to quota of 0"),
2294                               1, GNUNET_NO);
2295   disconnect_neighbour (n);
2296 }
2297
2298
2299 /**
2300  * Closure for the neighbours_iterate function.
2301  */
2302 struct IteratorContext
2303 {
2304   /**
2305    * Function to call on each connected neighbour.
2306    */
2307   GST_NeighbourIterator cb;
2308
2309   /**
2310    * Closure for 'cb'.
2311    */
2312   void *cb_cls;
2313 };
2314
2315
2316 /**
2317  * Call the callback from the closure for each connected neighbour.
2318  *
2319  * @param cls the 'struct IteratorContext'
2320  * @param key the hash of the public key of the neighbour
2321  * @param value the 'struct NeighbourMapEntry'
2322  * @return GNUNET_OK (continue to iterate)
2323  */
2324 static int
2325 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2326 {
2327   struct IteratorContext *ic = cls;
2328   struct NeighbourMapEntry *n = value;
2329
2330   if (!is_connected (n))
2331     return GNUNET_OK;
2332
2333   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2334   return GNUNET_OK;
2335 }
2336
2337
2338 /**
2339  * Iterate over all connected neighbours.
2340  *
2341  * @param cb function to call
2342  * @param cb_cls closure for cb
2343  */
2344 void
2345 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2346 {
2347   struct IteratorContext ic;
2348
2349   // This can happen during shutdown
2350   if (neighbours == NULL)
2351   {
2352     return;
2353   }
2354
2355   ic.cb = cb;
2356   ic.cb_cls = cb_cls;
2357   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2358 }
2359
2360 /**
2361  * If we have an active connection to the given target, it must be shutdown.
2362  *
2363  * @param target peer to disconnect from
2364  */
2365 void
2366 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2367 {
2368   struct NeighbourMapEntry *n;
2369
2370   // This can happen during shutdown
2371   if (neighbours == NULL)
2372   {
2373     return;
2374   }
2375
2376   n = lookup_neighbour (target);
2377   if (NULL == n)
2378     return;                     /* not active */
2379   disconnect_neighbour (n);
2380 }
2381
2382
2383 /**
2384  * We received a disconnect message from the given peer,
2385  * validate and process.
2386  *
2387  * @param peer sender of the message
2388  * @param msg the disconnect message
2389  */
2390 void
2391 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2392                                           *peer,
2393                                           const struct GNUNET_MessageHeader
2394                                           *msg)
2395 {
2396   struct NeighbourMapEntry *n;
2397   const struct SessionDisconnectMessage *sdm;
2398   GNUNET_HashCode hc;
2399
2400 #if DEBUG_TRANSPORT
2401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2402               "Received DISCONNECT message from peer `%s'\n",
2403               GNUNET_i2s (peer));
2404 #endif
2405
2406   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2407   {
2408     // GNUNET_break_op (0);
2409     GNUNET_STATISTICS_update (GST_stats,
2410                               gettext_noop
2411                               ("# disconnect messages ignored (old format)"), 1,
2412                               GNUNET_NO);
2413     return;
2414   }
2415   sdm = (const struct SessionDisconnectMessage *) msg;
2416   n = lookup_neighbour (peer);
2417   if (NULL == n)
2418     return;                     /* gone already */
2419   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2420       n->connect_ts.abs_value)
2421   {
2422     GNUNET_STATISTICS_update (GST_stats,
2423                               gettext_noop
2424                               ("# disconnect messages ignored (timestamp)"), 1,
2425                               GNUNET_NO);
2426     return;
2427   }
2428   GNUNET_CRYPTO_hash (&sdm->public_key,
2429                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2430                       &hc);
2431   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2432   {
2433     GNUNET_break_op (0);
2434     return;
2435   }
2436   if (ntohl (sdm->purpose.size) !=
2437       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2438       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2439       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2440   {
2441     GNUNET_break_op (0);
2442     return;
2443   }
2444   if (GNUNET_OK !=
2445       GNUNET_CRYPTO_rsa_verify
2446       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2447        &sdm->signature, &sdm->public_key))
2448   {
2449     GNUNET_break_op (0);
2450     return;
2451   }
2452   GST_neighbours_force_disconnect (peer);
2453 }
2454
2455
2456 /**
2457  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2458  * Consider switching to it.
2459  *
2460  * @param message possibly a 'struct SessionConnectMessage' (check format)
2461  * @param peer identity of the peer to switch the address for
2462  * @param address address of the other peer, NULL if other peer
2463  *                       connected to us
2464  * @param session session to use (or NULL)
2465  * @param ats performance data
2466  * @param ats_count number of entries in ats
2467  */
2468 void
2469 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2470                                    const struct GNUNET_PeerIdentity *peer,
2471                                    const struct GNUNET_HELLO_Address *address,
2472                                    struct Session *session,
2473                                    const struct GNUNET_ATS_Information *ats,
2474                                    uint32_t ats_count)
2475 {
2476   const struct SessionConnectMessage *scm;
2477   struct GNUNET_MessageHeader msg;
2478   struct NeighbourMapEntry *n;
2479   size_t msg_len;
2480   size_t ret;
2481
2482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2483               "Received CONNECT_ACK message from peer `%s'\n",
2484               GNUNET_i2s (peer));
2485
2486   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2487   {
2488     GNUNET_break_op (0);
2489     return;
2490   }
2491   scm = (const struct SessionConnectMessage *) message;
2492   GNUNET_break_op (ntohl (scm->reserved) == 0);
2493   n = lookup_neighbour (peer);
2494   if (NULL == n)
2495   {
2496     /* we did not send 'CONNECT' -- at least not recently */
2497     GNUNET_STATISTICS_update (GST_stats,
2498                               gettext_noop
2499                               ("# unexpected CONNECT_ACK messages (no peer)"),
2500                               1, GNUNET_NO);
2501     return;
2502   }
2503
2504   /* Additional check
2505    *
2506    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2507    *
2508    * We also received an CONNECT message, switched from SENDT to RECV and
2509    * ATS already suggested us an address after a successful blacklist check
2510    */
2511
2512   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2513               "Received CONNECT_ACK message from peer `%s' in state `%s'\n",
2514               GNUNET_i2s (peer),
2515               print_state(n->state));
2516
2517   if ((n->address != NULL) && (n->state == S_CONNECTED))
2518   {
2519     /* After fast reconnect: send ACK (ACK) even when we are connected */
2520     msg_len = sizeof (msg);
2521     msg.size = htons (msg_len);
2522     msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2523
2524     ret = send_with_session(n,
2525               (const char *) &msg, msg_len,
2526               UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2527               NULL, NULL);
2528
2529     if (ret == GNUNET_SYSERR)
2530       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2531                   "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2532                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2533     return;
2534   }
2535
2536   if ((n->state != S_CONNECT_SENT) &&
2537       ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2538   {
2539     GNUNET_STATISTICS_update (GST_stats,
2540                               gettext_noop
2541                               ("# unexpected CONNECT_ACK messages"), 1,
2542                               GNUNET_NO);
2543     return;
2544   }
2545   if (n->state != S_CONNECTED)
2546     change_state (n, S_CONNECTED);
2547
2548   if (NULL != session)
2549   {
2550     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2551                      "transport-ats",
2552                      "Giving ATS session %p of plugin %s for peer %s\n",
2553                      session, address->transport_name, GNUNET_i2s (peer));
2554   }
2555   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2556   GNUNET_assert (NULL != n->address);
2557
2558   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2559   {
2560     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2561     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2562     n->address_state = USED;
2563   }
2564
2565   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2566
2567   /* send ACK (ACK) */
2568   msg_len = sizeof (msg);
2569   msg.size = htons (msg_len);
2570   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2571
2572   ret = send_with_session(n,
2573             (const char *) &msg, msg_len,
2574             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2575             NULL, NULL);
2576
2577   if (ret == GNUNET_SYSERR)
2578     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2579                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2580                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2581
2582
2583   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2584     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2585
2586   neighbours_connected++;
2587   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2588                             GNUNET_NO);
2589 #if DEBUG_TRANSPORT
2590   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2591               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2592               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2593               __LINE__);
2594 #endif
2595   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2596   send_outbound_quota (peer, n->bandwidth_out);
2597
2598 }
2599
2600
2601 void
2602 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2603                            const struct GNUNET_PeerIdentity *peer,
2604                            const struct GNUNET_HELLO_Address *address,
2605                            struct Session *session,
2606                            const struct GNUNET_ATS_Information *ats,
2607                            uint32_t ats_count)
2608 {
2609   struct NeighbourMapEntry *n;
2610
2611 #if DEBUG_TRANSPORT
2612   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2613               GNUNET_i2s (peer));
2614 #endif
2615
2616   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2617   {
2618     GNUNET_break_op (0);
2619     return;
2620   }
2621   n = lookup_neighbour (peer);
2622   if (NULL == n)
2623   {
2624     GNUNET_break (0);
2625     return;
2626   }
2627   if (S_CONNECTED == n->state)
2628     return;
2629   if (!is_connecting (n))
2630   {
2631     GNUNET_STATISTICS_update (GST_stats,
2632                               gettext_noop ("# unexpected ACK messages"), 1,
2633                               GNUNET_NO);
2634     return;
2635   }
2636   change_state (n, S_CONNECTED);
2637   if (NULL != session)
2638     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2639                      "transport-ats",
2640                      "Giving ATS session %p of plugin %s for peer %s\n",
2641                      session, address->transport_name, GNUNET_i2s (peer));
2642   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2643   GNUNET_assert (n->address != NULL);
2644
2645   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2646   {
2647     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2648     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2649     n->address_state = USED;
2650   }
2651
2652
2653   neighbours_connected++;
2654   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2655                             GNUNET_NO);
2656
2657   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2658   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2659     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2660 #if DEBUG_TRANSPORT
2661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2662               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2663               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2664               __LINE__);
2665 #endif
2666   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2667   send_outbound_quota (peer, n->bandwidth_out);
2668 }
2669
2670 struct BlackListCheckContext
2671 {
2672   struct GNUNET_ATS_Information *ats;
2673
2674   uint32_t ats_count;
2675
2676   struct Session *session;
2677
2678   struct GNUNET_HELLO_Address *address;
2679
2680   struct GNUNET_TIME_Absolute ts;
2681 };
2682
2683
2684 static void
2685 handle_connect_blacklist_cont (void *cls,
2686                                const struct GNUNET_PeerIdentity *peer,
2687                                int result)
2688 {
2689   struct NeighbourMapEntry *n;
2690   struct BlackListCheckContext *bcc = cls;
2691
2692 #if DEBUG_TRANSPORT
2693   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2694               "Blacklist check due to CONNECT message: `%s'\n",
2695               GNUNET_i2s (peer),
2696               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2697 #endif
2698
2699   /* not allowed */
2700   if (GNUNET_OK != result)
2701   {
2702     GNUNET_HELLO_address_free (bcc->address);
2703     GNUNET_free (bcc);
2704     return;
2705   }
2706
2707   n = lookup_neighbour (peer);
2708   if (NULL == n)
2709     n = setup_neighbour (peer);
2710
2711   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2712   {
2713     if (NULL != bcc->session)
2714       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2715                        "transport-ats",
2716                        "Giving ATS session %p of address `%s' for peer %s\n",
2717                        bcc->session, GST_plugins_a2s (bcc->address),
2718                        GNUNET_i2s (peer));
2719     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2720
2721     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2722                                bcc->ats_count);
2723     n->connect_ts = bcc->ts;
2724   }
2725
2726   GNUNET_HELLO_address_free (bcc->address);
2727   GNUNET_free (bcc);
2728
2729   if (n->state != S_CONNECT_RECV)
2730     change_state (n, S_CONNECT_RECV);
2731
2732
2733   /* Ask ATS for an address to connect via that address */
2734   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2735     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2736   n->ats_suggest =
2737       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2738                                     n);
2739   GNUNET_ATS_suggest_address (GST_ats, peer);
2740 }
2741
2742 /**
2743  * We received a 'SESSION_CONNECT' message from the other peer.
2744  * Consider switching to it.
2745  *
2746  * @param message possibly a 'struct SessionConnectMessage' (check format)
2747  * @param peer identity of the peer to switch the address for
2748  * @param address address of the other peer, NULL if other peer
2749  *                       connected to us
2750  * @param session session to use (or NULL)
2751  * @param ats performance data
2752  * @param ats_count number of entries in ats (excluding 0-termination)
2753  */
2754 void
2755 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2756                                const struct GNUNET_PeerIdentity *peer,
2757                                const struct GNUNET_HELLO_Address *address,
2758                                struct Session *session,
2759                                const struct GNUNET_ATS_Information *ats,
2760                                uint32_t ats_count)
2761 {
2762   const struct SessionConnectMessage *scm;
2763   struct BlackListCheckContext *bcc = NULL;
2764   struct NeighbourMapEntry *n;
2765
2766 #if DEBUG_TRANSPORT
2767   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2768               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2769 #endif
2770
2771   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2772   {
2773     GNUNET_break_op (0);
2774     return;
2775   }
2776
2777   scm = (const struct SessionConnectMessage *) message;
2778   GNUNET_break_op (ntohl (scm->reserved) == 0);
2779
2780   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2781
2782   n = lookup_neighbour (peer);
2783   if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state)))
2784   {
2785     /* connected peer switches addresses or is trying to do a fast reconnect*/
2786     return;
2787   }
2788
2789
2790   /* we are not connected to this peer */
2791   /* do blacklist check */
2792   bcc =
2793       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2794                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2795   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2796   bcc->ats_count = ats_count + 1;
2797   bcc->address = GNUNET_HELLO_address_copy (address);
2798   bcc->session = session;
2799   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2800   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2801   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2802   bcc->ats[ats_count].value =
2803       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2804   GST_blacklist_test_allowed (peer, address->transport_name,
2805                               &handle_connect_blacklist_cont, bcc);
2806 }
2807
2808
2809 /* end of file gnunet-service-transport_neighbours.c */