-typo
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
61
62 #define FAST_RECONNECT_RATE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100)
63
64 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
65
66 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
67
68 /**
69  * Entry in neighbours.
70  */
71 struct NeighbourMapEntry;
72
73 GNUNET_NETWORK_STRUCT_BEGIN
74
75 /**
76  * Message a peer sends to another to indicate its
77  * preference for communicating via a particular
78  * session (and the desire to establish a real
79  * connection).
80  */
81 struct SessionConnectMessage
82 {
83   /**
84    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
85    */
86   struct GNUNET_MessageHeader header;
87
88   /**
89    * Always zero.
90    */
91   uint32_t reserved GNUNET_PACKED;
92
93   /**
94    * Absolute time at the sender.  Only the most recent connect
95    * message implies which session is preferred by the sender.
96    */
97   struct GNUNET_TIME_AbsoluteNBO timestamp;
98
99 };
100
101
102 struct SessionDisconnectMessage
103 {
104   /**
105    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
106    */
107   struct GNUNET_MessageHeader header;
108
109   /**
110    * Always zero.
111    */
112   uint32_t reserved GNUNET_PACKED;
113
114   /**
115    * Purpose of the signature.  Extends over the timestamp.
116    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
117    */
118   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
119
120   /**
121    * Absolute time at the sender.  Only the most recent connect
122    * message implies which session is preferred by the sender.
123    */
124   struct GNUNET_TIME_AbsoluteNBO timestamp;
125
126   /**
127    * Public key of the sender.
128    */
129   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
130
131   /**
132    * Signature of the peer that sends us the disconnect.  Only
133    * valid if the timestamp is AFTER the timestamp from the
134    * corresponding 'CONNECT' message.
135    */
136   struct GNUNET_CRYPTO_RsaSignature signature;
137
138 };
139 GNUNET_NETWORK_STRUCT_END
140
141 /**
142  * For each neighbour we keep a list of messages
143  * that we still want to transmit to the neighbour.
144  */
145 struct MessageQueue
146 {
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *next;
152
153   /**
154    * This is a doubly linked list.
155    */
156   struct MessageQueue *prev;
157
158   /**
159    * Once this message is actively being transmitted, which
160    * neighbour is it associated with?
161    */
162   struct NeighbourMapEntry *n;
163
164   /**
165    * Function to call once we're done.
166    */
167   GST_NeighbourSendContinuation cont;
168
169   /**
170    * Closure for 'cont'
171    */
172   void *cont_cls;
173
174   /**
175    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
176    * stuck together in memory.  Allocated at the end of this struct.
177    */
178   const char *message_buf;
179
180   /**
181    * Size of the message buf
182    */
183   size_t message_buf_size;
184
185   /**
186    * At what time should we fail?
187    */
188   struct GNUNET_TIME_Absolute timeout;
189
190 };
191
192
193 enum State
194 {
195   /**
196    * fresh peer or completely disconnected
197    */
198   S_NOT_CONNECTED,
199
200   /**
201    * sent CONNECT message to other peer, waiting for CONNECT_ACK
202    */
203   S_CONNECT_SENT,
204
205   /**
206    * received CONNECT message to other peer, sending CONNECT_ACK
207    */
208   S_CONNECT_RECV,
209
210   /**
211    * received ACK or payload
212    */
213   S_CONNECTED,
214
215   /**
216    * connection ended, fast reconnect
217    */
218   S_FAST_RECONNECT,
219
220   /**
221    * Disconnect in progress
222    */
223   S_DISCONNECT
224 };
225
226 enum Address_State
227 {
228   USED,
229   UNUSED,
230   FRESH,
231 };
232
233
234 /**
235  * Entry in neighbours.
236  */
237 struct NeighbourMapEntry
238 {
239
240   /**
241    * Head of list of messages we would like to send to this peer;
242    * must contain at most one message per client.
243    */
244   struct MessageQueue *messages_head;
245
246   /**
247    * Tail of list of messages we would like to send to this peer; must
248    * contain at most one message per client.
249    */
250   struct MessageQueue *messages_tail;
251
252   /**
253    * Are we currently trying to send a message? If so, which one?
254    */
255   struct MessageQueue *is_active;
256
257   /**
258    * Active session for communicating with the peer.
259    */
260   struct Session *session;
261
262   /**
263    * Address we currently use.
264    */
265   struct GNUNET_HELLO_Address *address;
266
267   /**
268    * Address we currently use.
269    */
270   struct GNUNET_HELLO_Address *fast_reconnect_address;
271
272
273   /**
274    * Identity of this neighbour.
275    */
276   struct GNUNET_PeerIdentity id;
277
278   /**
279    * ID of task scheduled to run when this peer is about to
280    * time out (will free resources associated with the peer).
281    */
282   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
283
284   /**
285    * ID of task scheduled to send keepalives.
286    */
287   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
288
289   /**
290    * ID of task scheduled to run when we should try transmitting
291    * the head of the message queue.
292    */
293   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
294
295   /**
296    * Tracker for inbound bandwidth.
297    */
298   struct GNUNET_BANDWIDTH_Tracker in_tracker;
299
300   /**
301    * Inbound bandwidth from ATS, activated when connection is up
302    */
303   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
304
305   /**
306    * Inbound bandwidth from ATS, activated when connection is up
307    */
308   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
309
310   /**
311    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
312    */
313   struct GNUNET_TIME_Absolute connect_ts;
314
315   /**
316    * When did we sent the last keep-alive message?
317    */
318   struct GNUNET_TIME_Absolute keep_alive_sent;
319
320   /**
321    * Latest calculated latency value
322    */
323   struct GNUNET_TIME_Relative latency;
324
325   /**
326    * Timeout for ATS
327    * We asked ATS for a new address for this peer
328    */
329   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
330
331   /**
332    * Delay for ATS request
333    */
334   GNUNET_SCHEDULER_TaskIdentifier ats_request;
335
336   /**
337    * Task the resets the peer state after due to an pending
338    * unsuccessful connection setup
339    */
340   GNUNET_SCHEDULER_TaskIdentifier state_reset;
341
342
343   /**
344    * How often has the other peer (recently) violated the inbound
345    * traffic limit?  Incremented by 10 per violation, decremented by 1
346    * per non-violation (for each time interval).
347    */
348   unsigned int quota_violation_count;
349
350
351   /**
352    * The current state of the peer
353    * Element of enum State
354    */
355   int state;
356
357   /**
358    * Did we sent an KEEP_ALIVE message and are we expecting a response?
359    */
360   int expect_latency_response;
361
362   /**
363    * Was the address used successfully
364    */
365   int address_state;
366
367   /**
368    * Fast reconnect attempts for identical address
369    */
370   unsigned int fast_reconnect_attempts;
371 };
372
373
374 /**
375  * All known neighbours and their HELLOs.
376  */
377 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
378
379 /**
380  * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb
381  */
382 static void *callback_cls;
383
384 /**
385  * Function to call when we connected to a neighbour.
386  */
387 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
388
389 /**
390  * Function to call when we disconnected from a neighbour.
391  */
392 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
393
394 /**
395  * Function to call when we changed an active address of a neighbour.
396  */
397 static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb;
398
399 /**
400  * counter for connected neighbours
401  */
402 static int neighbours_connected;
403
404 /**
405  * Lookup a neighbour entry in the neighbours hash map.
406  *
407  * @param pid identity of the peer to look up
408  * @return the entry, NULL if there is no existing record
409  */
410 static struct NeighbourMapEntry *
411 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
412 {
413   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
414 }
415
416 /**
417  * Disconnect from the given neighbour, clean up the record.
418  *
419  * @param n neighbour to disconnect from
420  */
421 static void
422 disconnect_neighbour (struct NeighbourMapEntry *n);
423
424 #define change_state(n, state, ...) change (n, state, __LINE__)
425
426 static int
427 is_connecting (struct NeighbourMapEntry *n)
428 {
429   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
430     return GNUNET_YES;
431   return GNUNET_NO;
432 }
433
434 static int
435 is_connected (struct NeighbourMapEntry *n)
436 {
437   if (n->state == S_CONNECTED)
438     return GNUNET_YES;
439   return GNUNET_NO;
440 }
441
442 static int
443 is_disconnecting (struct NeighbourMapEntry *n)
444 {
445   if (n->state == S_DISCONNECT)
446     return GNUNET_YES;
447   return GNUNET_NO;
448 }
449
450 static const char *
451 print_state (int state)
452 {
453   switch (state)
454   {
455   case S_CONNECTED:
456     return "S_CONNECTED";
457     break;
458   case S_CONNECT_RECV:
459     return "S_CONNECT_RECV";
460     break;
461   case S_CONNECT_SENT:
462     return "S_CONNECT_SENT";
463     break;
464   case S_DISCONNECT:
465     return "S_DISCONNECT";
466     break;
467   case S_NOT_CONNECTED:
468     return "S_NOT_CONNECTED";
469     break;
470   case S_FAST_RECONNECT:
471     return "S_FAST_RECONNECT";
472     break;
473   default:
474     GNUNET_break (0);
475     break;
476   }
477   return NULL;
478 }
479
480 static int
481 change (struct NeighbourMapEntry *n, int state, int line);
482
483 static void
484 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
485
486
487 static void
488 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
489 {
490   struct NeighbourMapEntry *n = cls;
491
492   if (n == NULL)
493     return;
494
495   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
496   if (n->state == S_CONNECTED)
497     return;
498   GNUNET_STATISTICS_update (GST_stats,
499                             gettext_noop
500                             ("# failed connection attempts due to timeout"), 1,
501                             GNUNET_NO);
502   /* resetting state */
503
504   if (n->state == S_FAST_RECONNECT)
505   {
506     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507                 "Fast reconnect time out, disconnecting peer `%s'\n",
508                 GNUNET_i2s (&n->id));
509     disconnect_neighbour(n);
510     return;
511   }
512
513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
515               GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__);
516
517   n->state = S_NOT_CONNECTED;
518
519   /* destroying address */
520   if (n->address != NULL)
521   {
522     GNUNET_assert (strlen (n->address->transport_name) > 0);
523     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
524   }
525
526   /* request new address */
527   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
528     GNUNET_SCHEDULER_cancel (n->ats_suggest);
529   n->ats_suggest =
530       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
531                                     n);
532   GNUNET_ATS_suggest_address (GST_ats, &n->id);
533 }
534
535 static int
536 change (struct NeighbourMapEntry *n, int state, int line)
537 {
538   int previous_state;
539   /* allowed transitions */
540   int allowed = GNUNET_NO;
541
542   previous_state = n->state;
543
544   switch (n->state)
545   {
546   case S_NOT_CONNECTED:
547     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
548         (state == S_DISCONNECT))
549       allowed = GNUNET_YES;
550     break;
551   case S_CONNECT_RECV:
552     allowed = GNUNET_YES;
553     break;
554   case S_CONNECT_SENT:
555     allowed = GNUNET_YES;
556     break;
557   case S_CONNECTED:
558     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
559       allowed = GNUNET_YES;
560     break;
561   case S_DISCONNECT:
562     break;
563   case S_FAST_RECONNECT:
564     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
565       allowed = GNUNET_YES;
566     break;
567   default:
568     GNUNET_break (0);
569     break;
570   }
571   if (allowed == GNUNET_NO)
572   {
573     char *old = GNUNET_strdup (print_state (n->state));
574     char *new = GNUNET_strdup (print_state (state));
575
576     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
577                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
578                 new, line);
579     GNUNET_break (0);
580     GNUNET_free (old);
581     GNUNET_free (new);
582     return GNUNET_SYSERR;
583   }
584   {
585     char *old = GNUNET_strdup (print_state (n->state));
586     char *new = GNUNET_strdup (print_state (state));
587
588     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
589                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
590                 GNUNET_i2s (&n->id), n, old, new, line);
591     GNUNET_free (old);
592     GNUNET_free (new);
593   }
594   n->state = state;
595
596   switch (n->state)
597   {
598   case S_FAST_RECONNECT:
599   case S_CONNECT_RECV:
600   case S_CONNECT_SENT:
601     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
602       GNUNET_SCHEDULER_cancel (n->state_reset);
603     n->state_reset =
604         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
605     break;
606   case S_CONNECTED:
607   case S_NOT_CONNECTED:
608   case S_DISCONNECT:
609     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
610     {
611 #if DEBUG_TRANSPORT
612       char *old = GNUNET_strdup (print_state (n->state));
613       char *new = GNUNET_strdup (print_state (state));
614
615       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
617                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
618       GNUNET_free (old);
619       GNUNET_free (new);
620 #endif
621       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
622       GNUNET_SCHEDULER_cancel (n->state_reset);
623       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
624     }
625     break;
626
627   default:
628     GNUNET_assert (0);
629   }
630
631   if (NULL != address_change_cb)
632   {
633     if (n->state == S_CONNECTED)
634       address_change_cb (callback_cls, &n->id, n->address);
635     else if (previous_state == S_CONNECTED)
636       address_change_cb (callback_cls, &n->id, NULL);
637   }
638
639   return GNUNET_OK;
640 }
641
642 static ssize_t
643 send_with_session (struct NeighbourMapEntry *n,
644                    const char *msgbuf, size_t msgbuf_size,
645                    uint32_t priority,
646                    struct GNUNET_TIME_Relative timeout,
647                    GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
648 {
649   struct GNUNET_TRANSPORT_PluginFunctions *papi;
650   size_t ret = GNUNET_SYSERR;
651
652   GNUNET_assert (n != NULL);
653   GNUNET_assert (n->session != NULL);
654
655   papi = GST_plugins_find (n->address->transport_name);
656   if (papi == NULL)
657   {
658     if (cont != NULL)
659       cont (cont_cls, &n->id, GNUNET_SYSERR);
660     return GNUNET_SYSERR;
661   }
662
663   ret = papi->send (papi->cls,
664                    n->session,
665                    msgbuf, msgbuf_size,
666                    0,
667                    timeout,
668                    cont, cont_cls);
669
670   if ((ret == -1) && (cont != NULL))
671       cont (cont_cls, &n->id, GNUNET_SYSERR);
672   return ret;
673 }
674
675 /**
676  * Task invoked to start a transmission to another peer.
677  *
678  * @param cls the 'struct NeighbourMapEntry'
679  * @param tc scheduler context
680  */
681 static void
682 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
683
684
685 /**
686  * We're done with our transmission attempt, continue processing.
687  *
688  * @param cls the 'struct MessageQueue' of the message
689  * @param receiver intended receiver
690  * @param success whether it worked or not
691  */
692 static void
693 transmit_send_continuation (void *cls,
694                             const struct GNUNET_PeerIdentity *receiver,
695                             int success)
696 {
697   struct MessageQueue *mq = cls;
698   struct NeighbourMapEntry *n;
699   struct NeighbourMapEntry *tmp;
700
701   tmp = lookup_neighbour (receiver);
702   n = mq->n;
703   if ((NULL != n) && (tmp != NULL) && (tmp == n))
704   {
705     GNUNET_assert (n->is_active == mq);
706     n->is_active = NULL;
707     if (success == GNUNET_YES)
708     {
709       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
710       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
711     }
712   }
713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
714               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
715               (success == GNUNET_OK) ? "successful" : "FAILED");
716   if (NULL != mq->cont)
717     mq->cont (mq->cont_cls, success);
718   GNUNET_free (mq);
719 }
720
721
722 /**
723  * Check the ready list for the given neighbour and if a plugin is
724  * ready for transmission (and if we have a message), do so!
725  *
726  * @param n target peer for which to transmit
727  */
728 static void
729 try_transmission_to_peer (struct NeighbourMapEntry *n)
730 {
731   struct MessageQueue *mq;
732   struct GNUNET_TIME_Relative timeout;
733   ssize_t ret;
734
735   if (n->is_active != NULL)
736   {
737     GNUNET_break (0);
738     return;                     /* transmission already pending */
739   }
740   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
741   {
742     GNUNET_break (0);
743     return;                     /* currently waiting for bandwidth */
744   }
745   while (NULL != (mq = n->messages_head))
746   {
747     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
748     if (timeout.rel_value > 0)
749       break;
750     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
751     n->is_active = mq;
752     mq->n = n;
753     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
754   }
755   if (NULL == mq)
756     return;                     /* no more messages */
757
758   if (n->address == NULL)
759   {
760     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
761                 GNUNET_i2s (&n->id));
762     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
763     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
764     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
765     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
766     return;
767   }
768
769   if (GST_plugins_find (n->address->transport_name) == NULL)
770   {
771     GNUNET_break (0);
772     return;
773   }
774   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
775   n->is_active = mq;
776   mq->n = n;
777
778   if ((n->address->address_length == 0) && (n->session == NULL))
779   {
780     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
781                 GNUNET_i2s (&n->id));
782     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
783     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
784     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
785     return;
786   }
787
788   ret = send_with_session(n,
789               mq->message_buf, mq->message_buf_size,
790               0, timeout,
791               &transmit_send_continuation, mq);
792
793   if (ret == -1)
794   {
795     /* failure, but 'send' would not call continuation in this case,
796      * so we need to do it here! */
797     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
798   }
799
800 }
801
802
803 /**
804  * Task invoked to start a transmission to another peer.
805  *
806  * @param cls the 'struct NeighbourMapEntry'
807  * @param tc scheduler context
808  */
809 static void
810 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
811 {
812   struct NeighbourMapEntry *n = cls;
813
814   GNUNET_assert (NULL != lookup_neighbour (&n->id));
815   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
816   try_transmission_to_peer (n);
817 }
818
819
820 /**
821  * Initialize the neighbours subsystem.
822  *
823  * @param cls closure for callbacks
824  * @param connect_cb function to call if we connect to a peer
825  * @param disconnect_cb function to call if we disconnect from a peer
826  * @param peer_address_cb function to call if we change an active address
827  *                   of a neighbour
828  */
829 void
830 GST_neighbours_start (void *cls,
831                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
832                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
833                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
834 {
835   callback_cls = cls;
836   connect_notify_cb = connect_cb;
837   disconnect_notify_cb = disconnect_cb;
838   address_change_cb = peer_address_cb;
839   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
840   neighbours_connected = 0;
841 }
842
843
844 static void
845 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
846                       int result)
847 {
848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849               "Sending DISCONNECT message to peer `%4s': %i\n",
850               GNUNET_i2s (target), result);
851 }
852
853
854 static int
855 send_disconnect (struct NeighbourMapEntry * n)
856 {
857   size_t ret;
858   struct SessionDisconnectMessage disconnect_msg;
859
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "Sending DISCONNECT message to peer `%4s'\n",
862               GNUNET_i2s (&n->id));
863   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
864   disconnect_msg.header.type =
865       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
866   disconnect_msg.reserved = htonl (0);
867   disconnect_msg.purpose.size =
868       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
869              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
870              sizeof (struct GNUNET_TIME_AbsoluteNBO));
871   disconnect_msg.purpose.purpose =
872       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
873   disconnect_msg.timestamp =
874       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
875   disconnect_msg.public_key = GST_my_public_key;
876   GNUNET_assert (GNUNET_OK ==
877                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
878                                          &disconnect_msg.purpose,
879                                          &disconnect_msg.signature));
880
881   ret = send_with_session (n,
882             (const char *) &disconnect_msg, sizeof (disconnect_msg),
883             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
884             &send_disconnect_cont, NULL);
885
886   if (ret == GNUNET_SYSERR)
887     return GNUNET_SYSERR;
888
889   GNUNET_STATISTICS_update (GST_stats,
890                             gettext_noop
891                             ("# peers disconnected due to external request"), 1,
892                             GNUNET_NO);
893   return GNUNET_OK;
894 }
895
896
897 /**
898  * Disconnect from the given neighbour, clean up the record.
899  *
900  * @param n neighbour to disconnect from
901  */
902 static void
903 disconnect_neighbour (struct NeighbourMapEntry *n)
904 {
905   struct MessageQueue *mq;
906   int previous_state;
907
908   previous_state = n->state;
909
910   if (is_disconnecting (n))
911     return;
912
913   /* send DISCONNECT MESSAGE */
914   if (previous_state == S_CONNECTED)
915   {
916     if (GNUNET_OK == send_disconnect (n))
917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
918                   GNUNET_i2s (&n->id));
919     else
920       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921                   "Could not send DISCONNECT_MSG to `%s'\n",
922                   GNUNET_i2s (&n->id));
923   }
924
925   change_state (n, S_DISCONNECT);
926
927   if (previous_state == S_CONNECTED)
928   {
929     GNUNET_assert (NULL != n->address);
930     if (n->address_state == USED)
931     {
932       GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__);
933       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
934       n->address_state = UNUSED;
935     }
936   }
937
938   if (n->address != NULL)
939   {
940     struct GNUNET_TRANSPORT_PluginFunctions *papi;
941
942     papi = GST_plugins_find (n->address->transport_name);
943     if (papi != NULL)
944       papi->disconnect (papi->cls, &n->id);
945   }
946   while (NULL != (mq = n->messages_head))
947   {
948     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
949     if (NULL != mq->cont)
950       mq->cont (mq->cont_cls, GNUNET_SYSERR);
951     GNUNET_free (mq);
952   }
953   if (NULL != n->is_active)
954   {
955     n->is_active->n = NULL;
956     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
957                 "Failing transmission of active message due to disconnect\n");
958     if (NULL != n->is_active->cont)
959       n->is_active->cont (n->is_active->cont_cls, GNUNET_SYSERR);
960     GNUNET_free (n->is_active);
961     n->is_active = NULL;
962   }
963
964   switch (previous_state)
965   {
966   case S_CONNECTED:
967     GNUNET_assert (neighbours_connected > 0);
968     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
969     GNUNET_SCHEDULER_cancel (n->keepalive_task);
970     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
971     n->expect_latency_response = GNUNET_NO;
972     neighbours_connected--;
973     GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
974                               GNUNET_NO);
975     disconnect_notify_cb (callback_cls, &n->id);
976     break;
977   case S_FAST_RECONNECT:
978     GNUNET_STATISTICS_update (GST_stats,
979                               gettext_noop ("# fast reconnects failed"), 1,
980                               GNUNET_NO);
981     disconnect_notify_cb (callback_cls, &n->id);
982     break;
983   default:
984     break;
985   }
986
987   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
988
989   GNUNET_assert (GNUNET_YES ==
990                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
991                                                        &n->id.hashPubKey, n));
992   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
993   {
994     GNUNET_SCHEDULER_cancel (n->ats_suggest);
995     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
996   }
997   if (n->ats_request != GNUNET_SCHEDULER_NO_TASK)
998   {
999     GNUNET_SCHEDULER_cancel (n->ats_request);
1000     n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1001   }
1002
1003   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
1004   {
1005     GNUNET_SCHEDULER_cancel (n->timeout_task);
1006     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1007   }
1008   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
1009   {
1010     GNUNET_SCHEDULER_cancel (n->transmission_task);
1011     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
1012   }
1013   if (NULL != n->fast_reconnect_address)
1014   {
1015     GNUNET_HELLO_address_free (n->fast_reconnect_address);
1016     n->fast_reconnect_address = NULL;
1017   }
1018   if (NULL != n->address)
1019   {
1020     GNUNET_HELLO_address_free (n->address);
1021     n->address = NULL;
1022   }
1023   n->session = NULL;
1024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
1025               GNUNET_i2s (&n->id), n);
1026   GNUNET_free (n);
1027 }
1028
1029
1030 /**
1031  * Peer has been idle for too long. Disconnect.
1032  *
1033  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1034  * @param tc scheduler context
1035  */
1036 static void
1037 neighbour_timeout_task (void *cls,
1038                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1039 {
1040   struct NeighbourMapEntry *n = cls;
1041
1042   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1043   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer`%4s' disconnected due to timeout\n",
1044               GNUNET_i2s (&n->id));
1045
1046   GNUNET_STATISTICS_update (GST_stats,
1047                             gettext_noop
1048                             ("# peers disconnected due to timeout"), 1,
1049                             GNUNET_NO);
1050   disconnect_neighbour (n);
1051 }
1052
1053
1054 /**
1055  * Send another keepalive message.
1056  *
1057  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1058  * @param tc scheduler context
1059  */
1060 static void
1061 neighbour_keepalive_task (void *cls,
1062                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1063 {
1064   struct NeighbourMapEntry *n = cls;
1065   struct GNUNET_MessageHeader m;
1066   int ret;
1067
1068   GNUNET_assert (S_CONNECTED == n->state);
1069   n->keepalive_task =
1070       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1071                                     &neighbour_keepalive_task, n);
1072
1073   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1074                             GNUNET_NO);
1075   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1076   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1077
1078   ret = send_with_session (n,
1079             (const void *) &m, sizeof (m),
1080             UINT32_MAX /* priority */ ,
1081             GNUNET_TIME_UNIT_FOREVER_REL,
1082             NULL, NULL);
1083
1084   n->expect_latency_response = GNUNET_NO;
1085   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1086   if (ret != GNUNET_SYSERR)
1087   {
1088     n->expect_latency_response = GNUNET_YES;
1089     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1090   }
1091
1092 }
1093
1094
1095 /**
1096  * Disconnect from the given neighbour.
1097  *
1098  * @param cls unused
1099  * @param key hash of neighbour's public key (not used)
1100  * @param value the 'struct NeighbourMapEntry' of the neighbour
1101  */
1102 static int
1103 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1104 {
1105   struct NeighbourMapEntry *n = value;
1106
1107   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1108               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1109   if (S_CONNECTED == n->state)
1110     GNUNET_STATISTICS_update (GST_stats,
1111                               gettext_noop
1112                               ("# peers disconnected due to global disconnect"),
1113                               1, GNUNET_NO);
1114   disconnect_neighbour (n);
1115   return GNUNET_OK;
1116 }
1117
1118
1119 static void
1120 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1121 {
1122   struct NeighbourMapEntry *n = cls;
1123
1124   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1125
1126   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127               "ATS did not suggested address to connect to peer `%s'\n",
1128               GNUNET_i2s (&n->id));
1129
1130   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# ATS address suggestions failed"), 1,
1131                             GNUNET_NO);
1132
1133
1134   disconnect_neighbour (n);
1135 }
1136
1137 /**
1138  * Cleanup the neighbours subsystem.
1139  */
1140 void
1141 GST_neighbours_stop ()
1142 {
1143   // This can happen during shutdown
1144   if (neighbours == NULL)
1145   {
1146     return;
1147   }
1148
1149   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1150                                          NULL);
1151   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1152 //  GNUNET_assert (neighbours_connected == 0);
1153   neighbours = NULL;
1154   callback_cls = NULL;
1155   connect_notify_cb = NULL;
1156   disconnect_notify_cb = NULL;
1157   address_change_cb = NULL;
1158 }
1159
1160 struct ContinutionContext
1161 {
1162   struct GNUNET_HELLO_Address *address;
1163
1164   struct Session *session;
1165 };
1166
1167 static void
1168 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1169                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1170 {
1171   struct QuotaSetMessage q_msg;
1172
1173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1174               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1175               ntohl (quota.value__), GNUNET_i2s (target));
1176   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1177   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1178   q_msg.quota = quota;
1179   q_msg.peer = (*target);
1180   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1181 }
1182
1183 /**
1184  * We tried to send a SESSION_CONNECT message to another peer.  If this
1185  * succeeded, we change the state.  If it failed, we should tell
1186  * ATS to not use this address anymore (until it is re-validated).
1187  *
1188  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1189  * @param target peer to send the message to
1190  * @param success GNUNET_OK on success
1191  */
1192 static void
1193 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1194                            int success)
1195 {
1196   struct ContinutionContext *cc = cls;
1197   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1198
1199   if (GNUNET_YES != success)
1200   {
1201     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1202     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1203   }
1204   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1205   {
1206     GNUNET_HELLO_address_free (cc->address);
1207     GNUNET_free (cc);
1208     return;
1209   }
1210
1211   if ((GNUNET_YES == success) &&
1212       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1213   {
1214     change_state (n, S_CONNECT_SENT);
1215     GNUNET_HELLO_address_free (cc->address);
1216     GNUNET_free (cc);
1217     return;
1218   }
1219
1220   if ((GNUNET_NO == success) &&
1221       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1222   {
1223     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1225                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1226     change_state (n, S_NOT_CONNECTED);
1227     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1228       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1229     n->ats_suggest =
1230         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1231                                       n);
1232     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1233   }
1234   GNUNET_HELLO_address_free (cc->address);
1235   GNUNET_free (cc);
1236 }
1237
1238
1239 static void
1240 ats_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1241 {
1242   struct NeighbourMapEntry *n = cls;
1243   n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1244
1245   n->ats_suggest =
1246     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1247                                   n);
1248 }
1249
1250
1251 /**
1252  * We tried to switch addresses with an peer already connected. If it failed,
1253  * we should tell ATS to not use this address anymore (until it is re-validated).
1254  *
1255  * @param cls the 'struct NeighbourMapEntry'
1256  * @param target peer to send the message to
1257  * @param success GNUNET_OK on success
1258  */
1259 static void
1260 send_switch_address_continuation (void *cls,
1261                                   const struct GNUNET_PeerIdentity *target,
1262                                   int success)
1263 {
1264   struct ContinutionContext *cc = cls;
1265   struct NeighbourMapEntry *n;
1266
1267   if (neighbours == NULL)
1268   {
1269     GNUNET_HELLO_address_free (cc->address);
1270     GNUNET_free (cc);
1271     return;                     /* neighbour is going away */
1272   }
1273
1274   n = lookup_neighbour (&cc->address->peer);
1275   if ((n == NULL) || (is_disconnecting (n)))
1276   {
1277     GNUNET_HELLO_address_free (cc->address);
1278     GNUNET_free (cc);
1279     return;                     /* neighbour is going away */
1280   }
1281
1282   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1283   if (GNUNET_YES != success)
1284   {
1285
1286     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1287                 "Failed to switch connected peer `%s' in state %s to address '%s' session %X, asking ATS for new address \n",
1288                 GNUNET_i2s (&n->id), print_state(n->state), GST_plugins_a2s (cc->address), cc->session);
1289
1290     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1291     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1292
1293     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1294     {
1295       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1296       n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1297     }
1298
1299     if (n->state == S_FAST_RECONNECT)
1300     {
1301       if (GNUNET_SCHEDULER_NO_TASK == n->ats_request)
1302       {
1303         /* Throtteled ATS request for fast reconnect */
1304         struct GNUNET_TIME_Relative delay = GNUNET_TIME_relative_multiply(FAST_RECONNECT_RATE, n->fast_reconnect_attempts);
1305         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306                     "Fast reconnect attempt %u failed, delay ATS request for %llu ms\n", n->fast_reconnect_attempts, delay.rel_value);
1307         n->ats_request =
1308             GNUNET_SCHEDULER_add_delayed (delay,
1309                                           ats_request,
1310                                           n);
1311       }
1312     }
1313     else
1314     {
1315       /* Immediate ATS request for connected peers */
1316       n->ats_suggest =
1317         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1318                                       n);
1319       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1320     }
1321     GNUNET_HELLO_address_free (cc->address);
1322     GNUNET_free (cc);
1323     return;
1324   }
1325   /* Tell ATS that switching addresses was successful */
1326   switch (n->state)
1327   {
1328   case S_CONNECTED:
1329     if (n->address_state == FRESH)
1330     {
1331       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1332       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1333       GNUNET_break (cc->session == n->session);
1334       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1335       n->address_state = USED;
1336     }
1337     break;
1338   case S_FAST_RECONNECT:
1339     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340                 "Successful fast reconnect to peer `%s'\n",
1341                 GNUNET_i2s (&n->id));
1342     change_state (n, S_CONNECTED);
1343     neighbours_connected++;
1344     GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1345                               GNUNET_NO);
1346
1347     if (n->address_state == FRESH)
1348     {
1349       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1350       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1351       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1352       n->address_state = USED;
1353     }
1354
1355     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1356       n->keepalive_task =
1357           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1358
1359     /* Updating quotas */
1360     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1361     send_outbound_quota (target, n->bandwidth_out);
1362
1363   default:
1364     break;
1365   }
1366   GNUNET_HELLO_address_free (cc->address);
1367   GNUNET_free (cc);
1368 }
1369
1370
1371 /**
1372  * We tried to send a SESSION_CONNECT message to another peer.  If this
1373  * succeeded, we change the state.  If it failed, we should tell
1374  * ATS to not use this address anymore (until it is re-validated).
1375  *
1376  * @param cls the 'struct NeighbourMapEntry'
1377  * @param target peer to send the message to
1378  * @param success GNUNET_OK on success
1379  */
1380 static void
1381 send_connect_ack_continuation (void *cls,
1382                                const struct GNUNET_PeerIdentity *target,
1383                                int success)
1384 {
1385   struct ContinutionContext *cc = cls;
1386   struct NeighbourMapEntry *n;
1387
1388   if (neighbours == NULL)
1389   {
1390     GNUNET_HELLO_address_free (cc->address);
1391     GNUNET_free (cc);
1392     return;                     /* neighbour is going away */
1393   }
1394
1395   n = lookup_neighbour (&cc->address->peer);
1396   if ((n == NULL) || (is_disconnecting (n)))
1397   {
1398     GNUNET_HELLO_address_free (cc->address);
1399     GNUNET_free (cc);
1400     return;                     /* neighbour is going away */
1401   }
1402
1403   if (GNUNET_YES == success)
1404   {
1405     GNUNET_HELLO_address_free (cc->address);
1406     GNUNET_free (cc);
1407     return;                     /* sending successful */
1408   }
1409
1410   /* sending failed, ask for next address  */
1411   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1412               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1413               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1414   if (n->state != S_NOT_CONNECTED)
1415     change_state (n, S_NOT_CONNECTED);
1416   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1417   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1418
1419   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1420     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1421   n->ats_suggest =
1422       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1423                                     n);
1424   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1425   GNUNET_HELLO_address_free (cc->address);
1426   GNUNET_free (cc);
1427 }
1428
1429
1430 /**
1431  * For an existing neighbour record, set the active connection to
1432  * use the given address.
1433  *
1434  * @param peer identity of the peer to switch the address for
1435  * @param address address of the other peer, NULL if other peer
1436  *                       connected to us
1437  * @param session session to use (or NULL)
1438  * @param ats performance data
1439  * @param ats_count number of entries in ats
1440  * @param bandwidth_in inbound quota to be used when connection is up
1441  * @param bandwidth_out outbound quota to be used when connection is up
1442  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1443  *         connection is not up (yet)
1444  */
1445 int
1446 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
1447                                        const struct GNUNET_HELLO_Address
1448                                        *address,
1449                                        struct Session *session,
1450                                        const struct GNUNET_ATS_Information *ats,
1451                                        uint32_t ats_count,
1452                                        struct GNUNET_BANDWIDTH_Value32NBO
1453                                        bandwidth_in,
1454                                        struct GNUNET_BANDWIDTH_Value32NBO
1455                                        bandwidth_out)
1456 {
1457   struct NeighbourMapEntry *n;
1458   struct SessionConnectMessage connect_msg;
1459   struct ContinutionContext *cc;
1460   size_t msg_len;
1461   size_t ret;
1462
1463   if (neighbours == NULL)
1464   {
1465     /* This can happen during shutdown */
1466     return GNUNET_NO;
1467   }
1468   n = lookup_neighbour (peer);
1469   if (NULL == n)
1470     return GNUNET_NO;
1471   if (n->state == S_DISCONNECT)
1472   {
1473     /* We are disconnecting, nothing to do here */
1474     return GNUNET_NO;
1475   }
1476   GNUNET_assert (address->transport_name != NULL);
1477   if ((session == NULL) && (0 == address->address_length))
1478   {
1479     GNUNET_break_op (0);
1480     /* FIXME: is this actually possible? When does this happen? */
1481     if (strlen (address->transport_name) > 0)
1482       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1483     GNUNET_ATS_suggest_address (GST_ats, peer);
1484     return GNUNET_NO;
1485   }
1486
1487   /* checks successful and neighbour != NULL */
1488   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1489               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1490               (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1491               session,
1492               GNUNET_i2s (peer),
1493               print_state (n->state));
1494
1495   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1496   {
1497     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1498     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1499   }
1500
1501
1502   if (n->state == S_FAST_RECONNECT)
1503   {
1504     /* Throtteled fast reconnect */
1505
1506     if (NULL == n->fast_reconnect_address)
1507     {
1508       n->fast_reconnect_address = GNUNET_HELLO_address_copy (address);
1509
1510     }
1511     else if (0 == GNUNET_HELLO_address_cmp(address, n->fast_reconnect_address))
1512     {
1513       n->fast_reconnect_attempts ++;
1514
1515       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516                 "FAST RECONNECT to peer `%s' and address '%s' with identical address attempt %u\n",
1517                 GNUNET_i2s (&n->id), GST_plugins_a2s (address), n->fast_reconnect_attempts);
1518     }
1519   }
1520   else
1521   {
1522     n->fast_reconnect_attempts = 0;
1523   }
1524
1525   /* do not switch addresses just update quotas */
1526   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1527       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1528       (n->session == session))
1529   {
1530     n->bandwidth_in = bandwidth_in;
1531     n->bandwidth_out = bandwidth_out;
1532     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1533     send_outbound_quota (peer, n->bandwidth_out);
1534     return GNUNET_NO;
1535   }
1536   if (n->state == S_CONNECTED)
1537   {
1538     /* mark old address as no longer used */
1539     GNUNET_assert (NULL != n->address);
1540     if (n->address_state == USED)
1541     {
1542       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1543       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1544       n->address_state = UNUSED;
1545     }
1546   }
1547
1548   /* set new address */
1549   if (NULL != n->address)
1550     GNUNET_HELLO_address_free (n->address);
1551   n->address = GNUNET_HELLO_address_copy (address);
1552   n->address_state = FRESH;
1553   n->bandwidth_in = bandwidth_in;
1554   n->bandwidth_out = bandwidth_out;
1555   GNUNET_SCHEDULER_cancel (n->timeout_task);
1556   n->timeout_task =
1557       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1558                                     &neighbour_timeout_task, n);
1559
1560   if (NULL != address_change_cb && n->state == S_CONNECTED)
1561     address_change_cb (callback_cls, &n->id, n->address); 
1562
1563   /* Obtain an session for this address from plugin */
1564   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1565   papi = GST_plugins_find (address->transport_name);
1566
1567   if (papi == NULL)
1568   {
1569     /* we don't have the plugin for this address */
1570     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1571
1572     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1573       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1574     n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1575                                       ats_suggest_cancel,
1576                                       n);
1577     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1578     GNUNET_HELLO_address_free (n->address);
1579     n->address = NULL;
1580     n->session = NULL;
1581     return GNUNET_NO;
1582   }
1583
1584   if (session == NULL)
1585   {
1586     n->session = papi->get_session (papi->cls, address);
1587     /* Session could not be initiated */
1588     if (n->session == NULL)
1589     {
1590       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1591                   "Failed to obtain new session %p for peer `%s' and  address '%s'\n",
1592                   n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1593
1594       GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1595
1596       if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1597         GNUNET_SCHEDULER_cancel (n->ats_suggest);
1598       n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1599                                         ats_suggest_cancel,
1600                                         n);
1601       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1602       GNUNET_HELLO_address_free (n->address);
1603       n->address = NULL;
1604       n->session = NULL;
1605       return GNUNET_NO;
1606     }
1607
1608     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1609                 "Obtained new session %p for peer `%s' and  address '%s'\n",
1610                  n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1611     /* Telling ATS about new session */
1612     GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0);
1613   }
1614   else
1615   {
1616     n->session = session;
1617     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1618                 "Using existing session %p for peer `%s' and  address '%s'\n",
1619                 n->session,
1620                 GNUNET_i2s (&n->id),
1621                 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>");
1622   }
1623
1624   switch (n->state)
1625   {
1626   case S_NOT_CONNECTED:
1627   case S_CONNECT_SENT:
1628     msg_len = sizeof (struct SessionConnectMessage);
1629     connect_msg.header.size = htons (msg_len);
1630     connect_msg.header.type =
1631         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1632     connect_msg.reserved = htonl (0);
1633     connect_msg.timestamp =
1634         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1635
1636     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1637     cc->session = n->session;
1638     cc->address = GNUNET_HELLO_address_copy (address);
1639     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1640                 "Sending CONNECT message to %s\n",
1641                 GNUNET_i2s (&n->id));
1642     if (n->state != S_CONNECT_RECV)
1643       change_state (n, S_CONNECT_RECV);
1644     ret = send_with_session (n,
1645       (const char *) &connect_msg, msg_len,
1646       UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1647       &send_connect_continuation, cc);
1648
1649     return GNUNET_NO;
1650   case S_CONNECT_RECV:
1651     /* We received a CONNECT message and asked ATS for an address */
1652     msg_len = sizeof (struct SessionConnectMessage);
1653     connect_msg.header.size = htons (msg_len);
1654     connect_msg.header.type =
1655         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1656     connect_msg.reserved = htonl (0);
1657     connect_msg.timestamp =
1658         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1659     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1660     cc->session = n->session;
1661     cc->address = GNUNET_HELLO_address_copy (address);
1662
1663     ret = send_with_session(n,
1664                             (const void *) &connect_msg, msg_len,
1665                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1666                             &send_connect_ack_continuation,
1667                             cc);
1668     return GNUNET_NO;
1669   case S_CONNECTED:
1670   case S_FAST_RECONNECT:
1671     /* connected peer is switching addresses or tries fast reconnect */
1672     msg_len = sizeof (struct SessionConnectMessage);
1673     connect_msg.header.size = htons (msg_len);
1674     connect_msg.header.type =
1675         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1676     connect_msg.reserved = htonl (0);
1677     connect_msg.timestamp =
1678         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1679     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1680     cc->session = n->session;
1681     cc->address = GNUNET_HELLO_address_copy (address);
1682     ret = send_with_session(n,
1683                             (const void *) &connect_msg, msg_len,
1684                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1685                             &send_switch_address_continuation, cc);
1686     if (ret == GNUNET_SYSERR)
1687     {
1688       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1689                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1690                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1691     }
1692     return GNUNET_NO;
1693   default:
1694     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1695                 "Invalid connection state to switch addresses %u \n", n->state);
1696     GNUNET_break_op (0);
1697     return GNUNET_NO;
1698   }
1699 }
1700
1701
1702 /**
1703  * Obtain current latency information for the given neighbour.
1704  *
1705  * @param peer
1706  * @return observed latency of the address, FOREVER if the address was
1707  *         never successfully validated
1708  */
1709 struct GNUNET_TIME_Relative
1710 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1711 {
1712   struct NeighbourMapEntry *n;
1713
1714   n = lookup_neighbour (peer);
1715   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1716     return GNUNET_TIME_UNIT_FOREVER_REL;
1717
1718   return n->latency;
1719 }
1720
1721 /**
1722  * Obtain current address information for the given neighbour.
1723  *
1724  * @param peer
1725  * @return address currently used
1726  */
1727 struct GNUNET_HELLO_Address *
1728 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1729 {
1730   struct NeighbourMapEntry *n;
1731
1732   n = lookup_neighbour (peer);
1733   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1734     return NULL;
1735
1736   return n->address;
1737 }
1738
1739
1740
1741 /**
1742  * Create an entry in the neighbour map for the given peer
1743  *
1744  * @param peer peer to create an entry for
1745  * @return new neighbour map entry
1746  */
1747 static struct NeighbourMapEntry *
1748 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1749 {
1750   struct NeighbourMapEntry *n;
1751
1752   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1753               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1754   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1755   n->id = *peer;
1756   n->state = S_NOT_CONNECTED;
1757   n->latency = GNUNET_TIME_relative_get_forever ();
1758   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1759                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1760                                  MAX_BANDWIDTH_CARRY_S);
1761   n->timeout_task =
1762       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1763                                     &neighbour_timeout_task, n);
1764   GNUNET_assert (GNUNET_OK ==
1765                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1766                                                     &n->id.hashPubKey, n,
1767                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1768   return n;
1769 }
1770
1771
1772 /**
1773  * Try to create a connection to the given target (eventually).
1774  *
1775  * @param target peer to try to connect to
1776  */
1777 void
1778 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1779 {
1780   struct NeighbourMapEntry *n;
1781
1782   // This can happen during shutdown
1783   if (neighbours == NULL)
1784   {
1785     return;
1786   }
1787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1788               GNUNET_i2s (target));
1789   if (0 ==
1790       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1791   {
1792     /* my own hello */
1793     return;
1794   }
1795   n = lookup_neighbour (target);
1796
1797   if (NULL != n)
1798   {
1799     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1800       return;                   /* already connecting or connected */
1801     if (is_disconnecting (n))
1802       change_state (n, S_NOT_CONNECTED);
1803   }
1804
1805
1806   if (n == NULL)
1807     n = setup_neighbour (target);
1808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1809               "Asking ATS for suggested address to connect to peer `%s'\n",
1810               GNUNET_i2s (&n->id));
1811   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1812 }
1813
1814 /**
1815  * Test if we're connected to the given peer.
1816  *
1817  * @param target peer to test
1818  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1819  */
1820 int
1821 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1822 {
1823   struct NeighbourMapEntry *n;
1824
1825   // This can happen during shutdown
1826   if (neighbours == NULL)
1827   {
1828     return GNUNET_NO;
1829   }
1830
1831   n = lookup_neighbour (target);
1832
1833   if ((NULL == n) || (S_CONNECTED != n->state))
1834     return GNUNET_NO;           /* not connected */
1835   return GNUNET_YES;
1836 }
1837
1838 /**
1839  * A session was terminated. Take note.
1840  *
1841  * @param peer identity of the peer where the session died
1842  * @param session session that is gone
1843  */
1844 void
1845 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1846                                    struct Session *session)
1847 {
1848   struct NeighbourMapEntry *n;
1849
1850   if (neighbours == NULL)
1851   {
1852     /* This can happen during shutdown */
1853     return;
1854   }
1855   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1856               session, GNUNET_i2s (peer));
1857   n = lookup_neighbour (peer);
1858   if (NULL == n)
1859     return;
1860   if (session != n->session)
1861     return;                     /* doesn't affect us */
1862   if (n->state == S_CONNECTED)
1863   {
1864     if (n->address_state == USED)
1865     {
1866       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1867       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1868       n->address_state = UNUSED;
1869
1870     }
1871   }
1872
1873   if (NULL != n->address)
1874   {
1875     GNUNET_HELLO_address_free (n->address);
1876     n->address = NULL;
1877   }
1878   n->session = NULL;
1879
1880   /* not connected anymore anyway, shouldn't matter */
1881   if (S_CONNECTED != n->state)
1882     return;
1883
1884   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1885   {
1886     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1887     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1888     n->expect_latency_response = GNUNET_NO;
1889   }
1890
1891   /* connected, try fast reconnect */
1892   /* statistics "transport" : "# peers connected" -= 1
1893    * neighbours_connected -= 1
1894    * BUT: no disconnect_cb to notify clients about disconnect
1895    */
1896
1897   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1898               GNUNET_i2s (peer));
1899
1900   GNUNET_assert (neighbours_connected > 0);
1901   change_state (n, S_FAST_RECONNECT);
1902   neighbours_connected--;
1903   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1904                             GNUNET_NO);
1905
1906
1907   /* We are connected, so ask ATS to switch addresses */
1908   GNUNET_SCHEDULER_cancel (n->timeout_task);
1909   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1910                                     &neighbour_timeout_task, n);
1911   /* try QUICKLY to re-establish a connection, reduce timeout! */
1912   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1913     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1914   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1915                                     &ats_suggest_cancel,
1916                                     n);
1917   GNUNET_ATS_suggest_address (GST_ats, peer);
1918 }
1919
1920
1921 /**
1922  * Transmit a message to the given target using the active connection.
1923  *
1924  * @param target destination
1925  * @param msg message to send
1926  * @param msg_size number of bytes in msg
1927  * @param timeout when to fail with timeout
1928  * @param cont function to call when done
1929  * @param cont_cls closure for 'cont'
1930  */
1931 void
1932 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1933                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1934                      GST_NeighbourSendContinuation cont, void *cont_cls)
1935 {
1936   struct NeighbourMapEntry *n;
1937   struct MessageQueue *mq;
1938
1939   // This can happen during shutdown
1940   if (neighbours == NULL)
1941   {
1942     return;
1943   }
1944
1945   n = lookup_neighbour (target);
1946   if ((n == NULL) || (!is_connected (n)))
1947   {
1948     GNUNET_STATISTICS_update (GST_stats,
1949                               gettext_noop
1950                               ("# messages not sent (no such peer or not connected)"),
1951                               1, GNUNET_NO);
1952     if (n == NULL)
1953       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1954                   "Could not send message to peer `%s': unknown neighbour",
1955                   GNUNET_i2s (target));
1956     else if (!is_connected (n))
1957       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1958                   "Could not send message to peer `%s': not connected\n",
1959                   GNUNET_i2s (target));
1960     if (NULL != cont)
1961       cont (cont_cls, GNUNET_SYSERR);
1962     return;
1963   }
1964
1965   if ((n->session == NULL) && (n->address == NULL))
1966   {
1967     GNUNET_STATISTICS_update (GST_stats,
1968                               gettext_noop
1969                               ("# messages not sent (no such peer or not connected)"),
1970                               1, GNUNET_NO);
1971     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1972                 "Could not send message to peer `%s': no address available\n",
1973                 GNUNET_i2s (target));
1974     if (NULL != cont)
1975       cont (cont_cls, GNUNET_SYSERR);
1976     return;
1977   }
1978
1979   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1980   GNUNET_STATISTICS_update (GST_stats,
1981                             gettext_noop
1982                             ("# bytes in message queue for other peers"),
1983                             msg_size, GNUNET_NO);
1984   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1985   mq->cont = cont;
1986   mq->cont_cls = cont_cls;
1987   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1988   memcpy (&mq[1], msg, msg_size);
1989   mq->message_buf = (const char *) &mq[1];
1990   mq->message_buf_size = msg_size;
1991   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1992   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1993
1994   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1995       (NULL == n->is_active))
1996     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1997 }
1998
1999
2000 /**
2001  * We have received a message from the given sender.  How long should
2002  * we delay before receiving more?  (Also used to keep the peer marked
2003  * as live).
2004  *
2005  * @param sender sender of the message
2006  * @param size size of the message
2007  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
2008  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
2009  *                   GNUNET_SYSERR if the connection is not fully up yet
2010  * @return how long to wait before reading more from this sender
2011  */
2012 struct GNUNET_TIME_Relative
2013 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
2014                                         *sender, ssize_t size, int *do_forward)
2015 {
2016   struct NeighbourMapEntry *n;
2017   struct GNUNET_TIME_Relative ret;
2018
2019   // This can happen during shutdown
2020   if (neighbours == NULL)
2021   {
2022     return GNUNET_TIME_UNIT_FOREVER_REL;
2023   }
2024
2025   n = lookup_neighbour (sender);
2026   if (n == NULL)
2027   {
2028     GST_neighbours_try_connect (sender);
2029     n = lookup_neighbour (sender);
2030     if (NULL == n)
2031     {
2032       GNUNET_STATISTICS_update (GST_stats,
2033                                 gettext_noop
2034                                 ("# messages discarded due to lack of neighbour record"),
2035                                 1, GNUNET_NO);
2036       *do_forward = GNUNET_NO;
2037       return GNUNET_TIME_UNIT_ZERO;
2038     }
2039   }
2040   if (!is_connected (n))
2041   {
2042     *do_forward = GNUNET_SYSERR;
2043     return GNUNET_TIME_UNIT_ZERO;
2044   }
2045   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
2046   {
2047     n->quota_violation_count++;
2048     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2049                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
2050                 n->in_tracker.available_bytes_per_s__,
2051                 n->quota_violation_count);
2052     /* Discount 32k per violation */
2053     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
2054   }
2055   else
2056   {
2057     if (n->quota_violation_count > 0)
2058     {
2059       /* try to add 32k back */
2060       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
2061       n->quota_violation_count--;
2062     }
2063   }
2064   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2065   {
2066     GNUNET_STATISTICS_update (GST_stats,
2067                               gettext_noop
2068                               ("# bandwidth quota violations by other peers"),
2069                               1, GNUNET_NO);
2070     *do_forward = GNUNET_NO;
2071     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
2072   }
2073   *do_forward = GNUNET_YES;
2074   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
2075   if (ret.rel_value > 0)
2076   {
2077     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2078                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
2079                 (unsigned long long) n->in_tracker.
2080                 consumption_since_last_update__,
2081                 (unsigned int) n->in_tracker.available_bytes_per_s__,
2082                 (unsigned long long) ret.rel_value);
2083     GNUNET_STATISTICS_update (GST_stats,
2084                               gettext_noop ("# ms throttling suggested"),
2085                               (int64_t) ret.rel_value, GNUNET_NO);
2086   }
2087   return ret;
2088 }
2089
2090
2091 /**
2092  * Keep the connection to the given neighbour alive longer,
2093  * we received a KEEPALIVE (or equivalent).
2094  *
2095  * @param neighbour neighbour to keep alive
2096  */
2097 void
2098 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
2099 {
2100   struct NeighbourMapEntry *n;
2101
2102   // This can happen during shutdown
2103   if (neighbours == NULL)
2104   {
2105     return;
2106   }
2107
2108   n = lookup_neighbour (neighbour);
2109   if (NULL == n)
2110   {
2111     GNUNET_STATISTICS_update (GST_stats,
2112                               gettext_noop
2113                               ("# KEEPALIVE messages discarded (not connected)"),
2114                               1, GNUNET_NO);
2115     return;
2116   }
2117   GNUNET_SCHEDULER_cancel (n->timeout_task);
2118   n->timeout_task =
2119       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2120                                     &neighbour_timeout_task, n);
2121
2122   /* send reply to measure latency */
2123   if (S_CONNECTED != n->state)
2124     return;
2125
2126   struct GNUNET_MessageHeader m;
2127
2128   m.size = htons (sizeof (struct GNUNET_MessageHeader));
2129   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
2130
2131   send_with_session(n,
2132       (const void *) &m, sizeof (m),
2133       UINT32_MAX,
2134       GNUNET_TIME_UNIT_FOREVER_REL,
2135       NULL, NULL);
2136 }
2137
2138 /**
2139  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
2140  * to this peer
2141  *
2142  * @param neighbour neighbour to keep alive
2143  * @param ats performance data
2144  * @param ats_count number of entries in ats
2145  */
2146 void
2147 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
2148                                    const struct GNUNET_ATS_Information *ats,
2149                                    uint32_t ats_count)
2150 {
2151   struct NeighbourMapEntry *n;
2152   struct GNUNET_ATS_Information *ats_new;
2153   uint32_t latency;
2154
2155   if (neighbours == NULL)
2156   {
2157     // This can happen during shutdown
2158     return;
2159   }
2160
2161   n = lookup_neighbour (neighbour);
2162   if ((NULL == n) || (n->state != S_CONNECTED))
2163   {
2164     GNUNET_STATISTICS_update (GST_stats,
2165                               gettext_noop
2166                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2167                               1, GNUNET_NO);
2168     return;
2169   }
2170   if (n->expect_latency_response != GNUNET_YES)
2171   {
2172     GNUNET_STATISTICS_update (GST_stats,
2173                               gettext_noop
2174                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2175                               1, GNUNET_NO);
2176     return;
2177   }
2178   n->expect_latency_response = GNUNET_NO;
2179
2180   GNUNET_assert (n->keep_alive_sent.abs_value !=
2181                  GNUNET_TIME_absolute_get_zero ().abs_value);
2182   n->latency =
2183       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2184                                            GNUNET_TIME_absolute_get ());
2185   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2186               GNUNET_i2s (&n->id), n->latency.rel_value);
2187
2188   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2189   {
2190     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2191   }
2192   else
2193   {
2194     ats_new =
2195         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2196                        (ats_count + 1));
2197     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2198
2199     /* add latency */
2200     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2201     if (n->latency.rel_value > UINT32_MAX)
2202       latency = UINT32_MAX;
2203     else
2204       latency = n->latency.rel_value;
2205     ats_new[ats_count].value = htonl (latency);
2206
2207     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2208                                ats_count + 1);
2209     GNUNET_free (ats_new);
2210   }
2211 }
2212
2213
2214 /**
2215  * Change the incoming quota for the given peer.
2216  *
2217  * @param neighbour identity of peer to change qutoa for
2218  * @param quota new quota
2219  */
2220 void
2221 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2222                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2223 {
2224   struct NeighbourMapEntry *n;
2225
2226   // This can happen during shutdown
2227   if (neighbours == NULL)
2228   {
2229     return;
2230   }
2231
2232   n = lookup_neighbour (neighbour);
2233   if (n == NULL)
2234   {
2235     GNUNET_STATISTICS_update (GST_stats,
2236                               gettext_noop
2237                               ("# SET QUOTA messages ignored (no such peer)"),
2238                               1, GNUNET_NO);
2239     return;
2240   }
2241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2242               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2243               ntohl (quota.value__), GNUNET_i2s (&n->id));
2244   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2245   if (0 != ntohl (quota.value__))
2246     return;
2247   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2248               GNUNET_i2s (&n->id), "SET_QUOTA");
2249   if (is_connected (n))
2250     GNUNET_STATISTICS_update (GST_stats,
2251                               gettext_noop ("# disconnects due to quota of 0"),
2252                               1, GNUNET_NO);
2253   disconnect_neighbour (n);
2254 }
2255
2256
2257 /**
2258  * Closure for the neighbours_iterate function.
2259  */
2260 struct IteratorContext
2261 {
2262   /**
2263    * Function to call on each connected neighbour.
2264    */
2265   GST_NeighbourIterator cb;
2266
2267   /**
2268    * Closure for 'cb'.
2269    */
2270   void *cb_cls;
2271 };
2272
2273
2274 /**
2275  * Call the callback from the closure for each connected neighbour.
2276  *
2277  * @param cls the 'struct IteratorContext'
2278  * @param key the hash of the public key of the neighbour
2279  * @param value the 'struct NeighbourMapEntry'
2280  * @return GNUNET_OK (continue to iterate)
2281  */
2282 static int
2283 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2284 {
2285   struct IteratorContext *ic = cls;
2286   struct NeighbourMapEntry *n = value;
2287
2288   if (!is_connected (n))
2289     return GNUNET_OK;
2290
2291   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2292   return GNUNET_OK;
2293 }
2294
2295
2296 /**
2297  * Iterate over all connected neighbours.
2298  *
2299  * @param cb function to call
2300  * @param cb_cls closure for cb
2301  */
2302 void
2303 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2304 {
2305   struct IteratorContext ic;
2306
2307   // This can happen during shutdown
2308   if (neighbours == NULL)
2309   {
2310     return;
2311   }
2312
2313   ic.cb = cb;
2314   ic.cb_cls = cb_cls;
2315   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2316 }
2317
2318 /**
2319  * If we have an active connection to the given target, it must be shutdown.
2320  *
2321  * @param target peer to disconnect from
2322  */
2323 void
2324 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2325 {
2326   struct NeighbourMapEntry *n;
2327
2328   // This can happen during shutdown
2329   if (neighbours == NULL)
2330   {
2331     return;
2332   }
2333
2334   n = lookup_neighbour (target);
2335   if (NULL == n)
2336     return;                     /* not active */
2337   disconnect_neighbour (n);
2338 }
2339
2340
2341 /**
2342  * We received a disconnect message from the given peer,
2343  * validate and process.
2344  *
2345  * @param peer sender of the message
2346  * @param msg the disconnect message
2347  */
2348 void
2349 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2350                                           *peer,
2351                                           const struct GNUNET_MessageHeader
2352                                           *msg)
2353 {
2354   struct NeighbourMapEntry *n;
2355   const struct SessionDisconnectMessage *sdm;
2356   GNUNET_HashCode hc;
2357
2358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2359               "Received DISCONNECT message from peer `%s'\n",
2360               GNUNET_i2s (peer));
2361   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2362   {
2363     // GNUNET_break_op (0);
2364     GNUNET_STATISTICS_update (GST_stats,
2365                               gettext_noop
2366                               ("# disconnect messages ignored (old format)"), 1,
2367                               GNUNET_NO);
2368     return;
2369   }
2370   sdm = (const struct SessionDisconnectMessage *) msg;
2371   n = lookup_neighbour (peer);
2372   if (NULL == n)
2373     return;                     /* gone already */
2374   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2375       n->connect_ts.abs_value)
2376   {
2377     GNUNET_STATISTICS_update (GST_stats,
2378                               gettext_noop
2379                               ("# disconnect messages ignored (timestamp)"), 1,
2380                               GNUNET_NO);
2381     return;
2382   }
2383   GNUNET_CRYPTO_hash (&sdm->public_key,
2384                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2385                       &hc);
2386   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2387   {
2388     GNUNET_break_op (0);
2389     return;
2390   }
2391   if (ntohl (sdm->purpose.size) !=
2392       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2393       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2394       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2395   {
2396     GNUNET_break_op (0);
2397     return;
2398   }
2399   if (GNUNET_OK !=
2400       GNUNET_CRYPTO_rsa_verify
2401       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2402        &sdm->signature, &sdm->public_key))
2403   {
2404     GNUNET_break_op (0);
2405     return;
2406   }
2407   GST_neighbours_force_disconnect (peer);
2408 }
2409
2410
2411 /**
2412  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2413  * Consider switching to it.
2414  *
2415  * @param message possibly a 'struct SessionConnectMessage' (check format)
2416  * @param peer identity of the peer to switch the address for
2417  * @param address address of the other peer, NULL if other peer
2418  *                       connected to us
2419  * @param session session to use (or NULL)
2420  * @param ats performance data
2421  * @param ats_count number of entries in ats
2422  */
2423 void
2424 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2425                                    const struct GNUNET_PeerIdentity *peer,
2426                                    const struct GNUNET_HELLO_Address *address,
2427                                    struct Session *session,
2428                                    const struct GNUNET_ATS_Information *ats,
2429                                    uint32_t ats_count)
2430 {
2431   const struct SessionConnectMessage *scm;
2432   struct GNUNET_MessageHeader msg;
2433   struct NeighbourMapEntry *n;
2434   size_t msg_len;
2435   size_t ret;
2436
2437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2438               "Received CONNECT_ACK message from peer `%s'\n",
2439               GNUNET_i2s (peer));
2440
2441   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2442   {
2443     GNUNET_break_op (0);
2444     return;
2445   }
2446   scm = (const struct SessionConnectMessage *) message;
2447   GNUNET_break_op (ntohl (scm->reserved) == 0);
2448   n = lookup_neighbour (peer);
2449   if (NULL == n)
2450   {
2451     /* we did not send 'CONNECT' -- at least not recently */
2452     GNUNET_STATISTICS_update (GST_stats,
2453                               gettext_noop
2454                               ("# unexpected CONNECT_ACK messages (no peer)"),
2455                               1, GNUNET_NO);
2456     return;
2457   }
2458
2459   /* Additional check
2460    *
2461    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2462    *
2463    * We also received an CONNECT message, switched from SENDT to RECV and
2464    * ATS already suggested us an address after a successful blacklist check
2465    */
2466
2467   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2468               "Received CONNECT_ACK message from peer `%s' in state `%s'\n",
2469               GNUNET_i2s (peer),
2470               print_state(n->state));
2471
2472   if ((n->address != NULL) && (n->state == S_CONNECTED))
2473   {
2474     /* After fast reconnect: send ACK (ACK) even when we are connected */
2475     msg_len = sizeof (msg);
2476     msg.size = htons (msg_len);
2477     msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2478
2479     ret = send_with_session(n,
2480               (const char *) &msg, msg_len,
2481               UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2482               NULL, NULL);
2483
2484     if (ret == GNUNET_SYSERR)
2485       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2486                   "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2487                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2488     return;
2489   }
2490
2491   if ((NULL == n->address) ||
2492        ((n->state != S_CONNECT_SENT) &&
2493       ((n->state != S_CONNECT_RECV) && (n->address != NULL))))
2494   {
2495     GNUNET_STATISTICS_update (GST_stats,
2496                               gettext_noop
2497                               ("# unexpected CONNECT_ACK messages"), 1,
2498                               GNUNET_NO);
2499     return;
2500   }
2501   if (n->state != S_CONNECTED)
2502     change_state (n, S_CONNECTED);
2503
2504   if (NULL != session)
2505   {
2506     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2507                      "transport-ats",
2508                      "Giving ATS session %p of plugin %s for peer %s\n",
2509                      session, address->transport_name, GNUNET_i2s (peer));
2510   }
2511   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2512   GNUNET_assert (NULL != n->address);
2513
2514   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2515   {
2516     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2517     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2518     n->address_state = USED;
2519   }
2520
2521   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2522
2523   /* send ACK (ACK) */
2524   msg_len = sizeof (msg);
2525   msg.size = htons (msg_len);
2526   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2527
2528   ret = send_with_session(n,
2529             (const char *) &msg, msg_len,
2530             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2531             NULL, NULL);
2532
2533   if (ret == GNUNET_SYSERR)
2534     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2535                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2536                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2537
2538
2539   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2540     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2541
2542   neighbours_connected++;
2543   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2544                             GNUNET_NO);
2545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2546               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2547               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2548               __LINE__);
2549   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2550   send_outbound_quota (peer, n->bandwidth_out);
2551
2552 }
2553
2554
2555 void
2556 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2557                            const struct GNUNET_PeerIdentity *peer,
2558                            const struct GNUNET_HELLO_Address *address,
2559                            struct Session *session,
2560                            const struct GNUNET_ATS_Information *ats,
2561                            uint32_t ats_count)
2562 {
2563   struct NeighbourMapEntry *n;
2564
2565   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2566               GNUNET_i2s (peer));
2567   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2568   {
2569     GNUNET_break_op (0);
2570     return;
2571   }
2572   n = lookup_neighbour (peer);
2573   if (NULL == n)
2574   {
2575     GNUNET_break (0);
2576     return;
2577   }
2578   if (S_CONNECTED == n->state)
2579     return;
2580   if (!is_connecting (n))
2581   {
2582     GNUNET_STATISTICS_update (GST_stats,
2583                               gettext_noop ("# unexpected ACK messages"), 1,
2584                               GNUNET_NO);
2585     return;
2586   }
2587   change_state (n, S_CONNECTED);
2588   if (NULL != session)
2589     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2590                      "transport-ats",
2591                      "Giving ATS session %p of plugin %s for peer %s\n",
2592                      session, address->transport_name, GNUNET_i2s (peer));
2593   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2594   GNUNET_assert (n->address != NULL);
2595
2596   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2597   {
2598     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2599     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2600     n->address_state = USED;
2601   }
2602
2603
2604   neighbours_connected++;
2605   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2606                             GNUNET_NO);
2607
2608   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2609   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2610     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2611   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2612               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2613               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2614               __LINE__);
2615   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2616   send_outbound_quota (peer, n->bandwidth_out);
2617 }
2618
2619 struct BlackListCheckContext
2620 {
2621   struct GNUNET_ATS_Information *ats;
2622
2623   uint32_t ats_count;
2624
2625   struct Session *session;
2626
2627   struct GNUNET_HELLO_Address *address;
2628
2629   struct GNUNET_TIME_Absolute ts;
2630 };
2631
2632
2633 static void
2634 handle_connect_blacklist_cont (void *cls,
2635                                const struct GNUNET_PeerIdentity *peer,
2636                                int result)
2637 {
2638   struct NeighbourMapEntry *n;
2639   struct BlackListCheckContext *bcc = cls;
2640
2641   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2642               "Blacklist check due to CONNECT message: `%s'\n",
2643               GNUNET_i2s (peer),
2644               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2645   /* not allowed */
2646   if (GNUNET_OK != result)
2647   {
2648     GNUNET_HELLO_address_free (bcc->address);
2649     GNUNET_free (bcc);
2650     return;
2651   }
2652
2653   n = lookup_neighbour (peer);
2654   if (NULL == n)
2655     n = setup_neighbour (peer);
2656
2657   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2658   {
2659     if (NULL != bcc->session)
2660       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2661                        "transport-ats",
2662                        "Giving ATS session %p of address `%s' for peer %s\n",
2663                        bcc->session, GST_plugins_a2s (bcc->address),
2664                        GNUNET_i2s (peer));
2665     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2666
2667     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2668                                bcc->ats_count);
2669     n->connect_ts = bcc->ts;
2670   }
2671
2672   GNUNET_HELLO_address_free (bcc->address);
2673   GNUNET_free (bcc);
2674
2675   if (n->state != S_CONNECT_RECV)
2676     change_state (n, S_CONNECT_RECV);
2677
2678
2679   /* Ask ATS for an address to connect via that address */
2680   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2681     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2682   n->ats_suggest =
2683       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2684                                     n);
2685   GNUNET_ATS_suggest_address (GST_ats, peer);
2686 }
2687
2688 /**
2689  * We received a 'SESSION_CONNECT' message from the other peer.
2690  * Consider switching to it.
2691  *
2692  * @param message possibly a 'struct SessionConnectMessage' (check format)
2693  * @param peer identity of the peer to switch the address for
2694  * @param address address of the other peer, NULL if other peer
2695  *                       connected to us
2696  * @param session session to use (or NULL)
2697  * @param ats performance data
2698  * @param ats_count number of entries in ats (excluding 0-termination)
2699  */
2700 void
2701 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2702                                const struct GNUNET_PeerIdentity *peer,
2703                                const struct GNUNET_HELLO_Address *address,
2704                                struct Session *session,
2705                                const struct GNUNET_ATS_Information *ats,
2706                                uint32_t ats_count)
2707 {
2708   const struct SessionConnectMessage *scm;
2709   struct BlackListCheckContext *bcc = NULL;
2710   struct NeighbourMapEntry *n;
2711
2712   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2713               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2714   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2715   {
2716     GNUNET_break_op (0);
2717     return;
2718   }
2719
2720   scm = (const struct SessionConnectMessage *) message;
2721   GNUNET_break_op (ntohl (scm->reserved) == 0);
2722
2723   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2724
2725   n = lookup_neighbour (peer);
2726   if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state)))
2727   {
2728     /* connected peer switches addresses or is trying to do a fast reconnect*/
2729     return;
2730   }
2731
2732
2733   /* we are not connected to this peer */
2734   /* do blacklist check */
2735   bcc =
2736       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2737                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2738   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2739   bcc->ats_count = ats_count + 1;
2740   bcc->address = GNUNET_HELLO_address_copy (address);
2741   bcc->session = session;
2742   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2743   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2744   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2745   bcc->ats[ats_count].value =
2746       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2747   GST_blacklist_test_allowed (peer, address->transport_name,
2748                               &handle_connect_blacklist_cont, bcc);
2749 }
2750
2751
2752 /* end of file gnunet-service-transport_neighbours.c */