- fix for 601 assertion
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
61
62 #define FAST_RECONNECT_RATE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100)
63
64 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
65
66 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
67
68 /**
69  * Entry in neighbours.
70  */
71 struct NeighbourMapEntry;
72
73 GNUNET_NETWORK_STRUCT_BEGIN
74
75 /**
76  * Message a peer sends to another to indicate its
77  * preference for communicating via a particular
78  * session (and the desire to establish a real
79  * connection).
80  */
81 struct SessionConnectMessage
82 {
83   /**
84    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
85    */
86   struct GNUNET_MessageHeader header;
87
88   /**
89    * Always zero.
90    */
91   uint32_t reserved GNUNET_PACKED;
92
93   /**
94    * Absolute time at the sender.  Only the most recent connect
95    * message implies which session is preferred by the sender.
96    */
97   struct GNUNET_TIME_AbsoluteNBO timestamp;
98
99 };
100
101
102 struct SessionDisconnectMessage
103 {
104   /**
105    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
106    */
107   struct GNUNET_MessageHeader header;
108
109   /**
110    * Always zero.
111    */
112   uint32_t reserved GNUNET_PACKED;
113
114   /**
115    * Purpose of the signature.  Extends over the timestamp.
116    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
117    */
118   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
119
120   /**
121    * Absolute time at the sender.  Only the most recent connect
122    * message implies which session is preferred by the sender.
123    */
124   struct GNUNET_TIME_AbsoluteNBO timestamp;
125
126   /**
127    * Public key of the sender.
128    */
129   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
130
131   /**
132    * Signature of the peer that sends us the disconnect.  Only
133    * valid if the timestamp is AFTER the timestamp from the
134    * corresponding 'CONNECT' message.
135    */
136   struct GNUNET_CRYPTO_RsaSignature signature;
137
138 };
139 GNUNET_NETWORK_STRUCT_END
140
141 /**
142  * For each neighbour we keep a list of messages
143  * that we still want to transmit to the neighbour.
144  */
145 struct MessageQueue
146 {
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *next;
152
153   /**
154    * This is a doubly linked list.
155    */
156   struct MessageQueue *prev;
157
158   /**
159    * Once this message is actively being transmitted, which
160    * neighbour is it associated with?
161    */
162   struct NeighbourMapEntry *n;
163
164   /**
165    * Function to call once we're done.
166    */
167   GST_NeighbourSendContinuation cont;
168
169   /**
170    * Closure for 'cont'
171    */
172   void *cont_cls;
173
174   /**
175    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
176    * stuck together in memory.  Allocated at the end of this struct.
177    */
178   const char *message_buf;
179
180   /**
181    * Size of the message buf
182    */
183   size_t message_buf_size;
184
185   /**
186    * At what time should we fail?
187    */
188   struct GNUNET_TIME_Absolute timeout;
189
190 };
191
192
193 enum State
194 {
195   /**
196    * fresh peer or completely disconnected
197    */
198   S_NOT_CONNECTED,
199
200   /**
201    * sent CONNECT message to other peer, waiting for CONNECT_ACK
202    */
203   S_CONNECT_SENT,
204
205   /**
206    * received CONNECT message to other peer, sending CONNECT_ACK
207    */
208   S_CONNECT_RECV,
209
210   /**
211    * received ACK or payload
212    */
213   S_CONNECTED,
214
215   /**
216    * connection ended, fast reconnect
217    */
218   S_FAST_RECONNECT,
219
220   /**
221    * Disconnect in progress
222    */
223   S_DISCONNECT
224 };
225
226 enum Address_State
227 {
228   USED,
229   UNUSED,
230   FRESH,
231 };
232
233
234 /**
235  * Entry in neighbours.
236  */
237 struct NeighbourMapEntry
238 {
239
240   /**
241    * Head of list of messages we would like to send to this peer;
242    * must contain at most one message per client.
243    */
244   struct MessageQueue *messages_head;
245
246   /**
247    * Tail of list of messages we would like to send to this peer; must
248    * contain at most one message per client.
249    */
250   struct MessageQueue *messages_tail;
251
252   /**
253    * Are we currently trying to send a message? If so, which one?
254    */
255   struct MessageQueue *is_active;
256
257   /**
258    * Active session for communicating with the peer.
259    */
260   struct Session *session;
261
262   /**
263    * Address we currently use.
264    */
265   struct GNUNET_HELLO_Address *address;
266
267   /**
268    * Address we currently use.
269    */
270   struct GNUNET_HELLO_Address *fast_reconnect_address;
271
272
273   /**
274    * Identity of this neighbour.
275    */
276   struct GNUNET_PeerIdentity id;
277
278   /**
279    * ID of task scheduled to run when this peer is about to
280    * time out (will free resources associated with the peer).
281    */
282   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
283
284   /**
285    * ID of task scheduled to send keepalives.
286    */
287   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
288
289   /**
290    * ID of task scheduled to run when we should try transmitting
291    * the head of the message queue.
292    */
293   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
294
295   /**
296    * Tracker for inbound bandwidth.
297    */
298   struct GNUNET_BANDWIDTH_Tracker in_tracker;
299
300   /**
301    * Inbound bandwidth from ATS, activated when connection is up
302    */
303   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
304
305   /**
306    * Inbound bandwidth from ATS, activated when connection is up
307    */
308   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
309
310   /**
311    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
312    */
313   struct GNUNET_TIME_Absolute connect_ts;
314
315   /**
316    * When did we sent the last keep-alive message?
317    */
318   struct GNUNET_TIME_Absolute keep_alive_sent;
319
320   /**
321    * Latest calculated latency value
322    */
323   struct GNUNET_TIME_Relative latency;
324
325   /**
326    * Timeout for ATS
327    * We asked ATS for a new address for this peer
328    */
329   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
330
331   /**
332    * Delay for ATS request
333    */
334   GNUNET_SCHEDULER_TaskIdentifier ats_request;
335
336   /**
337    * Task the resets the peer state after due to an pending
338    * unsuccessful connection setup
339    */
340   GNUNET_SCHEDULER_TaskIdentifier state_reset;
341
342
343   /**
344    * How often has the other peer (recently) violated the inbound
345    * traffic limit?  Incremented by 10 per violation, decremented by 1
346    * per non-violation (for each time interval).
347    */
348   unsigned int quota_violation_count;
349
350
351   /**
352    * The current state of the peer
353    * Element of enum State
354    */
355   int state;
356
357   /**
358    * Did we sent an KEEP_ALIVE message and are we expecting a response?
359    */
360   int expect_latency_response;
361
362   /**
363    * Was the address used successfully
364    */
365   int address_state;
366
367   /**
368    * Fast reconnect attempts for identical address
369    */
370   unsigned int fast_reconnect_attempts;
371 };
372
373
374 /**
375  * All known neighbours and their HELLOs.
376  */
377 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
378
379 /**
380  * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb
381  */
382 static void *callback_cls;
383
384 /**
385  * Function to call when we connected to a neighbour.
386  */
387 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
388
389 /**
390  * Function to call when we disconnected from a neighbour.
391  */
392 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
393
394 /**
395  * Function to call when we changed an active address of a neighbour.
396  */
397 static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb;
398
399 /**
400  * counter for connected neighbours
401  */
402 static int neighbours_connected;
403
404 /**
405  * Lookup a neighbour entry in the neighbours hash map.
406  *
407  * @param pid identity of the peer to look up
408  * @return the entry, NULL if there is no existing record
409  */
410 static struct NeighbourMapEntry *
411 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
412 {
413   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
414 }
415
416 /**
417  * Disconnect from the given neighbour, clean up the record.
418  *
419  * @param n neighbour to disconnect from
420  */
421 static void
422 disconnect_neighbour (struct NeighbourMapEntry *n);
423
424 #define change_state(n, state, ...) change (n, state, __LINE__)
425
426 static int
427 is_connecting (struct NeighbourMapEntry *n)
428 {
429   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
430     return GNUNET_YES;
431   return GNUNET_NO;
432 }
433
434 static int
435 is_connected (struct NeighbourMapEntry *n)
436 {
437   if (n->state == S_CONNECTED)
438     return GNUNET_YES;
439   return GNUNET_NO;
440 }
441
442 static int
443 is_disconnecting (struct NeighbourMapEntry *n)
444 {
445   if (n->state == S_DISCONNECT)
446     return GNUNET_YES;
447   return GNUNET_NO;
448 }
449
450 static const char *
451 print_state (int state)
452 {
453   switch (state)
454   {
455   case S_CONNECTED:
456     return "S_CONNECTED";
457     break;
458   case S_CONNECT_RECV:
459     return "S_CONNECT_RECV";
460     break;
461   case S_CONNECT_SENT:
462     return "S_CONNECT_SENT";
463     break;
464   case S_DISCONNECT:
465     return "S_DISCONNECT";
466     break;
467   case S_NOT_CONNECTED:
468     return "S_NOT_CONNECTED";
469     break;
470   case S_FAST_RECONNECT:
471     return "S_FAST_RECONNECT";
472     break;
473   default:
474     GNUNET_break (0);
475     break;
476   }
477   return NULL;
478 }
479
480 static int
481 change (struct NeighbourMapEntry *n, int state, int line);
482
483 static void
484 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
485
486
487 static void
488 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
489 {
490   struct NeighbourMapEntry *n = cls;
491
492   if (n == NULL)
493     return;
494
495   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
496   if (n->state == S_CONNECTED)
497     return;
498   GNUNET_STATISTICS_update (GST_stats,
499                             gettext_noop
500                             ("# failed connection attempts due to timeout"), 1,
501                             GNUNET_NO);
502   /* resetting state */
503
504   if (n->state == S_FAST_RECONNECT)
505   {
506     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507                 "Fast reconnect time out, disconnecting peer `%s'\n",
508                 GNUNET_i2s (&n->id));
509     disconnect_neighbour(n);
510     return;
511   }
512
513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
515               GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__);
516
517   n->state = S_NOT_CONNECTED;
518
519   /* destroying address */
520   if (n->address != NULL)
521   {
522     GNUNET_assert (strlen (n->address->transport_name) > 0);
523     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
524   }
525
526   /* request new address */
527   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
528     GNUNET_SCHEDULER_cancel (n->ats_suggest);
529   n->ats_suggest =
530       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
531                                     n);
532   GNUNET_ATS_suggest_address (GST_ats, &n->id);
533 }
534
535 static int
536 change (struct NeighbourMapEntry *n, int state, int line)
537 {
538   int previous_state;
539   /* allowed transitions */
540   int allowed = GNUNET_NO;
541
542   previous_state = n->state;
543
544   switch (n->state)
545   {
546   case S_NOT_CONNECTED:
547     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
548         (state == S_DISCONNECT))
549       allowed = GNUNET_YES;
550     break;
551   case S_CONNECT_RECV:
552     allowed = GNUNET_YES;
553     break;
554   case S_CONNECT_SENT:
555     allowed = GNUNET_YES;
556     break;
557   case S_CONNECTED:
558     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
559       allowed = GNUNET_YES;
560     break;
561   case S_DISCONNECT:
562     break;
563   case S_FAST_RECONNECT:
564     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
565       allowed = GNUNET_YES;
566     break;
567   default:
568     GNUNET_break (0);
569     break;
570   }
571   if (allowed == GNUNET_NO)
572   {
573     char *old = GNUNET_strdup (print_state (n->state));
574     char *new = GNUNET_strdup (print_state (state));
575
576     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
577                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
578                 new, line);
579     GNUNET_break (0);
580     GNUNET_free (old);
581     GNUNET_free (new);
582     return GNUNET_SYSERR;
583   }
584   {
585     char *old = GNUNET_strdup (print_state (n->state));
586     char *new = GNUNET_strdup (print_state (state));
587
588     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
589                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
590                 GNUNET_i2s (&n->id), n, old, new, line);
591     GNUNET_free (old);
592     GNUNET_free (new);
593   }
594   n->state = state;
595
596   switch (n->state)
597   {
598   case S_FAST_RECONNECT:
599   case S_CONNECT_RECV:
600   case S_CONNECT_SENT:
601     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
602       GNUNET_SCHEDULER_cancel (n->state_reset);
603     n->state_reset =
604         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
605     break;
606   case S_CONNECTED:
607   case S_NOT_CONNECTED:
608   case S_DISCONNECT:
609     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
610     {
611 #if DEBUG_TRANSPORT
612       char *old = GNUNET_strdup (print_state (n->state));
613       char *new = GNUNET_strdup (print_state (state));
614
615       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
617                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
618       GNUNET_free (old);
619       GNUNET_free (new);
620 #endif
621       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
622       GNUNET_SCHEDULER_cancel (n->state_reset);
623       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
624     }
625     break;
626
627   default:
628     GNUNET_assert (0);
629   }
630
631   if (NULL != address_change_cb)
632   {
633     if (n->state == S_CONNECTED)
634       address_change_cb (callback_cls, &n->id, n->address);
635     else if (previous_state == S_CONNECTED)
636       address_change_cb (callback_cls, &n->id, NULL);
637   }
638
639   return GNUNET_OK;
640 }
641
642 static ssize_t
643 send_with_session (struct NeighbourMapEntry *n,
644                    const char *msgbuf, size_t msgbuf_size,
645                    uint32_t priority,
646                    struct GNUNET_TIME_Relative timeout,
647                    GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
648 {
649   struct GNUNET_TRANSPORT_PluginFunctions *papi;
650   size_t ret = GNUNET_SYSERR;
651
652   GNUNET_assert (n != NULL);
653   GNUNET_assert (n->session != NULL);
654
655   papi = GST_plugins_find (n->address->transport_name);
656   if (papi == NULL)
657   {
658     if (cont != NULL)
659       cont (cont_cls, &n->id, GNUNET_SYSERR);
660     return GNUNET_SYSERR;
661   }
662
663   ret = papi->send (papi->cls,
664                    n->session,
665                    msgbuf, msgbuf_size,
666                    0,
667                    timeout,
668                    cont, cont_cls);
669
670   if ((ret == -1) && (cont != NULL))
671       cont (cont_cls, &n->id, GNUNET_SYSERR);
672   return ret;
673 }
674
675 /**
676  * Task invoked to start a transmission to another peer.
677  *
678  * @param cls the 'struct NeighbourMapEntry'
679  * @param tc scheduler context
680  */
681 static void
682 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
683
684
685 /**
686  * We're done with our transmission attempt, continue processing.
687  *
688  * @param cls the 'struct MessageQueue' of the message
689  * @param receiver intended receiver
690  * @param success whether it worked or not
691  */
692 static void
693 transmit_send_continuation (void *cls,
694                             const struct GNUNET_PeerIdentity *receiver,
695                             int success)
696 {
697   struct MessageQueue *mq = cls;
698   struct NeighbourMapEntry *n;
699   struct NeighbourMapEntry *tmp;
700
701   tmp = lookup_neighbour (receiver);
702   n = mq->n;
703   if ((NULL != n) && (tmp != NULL) && (tmp == n))
704   {
705     GNUNET_assert (n->is_active == mq);
706     n->is_active = NULL;
707     if (success == GNUNET_YES)
708     {
709       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
710       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
711     }
712   }
713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
714               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
715               (success == GNUNET_OK) ? "successful" : "FAILED");
716   if (NULL != mq->cont)
717     mq->cont (mq->cont_cls, success);
718   GNUNET_free (mq);
719 }
720
721
722 /**
723  * Check the ready list for the given neighbour and if a plugin is
724  * ready for transmission (and if we have a message), do so!
725  *
726  * @param n target peer for which to transmit
727  */
728 static void
729 try_transmission_to_peer (struct NeighbourMapEntry *n)
730 {
731   struct MessageQueue *mq;
732   struct GNUNET_TIME_Relative timeout;
733   ssize_t ret;
734
735   if (n->is_active != NULL)
736   {
737     GNUNET_break (0);
738     return;                     /* transmission already pending */
739   }
740   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
741   {
742     GNUNET_break (0);
743     return;                     /* currently waiting for bandwidth */
744   }
745   while (NULL != (mq = n->messages_head))
746   {
747     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
748     if (timeout.rel_value > 0)
749       break;
750     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
751     n->is_active = mq;
752     mq->n = n;
753     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
754   }
755   if (NULL == mq)
756     return;                     /* no more messages */
757
758   if (n->address == NULL)
759   {
760     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
761                 GNUNET_i2s (&n->id));
762     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
763     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
764     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
765     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
766     return;
767   }
768
769   if (GST_plugins_find (n->address->transport_name) == NULL)
770   {
771     GNUNET_break (0);
772     return;
773   }
774   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
775   n->is_active = mq;
776   mq->n = n;
777
778   if ((n->address->address_length == 0) && (n->session == NULL))
779   {
780     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
781                 GNUNET_i2s (&n->id));
782     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
783     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
784     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
785     return;
786   }
787
788   ret = send_with_session(n,
789               mq->message_buf, mq->message_buf_size,
790               0, timeout,
791               &transmit_send_continuation, mq);
792
793   if (ret == -1)
794   {
795     /* failure, but 'send' would not call continuation in this case,
796      * so we need to do it here! */
797     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
798   }
799
800 }
801
802
803 /**
804  * Task invoked to start a transmission to another peer.
805  *
806  * @param cls the 'struct NeighbourMapEntry'
807  * @param tc scheduler context
808  */
809 static void
810 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
811 {
812   struct NeighbourMapEntry *n = cls;
813
814   GNUNET_assert (NULL != lookup_neighbour (&n->id));
815   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
816   try_transmission_to_peer (n);
817 }
818
819
820 /**
821  * Initialize the neighbours subsystem.
822  *
823  * @param cls closure for callbacks
824  * @param connect_cb function to call if we connect to a peer
825  * @param disconnect_cb function to call if we disconnect from a peer
826  * @param peer_address_cb function to call if we change an active address
827  *                   of a neighbour
828  */
829 void
830 GST_neighbours_start (void *cls,
831                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
832                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
833                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
834 {
835   callback_cls = cls;
836   connect_notify_cb = connect_cb;
837   disconnect_notify_cb = disconnect_cb;
838   address_change_cb = peer_address_cb;
839   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
840   neighbours_connected = 0;
841 }
842
843
844 static void
845 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
846                       int result)
847 {
848   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849               "Sending DISCONNECT message to peer `%4s': %i\n",
850               GNUNET_i2s (target), result);
851 }
852
853
854 static int
855 send_disconnect (struct NeighbourMapEntry * n)
856 {
857   size_t ret;
858   struct SessionDisconnectMessage disconnect_msg;
859
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "Sending DISCONNECT message to peer `%4s'\n",
862               GNUNET_i2s (&n->id));
863   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
864   disconnect_msg.header.type =
865       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
866   disconnect_msg.reserved = htonl (0);
867   disconnect_msg.purpose.size =
868       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
869              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
870              sizeof (struct GNUNET_TIME_AbsoluteNBO));
871   disconnect_msg.purpose.purpose =
872       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
873   disconnect_msg.timestamp =
874       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
875   disconnect_msg.public_key = GST_my_public_key;
876   GNUNET_assert (GNUNET_OK ==
877                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
878                                          &disconnect_msg.purpose,
879                                          &disconnect_msg.signature));
880
881   ret = send_with_session (n,
882             (const char *) &disconnect_msg, sizeof (disconnect_msg),
883             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
884             &send_disconnect_cont, NULL);
885
886   if (ret == GNUNET_SYSERR)
887     return GNUNET_SYSERR;
888
889   GNUNET_STATISTICS_update (GST_stats,
890                             gettext_noop
891                             ("# peers disconnected due to external request"), 1,
892                             GNUNET_NO);
893   return GNUNET_OK;
894 }
895
896
897 /**
898  * Disconnect from the given neighbour, clean up the record.
899  *
900  * @param n neighbour to disconnect from
901  */
902 static void
903 disconnect_neighbour (struct NeighbourMapEntry *n)
904 {
905   struct MessageQueue *mq;
906   int previous_state;
907
908   previous_state = n->state;
909
910   if (is_disconnecting (n))
911     return;
912
913   /* send DISCONNECT MESSAGE */
914   if (previous_state == S_CONNECTED)
915   {
916     if (GNUNET_OK == send_disconnect (n))
917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
918                   GNUNET_i2s (&n->id));
919     else
920       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921                   "Could not send DISCONNECT_MSG to `%s'\n",
922                   GNUNET_i2s (&n->id));
923   }
924
925   change_state (n, S_DISCONNECT);
926
927   if (previous_state == S_CONNECTED)
928   {
929     GNUNET_assert (NULL != n->address);
930     if (n->address_state == USED)
931     {
932       GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__);
933       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
934       n->address_state = UNUSED;
935     }
936   }
937
938   if (n->address != NULL)
939   {
940     struct GNUNET_TRANSPORT_PluginFunctions *papi;
941
942     papi = GST_plugins_find (n->address->transport_name);
943     if (papi != NULL)
944       papi->disconnect (papi->cls, &n->id);
945   }
946   while (NULL != (mq = n->messages_head))
947   {
948     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
949     if (NULL != mq->cont)
950       mq->cont (mq->cont_cls, GNUNET_SYSERR);
951     GNUNET_free (mq);
952   }
953   if (NULL != n->is_active)
954   {
955     n->is_active->n = NULL;
956     n->is_active = NULL;
957   }
958
959   switch (previous_state)
960   {
961   case S_CONNECTED:
962     GNUNET_assert (neighbours_connected > 0);
963     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
964     GNUNET_SCHEDULER_cancel (n->keepalive_task);
965     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
966     n->expect_latency_response = GNUNET_NO;
967     neighbours_connected--;
968     GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
969                               GNUNET_NO);
970     disconnect_notify_cb (callback_cls, &n->id);
971     break;
972   case S_FAST_RECONNECT:
973     GNUNET_STATISTICS_update (GST_stats,
974                               gettext_noop ("# fast reconnects failed"), 1,
975                               GNUNET_NO);
976     disconnect_notify_cb (callback_cls, &n->id);
977     break;
978   default:
979     break;
980   }
981
982   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
983
984   GNUNET_assert (GNUNET_YES ==
985                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
986                                                        &n->id.hashPubKey, n));
987   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
988   {
989     GNUNET_SCHEDULER_cancel (n->ats_suggest);
990     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
991   }
992   if (n->ats_request != GNUNET_SCHEDULER_NO_TASK)
993   {
994     GNUNET_SCHEDULER_cancel (n->ats_request);
995     n->ats_request = GNUNET_SCHEDULER_NO_TASK;
996   }
997
998   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
999   {
1000     GNUNET_SCHEDULER_cancel (n->timeout_task);
1001     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1002   }
1003   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
1004   {
1005     GNUNET_SCHEDULER_cancel (n->transmission_task);
1006     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
1007   }
1008   if (NULL != n->fast_reconnect_address)
1009   {
1010     GNUNET_HELLO_address_free (n->fast_reconnect_address);
1011     n->fast_reconnect_address = NULL;
1012   }
1013   if (NULL != n->address)
1014   {
1015     GNUNET_HELLO_address_free (n->address);
1016     n->address = NULL;
1017   }
1018   n->session = NULL;
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
1020               GNUNET_i2s (&n->id), n);
1021   GNUNET_free (n);
1022 }
1023
1024
1025 /**
1026  * Peer has been idle for too long. Disconnect.
1027  *
1028  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1029  * @param tc scheduler context
1030  */
1031 static void
1032 neighbour_timeout_task (void *cls,
1033                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1034 {
1035   struct NeighbourMapEntry *n = cls;
1036
1037   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1038   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer`%4s' disconnected due to timeout\n",
1039               GNUNET_i2s (&n->id));
1040
1041   GNUNET_STATISTICS_update (GST_stats,
1042                             gettext_noop
1043                             ("# peers disconnected due to timeout"), 1,
1044                             GNUNET_NO);
1045   disconnect_neighbour (n);
1046 }
1047
1048
1049 /**
1050  * Send another keepalive message.
1051  *
1052  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1053  * @param tc scheduler context
1054  */
1055 static void
1056 neighbour_keepalive_task (void *cls,
1057                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1058 {
1059   struct NeighbourMapEntry *n = cls;
1060   struct GNUNET_MessageHeader m;
1061   int ret;
1062
1063   GNUNET_assert (S_CONNECTED == n->state);
1064   n->keepalive_task =
1065       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1066                                     &neighbour_keepalive_task, n);
1067
1068   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1069                             GNUNET_NO);
1070   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1071   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1072
1073   ret = send_with_session (n,
1074             (const void *) &m, sizeof (m),
1075             UINT32_MAX /* priority */ ,
1076             GNUNET_TIME_UNIT_FOREVER_REL,
1077             NULL, NULL);
1078
1079   n->expect_latency_response = GNUNET_NO;
1080   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1081   if (ret != GNUNET_SYSERR)
1082   {
1083     n->expect_latency_response = GNUNET_YES;
1084     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1085   }
1086
1087 }
1088
1089
1090 /**
1091  * Disconnect from the given neighbour.
1092  *
1093  * @param cls unused
1094  * @param key hash of neighbour's public key (not used)
1095  * @param value the 'struct NeighbourMapEntry' of the neighbour
1096  */
1097 static int
1098 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1099 {
1100   struct NeighbourMapEntry *n = value;
1101
1102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1103               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1104   if (S_CONNECTED == n->state)
1105     GNUNET_STATISTICS_update (GST_stats,
1106                               gettext_noop
1107                               ("# peers disconnected due to global disconnect"),
1108                               1, GNUNET_NO);
1109   disconnect_neighbour (n);
1110   return GNUNET_OK;
1111 }
1112
1113
1114 static void
1115 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1116 {
1117   struct NeighbourMapEntry *n = cls;
1118
1119   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1120
1121   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1122               "ATS did not suggested address to connect to peer `%s'\n",
1123               GNUNET_i2s (&n->id));
1124
1125   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# ATS address suggestions failed"), 1,
1126                             GNUNET_NO);
1127
1128
1129   disconnect_neighbour (n);
1130 }
1131
1132 /**
1133  * Cleanup the neighbours subsystem.
1134  */
1135 void
1136 GST_neighbours_stop ()
1137 {
1138   // This can happen during shutdown
1139   if (neighbours == NULL)
1140   {
1141     return;
1142   }
1143
1144   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1145                                          NULL);
1146   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1147 //  GNUNET_assert (neighbours_connected == 0);
1148   neighbours = NULL;
1149   callback_cls = NULL;
1150   connect_notify_cb = NULL;
1151   disconnect_notify_cb = NULL;
1152   address_change_cb = NULL;
1153 }
1154
1155 struct ContinutionContext
1156 {
1157   struct GNUNET_HELLO_Address *address;
1158
1159   struct Session *session;
1160 };
1161
1162 static void
1163 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1164                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1165 {
1166   struct QuotaSetMessage q_msg;
1167
1168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1169               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1170               ntohl (quota.value__), GNUNET_i2s (target));
1171   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1172   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1173   q_msg.quota = quota;
1174   q_msg.peer = (*target);
1175   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1176 }
1177
1178 /**
1179  * We tried to send a SESSION_CONNECT message to another peer.  If this
1180  * succeeded, we change the state.  If it failed, we should tell
1181  * ATS to not use this address anymore (until it is re-validated).
1182  *
1183  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1184  * @param target peer to send the message to
1185  * @param success GNUNET_OK on success
1186  */
1187 static void
1188 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1189                            int success)
1190 {
1191   struct ContinutionContext *cc = cls;
1192   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1193
1194   if (GNUNET_YES != success)
1195   {
1196     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1197     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1198   }
1199   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1200   {
1201     GNUNET_HELLO_address_free (cc->address);
1202     GNUNET_free (cc);
1203     return;
1204   }
1205
1206   if ((GNUNET_YES == success) &&
1207       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1208   {
1209     change_state (n, S_CONNECT_SENT);
1210     GNUNET_HELLO_address_free (cc->address);
1211     GNUNET_free (cc);
1212     return;
1213   }
1214
1215   if ((GNUNET_NO == success) &&
1216       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1217   {
1218     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1219                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1220                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1221     change_state (n, S_NOT_CONNECTED);
1222     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1223       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1224     n->ats_suggest =
1225         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1226                                       n);
1227     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1228   }
1229   GNUNET_HELLO_address_free (cc->address);
1230   GNUNET_free (cc);
1231 }
1232
1233
1234 static void
1235 ats_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1236 {
1237   struct NeighbourMapEntry *n = cls;
1238   n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1239
1240   n->ats_suggest =
1241     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1242                                   n);
1243 }
1244
1245
1246 /**
1247  * We tried to switch addresses with an peer already connected. If it failed,
1248  * we should tell ATS to not use this address anymore (until it is re-validated).
1249  *
1250  * @param cls the 'struct NeighbourMapEntry'
1251  * @param target peer to send the message to
1252  * @param success GNUNET_OK on success
1253  */
1254 static void
1255 send_switch_address_continuation (void *cls,
1256                                   const struct GNUNET_PeerIdentity *target,
1257                                   int success)
1258 {
1259   struct ContinutionContext *cc = cls;
1260   struct NeighbourMapEntry *n;
1261
1262   if (neighbours == NULL)
1263   {
1264     GNUNET_HELLO_address_free (cc->address);
1265     GNUNET_free (cc);
1266     return;                     /* neighbour is going away */
1267   }
1268
1269   n = lookup_neighbour (&cc->address->peer);
1270   if ((n == NULL) || (is_disconnecting (n)))
1271   {
1272     GNUNET_HELLO_address_free (cc->address);
1273     GNUNET_free (cc);
1274     return;                     /* neighbour is going away */
1275   }
1276
1277   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1278   if (GNUNET_YES != success)
1279   {
1280
1281     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1282                 "Failed to switch connected peer `%s' in state %s to address '%s' session %X, asking ATS for new address \n",
1283                 GNUNET_i2s (&n->id), print_state(n->state), GST_plugins_a2s (cc->address), cc->session);
1284
1285     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1286     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1287
1288     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1289     {
1290       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1291       n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1292     }
1293
1294     if (n->state == S_FAST_RECONNECT)
1295     {
1296       if (GNUNET_SCHEDULER_NO_TASK == n->ats_request)
1297       {
1298         /* Throtteled ATS request for fast reconnect */
1299         struct GNUNET_TIME_Relative delay = GNUNET_TIME_relative_multiply(FAST_RECONNECT_RATE, n->fast_reconnect_attempts);
1300         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1301                     "Fast reconnect attempt %u failed, delay ATS request for %llu ms\n", n->fast_reconnect_attempts, delay.rel_value);
1302         n->ats_request =
1303             GNUNET_SCHEDULER_add_delayed (delay,
1304                                           ats_request,
1305                                           n);
1306       }
1307     }
1308     else
1309     {
1310       /* Immediate ATS request for connected peers */
1311       n->ats_suggest =
1312         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1313                                       n);
1314       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1315     }
1316     GNUNET_HELLO_address_free (cc->address);
1317     GNUNET_free (cc);
1318     return;
1319   }
1320   /* Tell ATS that switching addresses was successful */
1321   switch (n->state)
1322   {
1323   case S_CONNECTED:
1324     if (n->address_state == FRESH)
1325     {
1326       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1327       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1328       GNUNET_break (cc->session == n->session);
1329       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1330       n->address_state = USED;
1331     }
1332     break;
1333   case S_FAST_RECONNECT:
1334     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1335                 "Successful fast reconnect to peer `%s'\n",
1336                 GNUNET_i2s (&n->id));
1337     change_state (n, S_CONNECTED);
1338     neighbours_connected++;
1339     GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1340                               GNUNET_NO);
1341
1342     if (n->address_state == FRESH)
1343     {
1344       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1345       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1346       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1347       n->address_state = USED;
1348     }
1349
1350     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1351       n->keepalive_task =
1352           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1353
1354     /* Updating quotas */
1355     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1356     send_outbound_quota (target, n->bandwidth_out);
1357
1358   default:
1359     break;
1360   }
1361   GNUNET_HELLO_address_free (cc->address);
1362   GNUNET_free (cc);
1363 }
1364
1365
1366 /**
1367  * We tried to send a SESSION_CONNECT message to another peer.  If this
1368  * succeeded, we change the state.  If it failed, we should tell
1369  * ATS to not use this address anymore (until it is re-validated).
1370  *
1371  * @param cls the 'struct NeighbourMapEntry'
1372  * @param target peer to send the message to
1373  * @param success GNUNET_OK on success
1374  */
1375 static void
1376 send_connect_ack_continuation (void *cls,
1377                                const struct GNUNET_PeerIdentity *target,
1378                                int success)
1379 {
1380   struct ContinutionContext *cc = cls;
1381   struct NeighbourMapEntry *n;
1382
1383   if (neighbours == NULL)
1384   {
1385     GNUNET_HELLO_address_free (cc->address);
1386     GNUNET_free (cc);
1387     return;                     /* neighbour is going away */
1388   }
1389
1390   n = lookup_neighbour (&cc->address->peer);
1391   if ((n == NULL) || (is_disconnecting (n)))
1392   {
1393     GNUNET_HELLO_address_free (cc->address);
1394     GNUNET_free (cc);
1395     return;                     /* neighbour is going away */
1396   }
1397
1398   if (GNUNET_YES == success)
1399   {
1400     GNUNET_HELLO_address_free (cc->address);
1401     GNUNET_free (cc);
1402     return;                     /* sending successful */
1403   }
1404
1405   /* sending failed, ask for next address  */
1406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1408               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1409   if (n->state != S_NOT_CONNECTED)
1410     change_state (n, S_NOT_CONNECTED);
1411   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1412   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1413
1414   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1415     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1416   n->ats_suggest =
1417       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1418                                     n);
1419   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1420   GNUNET_HELLO_address_free (cc->address);
1421   GNUNET_free (cc);
1422 }
1423
1424
1425 /**
1426  * For an existing neighbour record, set the active connection to
1427  * use the given address.
1428  *
1429  * @param peer identity of the peer to switch the address for
1430  * @param address address of the other peer, NULL if other peer
1431  *                       connected to us
1432  * @param session session to use (or NULL)
1433  * @param ats performance data
1434  * @param ats_count number of entries in ats
1435  * @param bandwidth_in inbound quota to be used when connection is up
1436  * @param bandwidth_out outbound quota to be used when connection is up
1437  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1438  *         connection is not up (yet)
1439  */
1440 int
1441 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
1442                                        const struct GNUNET_HELLO_Address
1443                                        *address,
1444                                        struct Session *session,
1445                                        const struct GNUNET_ATS_Information *ats,
1446                                        uint32_t ats_count,
1447                                        struct GNUNET_BANDWIDTH_Value32NBO
1448                                        bandwidth_in,
1449                                        struct GNUNET_BANDWIDTH_Value32NBO
1450                                        bandwidth_out)
1451 {
1452   struct NeighbourMapEntry *n;
1453   struct SessionConnectMessage connect_msg;
1454   struct ContinutionContext *cc;
1455   size_t msg_len;
1456   size_t ret;
1457
1458   if (neighbours == NULL)
1459   {
1460     /* This can happen during shutdown */
1461     return GNUNET_NO;
1462   }
1463   n = lookup_neighbour (peer);
1464   if (NULL == n)
1465     return GNUNET_NO;
1466   if (n->state == S_DISCONNECT)
1467   {
1468     /* We are disconnecting, nothing to do here */
1469     return GNUNET_NO;
1470   }
1471   GNUNET_assert (address->transport_name != NULL);
1472   if ((session == NULL) && (0 == address->address_length))
1473   {
1474     GNUNET_break_op (0);
1475     /* FIXME: is this actually possible? When does this happen? */
1476     if (strlen (address->transport_name) > 0)
1477       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1478     GNUNET_ATS_suggest_address (GST_ats, peer);
1479     return GNUNET_NO;
1480   }
1481
1482   /* checks successful and neighbour != NULL */
1483   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1484               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1485               (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1486               session,
1487               GNUNET_i2s (peer),
1488               print_state (n->state));
1489
1490   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1491   {
1492     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1493     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1494   }
1495
1496
1497   if (n->state == S_FAST_RECONNECT)
1498   {
1499     /* Throtteled fast reconnect */
1500
1501     if (NULL == n->fast_reconnect_address)
1502     {
1503       n->fast_reconnect_address = GNUNET_HELLO_address_copy (address);
1504
1505     }
1506     else if (0 == GNUNET_HELLO_address_cmp(address, n->fast_reconnect_address))
1507     {
1508       n->fast_reconnect_attempts ++;
1509
1510       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511                 "FAST RECONNECT to peer `%s' and address '%s' with identical address attempt %u\n",
1512                 GNUNET_i2s (&n->id), GST_plugins_a2s (address), n->fast_reconnect_attempts);
1513     }
1514   }
1515   else
1516   {
1517     n->fast_reconnect_attempts = 0;
1518   }
1519
1520   /* do not switch addresses just update quotas */
1521   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1522       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1523       (n->session == session))
1524   {
1525     n->bandwidth_in = bandwidth_in;
1526     n->bandwidth_out = bandwidth_out;
1527     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1528     send_outbound_quota (peer, n->bandwidth_out);
1529     return GNUNET_NO;
1530   }
1531   if (n->state == S_CONNECTED)
1532   {
1533     /* mark old address as no longer used */
1534     GNUNET_assert (NULL != n->address);
1535     if (n->address_state == USED)
1536     {
1537       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1538       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1539       n->address_state = UNUSED;
1540     }
1541   }
1542
1543   /* set new address */
1544   if (NULL != n->address)
1545     GNUNET_HELLO_address_free (n->address);
1546   n->address = GNUNET_HELLO_address_copy (address);
1547   n->address_state = FRESH;
1548   n->bandwidth_in = bandwidth_in;
1549   n->bandwidth_out = bandwidth_out;
1550   GNUNET_SCHEDULER_cancel (n->timeout_task);
1551   n->timeout_task =
1552       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1553                                     &neighbour_timeout_task, n);
1554
1555   if (NULL != address_change_cb && n->state == S_CONNECTED)
1556     address_change_cb (callback_cls, &n->id, n->address); 
1557
1558   /* Obtain an session for this address from plugin */
1559   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1560   papi = GST_plugins_find (address->transport_name);
1561
1562   if (papi == NULL)
1563   {
1564     /* we don't have the plugin for this address */
1565     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1566
1567     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1568       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1569     n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1570                                       ats_suggest_cancel,
1571                                       n);
1572     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1573     GNUNET_HELLO_address_free (n->address);
1574     n->address = NULL;
1575     n->session = NULL;
1576     return GNUNET_NO;
1577   }
1578
1579   if (session == NULL)
1580   {
1581     n->session = papi->get_session (papi->cls, address);
1582     /* Session could not be initiated */
1583     if (n->session == NULL)
1584     {
1585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1586                   "Failed to obtain new session %p for peer `%s' and  address '%s'\n",
1587                   n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1588
1589       GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1590
1591       if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1592         GNUNET_SCHEDULER_cancel (n->ats_suggest);
1593       n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1594                                         ats_suggest_cancel,
1595                                         n);
1596       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1597       GNUNET_HELLO_address_free (n->address);
1598       n->address = NULL;
1599       n->session = NULL;
1600       return GNUNET_NO;
1601     }
1602
1603     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604                 "Obtained new session %p for peer `%s' and  address '%s'\n",
1605                  n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1606     /* Telling ATS about new session */
1607     GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0);
1608   }
1609   else
1610   {
1611     n->session = session;
1612     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1613                 "Using existing session %p for peer `%s' and  address '%s'\n",
1614                 n->session,
1615                 GNUNET_i2s (&n->id),
1616                 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>");
1617   }
1618
1619   switch (n->state)
1620   {
1621   case S_NOT_CONNECTED:
1622   case S_CONNECT_SENT:
1623     msg_len = sizeof (struct SessionConnectMessage);
1624     connect_msg.header.size = htons (msg_len);
1625     connect_msg.header.type =
1626         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1627     connect_msg.reserved = htonl (0);
1628     connect_msg.timestamp =
1629         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1630
1631     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1632     cc->session = n->session;
1633     cc->address = GNUNET_HELLO_address_copy (address);
1634
1635     ret = send_with_session (n,
1636       (const char *) &connect_msg, msg_len,
1637       UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1638       &send_connect_continuation, cc);
1639
1640     return GNUNET_NO;
1641   case S_CONNECT_RECV:
1642     /* We received a CONNECT message and asked ATS for an address */
1643     msg_len = sizeof (struct SessionConnectMessage);
1644     connect_msg.header.size = htons (msg_len);
1645     connect_msg.header.type =
1646         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1647     connect_msg.reserved = htonl (0);
1648     connect_msg.timestamp =
1649         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1650     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1651     cc->session = n->session;
1652     cc->address = GNUNET_HELLO_address_copy (address);
1653
1654     ret = send_with_session(n,
1655                             (const void *) &connect_msg, msg_len,
1656                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1657                             &send_connect_ack_continuation,
1658                             cc);
1659     return GNUNET_NO;
1660   case S_CONNECTED:
1661   case S_FAST_RECONNECT:
1662     /* connected peer is switching addresses or tries fast reconnect */
1663     msg_len = sizeof (struct SessionConnectMessage);
1664     connect_msg.header.size = htons (msg_len);
1665     connect_msg.header.type =
1666         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1667     connect_msg.reserved = htonl (0);
1668     connect_msg.timestamp =
1669         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1670     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1671     cc->session = n->session;
1672     cc->address = GNUNET_HELLO_address_copy (address);
1673     ret = send_with_session(n,
1674                             (const void *) &connect_msg, msg_len,
1675                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1676                             &send_switch_address_continuation, cc);
1677     if (ret == GNUNET_SYSERR)
1678     {
1679       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1680                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1681                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1682     }
1683     return GNUNET_NO;
1684   default:
1685     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1686                 "Invalid connection state to switch addresses %u \n", n->state);
1687     GNUNET_break_op (0);
1688     return GNUNET_NO;
1689   }
1690 }
1691
1692
1693 /**
1694  * Obtain current latency information for the given neighbour.
1695  *
1696  * @param peer
1697  * @return observed latency of the address, FOREVER if the address was
1698  *         never successfully validated
1699  */
1700 struct GNUNET_TIME_Relative
1701 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1702 {
1703   struct NeighbourMapEntry *n;
1704
1705   n = lookup_neighbour (peer);
1706   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1707     return GNUNET_TIME_UNIT_FOREVER_REL;
1708
1709   return n->latency;
1710 }
1711
1712 /**
1713  * Obtain current address information for the given neighbour.
1714  *
1715  * @param peer
1716  * @return address currently used
1717  */
1718 struct GNUNET_HELLO_Address *
1719 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1720 {
1721   struct NeighbourMapEntry *n;
1722
1723   n = lookup_neighbour (peer);
1724   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1725     return NULL;
1726
1727   return n->address;
1728 }
1729
1730
1731
1732 /**
1733  * Create an entry in the neighbour map for the given peer
1734  *
1735  * @param peer peer to create an entry for
1736  * @return new neighbour map entry
1737  */
1738 static struct NeighbourMapEntry *
1739 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1740 {
1741   struct NeighbourMapEntry *n;
1742
1743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1744               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1745   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1746   n->id = *peer;
1747   n->state = S_NOT_CONNECTED;
1748   n->latency = GNUNET_TIME_relative_get_forever ();
1749   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1750                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1751                                  MAX_BANDWIDTH_CARRY_S);
1752   n->timeout_task =
1753       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1754                                     &neighbour_timeout_task, n);
1755   GNUNET_assert (GNUNET_OK ==
1756                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1757                                                     &n->id.hashPubKey, n,
1758                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1759   return n;
1760 }
1761
1762
1763 /**
1764  * Try to create a connection to the given target (eventually).
1765  *
1766  * @param target peer to try to connect to
1767  */
1768 void
1769 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1770 {
1771   struct NeighbourMapEntry *n;
1772
1773   // This can happen during shutdown
1774   if (neighbours == NULL)
1775   {
1776     return;
1777   }
1778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1779               GNUNET_i2s (target));
1780   if (0 ==
1781       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1782   {
1783     /* my own hello */
1784     return;
1785   }
1786   n = lookup_neighbour (target);
1787
1788   if (NULL != n)
1789   {
1790     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1791       return;                   /* already connecting or connected */
1792     if (is_disconnecting (n))
1793       change_state (n, S_NOT_CONNECTED);
1794   }
1795
1796
1797   if (n == NULL)
1798     n = setup_neighbour (target);
1799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1800               "Asking ATS for suggested address to connect to peer `%s'\n",
1801               GNUNET_i2s (&n->id));
1802   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1803 }
1804
1805 /**
1806  * Test if we're connected to the given peer.
1807  *
1808  * @param target peer to test
1809  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1810  */
1811 int
1812 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1813 {
1814   struct NeighbourMapEntry *n;
1815
1816   // This can happen during shutdown
1817   if (neighbours == NULL)
1818   {
1819     return GNUNET_NO;
1820   }
1821
1822   n = lookup_neighbour (target);
1823
1824   if ((NULL == n) || (S_CONNECTED != n->state))
1825     return GNUNET_NO;           /* not connected */
1826   return GNUNET_YES;
1827 }
1828
1829 /**
1830  * A session was terminated. Take note.
1831  *
1832  * @param peer identity of the peer where the session died
1833  * @param session session that is gone
1834  */
1835 void
1836 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1837                                    struct Session *session)
1838 {
1839   struct NeighbourMapEntry *n;
1840
1841   if (neighbours == NULL)
1842   {
1843     /* This can happen during shutdown */
1844     return;
1845   }
1846   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1847               session, GNUNET_i2s (peer));
1848   n = lookup_neighbour (peer);
1849   if (NULL == n)
1850     return;
1851   if (session != n->session)
1852     return;                     /* doesn't affect us */
1853   if (n->state == S_CONNECTED)
1854   {
1855     if (n->address_state == USED)
1856     {
1857       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1858       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1859       n->address_state = UNUSED;
1860
1861     }
1862   }
1863
1864   if (NULL != n->address)
1865   {
1866     GNUNET_HELLO_address_free (n->address);
1867     n->address = NULL;
1868   }
1869   n->session = NULL;
1870
1871   /* not connected anymore anyway, shouldn't matter */
1872   if (S_CONNECTED != n->state)
1873     return;
1874
1875   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1876   {
1877     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1878     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1879     n->expect_latency_response = GNUNET_NO;
1880   }
1881
1882   /* connected, try fast reconnect */
1883   /* statistics "transport" : "# peers connected" -= 1
1884    * neighbours_connected -= 1
1885    * BUT: no disconnect_cb to notify clients about disconnect
1886    */
1887
1888   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1889               GNUNET_i2s (peer));
1890
1891   GNUNET_assert (neighbours_connected > 0);
1892   change_state (n, S_FAST_RECONNECT);
1893   neighbours_connected--;
1894   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1895                             GNUNET_NO);
1896
1897
1898   /* We are connected, so ask ATS to switch addresses */
1899   GNUNET_SCHEDULER_cancel (n->timeout_task);
1900   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1901                                     &neighbour_timeout_task, n);
1902   /* try QUICKLY to re-establish a connection, reduce timeout! */
1903   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1904     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1905   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1906                                     &ats_suggest_cancel,
1907                                     n);
1908   GNUNET_ATS_suggest_address (GST_ats, peer);
1909 }
1910
1911
1912 /**
1913  * Transmit a message to the given target using the active connection.
1914  *
1915  * @param target destination
1916  * @param msg message to send
1917  * @param msg_size number of bytes in msg
1918  * @param timeout when to fail with timeout
1919  * @param cont function to call when done
1920  * @param cont_cls closure for 'cont'
1921  */
1922 void
1923 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1924                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1925                      GST_NeighbourSendContinuation cont, void *cont_cls)
1926 {
1927   struct NeighbourMapEntry *n;
1928   struct MessageQueue *mq;
1929
1930   // This can happen during shutdown
1931   if (neighbours == NULL)
1932   {
1933     return;
1934   }
1935
1936   n = lookup_neighbour (target);
1937   if ((n == NULL) || (!is_connected (n)))
1938   {
1939     GNUNET_STATISTICS_update (GST_stats,
1940                               gettext_noop
1941                               ("# messages not sent (no such peer or not connected)"),
1942                               1, GNUNET_NO);
1943     if (n == NULL)
1944       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1945                   "Could not send message to peer `%s': unknown neighbour",
1946                   GNUNET_i2s (target));
1947     else if (!is_connected (n))
1948       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1949                   "Could not send message to peer `%s': not connected\n",
1950                   GNUNET_i2s (target));
1951     if (NULL != cont)
1952       cont (cont_cls, GNUNET_SYSERR);
1953     return;
1954   }
1955
1956   if ((n->session == NULL) && (n->address == NULL))
1957   {
1958     GNUNET_STATISTICS_update (GST_stats,
1959                               gettext_noop
1960                               ("# messages not sent (no such peer or not connected)"),
1961                               1, GNUNET_NO);
1962     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1963                 "Could not send message to peer `%s': no address available\n",
1964                 GNUNET_i2s (target));
1965     if (NULL != cont)
1966       cont (cont_cls, GNUNET_SYSERR);
1967     return;
1968   }
1969
1970   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1971   GNUNET_STATISTICS_update (GST_stats,
1972                             gettext_noop
1973                             ("# bytes in message queue for other peers"),
1974                             msg_size, GNUNET_NO);
1975   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1976   mq->cont = cont;
1977   mq->cont_cls = cont_cls;
1978   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1979   memcpy (&mq[1], msg, msg_size);
1980   mq->message_buf = (const char *) &mq[1];
1981   mq->message_buf_size = msg_size;
1982   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1983   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1984
1985   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1986       (NULL == n->is_active))
1987     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1988 }
1989
1990
1991 /**
1992  * We have received a message from the given sender.  How long should
1993  * we delay before receiving more?  (Also used to keep the peer marked
1994  * as live).
1995  *
1996  * @param sender sender of the message
1997  * @param size size of the message
1998  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1999  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
2000  *                   GNUNET_SYSERR if the connection is not fully up yet
2001  * @return how long to wait before reading more from this sender
2002  */
2003 struct GNUNET_TIME_Relative
2004 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
2005                                         *sender, ssize_t size, int *do_forward)
2006 {
2007   struct NeighbourMapEntry *n;
2008   struct GNUNET_TIME_Relative ret;
2009
2010   // This can happen during shutdown
2011   if (neighbours == NULL)
2012   {
2013     return GNUNET_TIME_UNIT_FOREVER_REL;
2014   }
2015
2016   n = lookup_neighbour (sender);
2017   if (n == NULL)
2018   {
2019     GST_neighbours_try_connect (sender);
2020     n = lookup_neighbour (sender);
2021     if (NULL == n)
2022     {
2023       GNUNET_STATISTICS_update (GST_stats,
2024                                 gettext_noop
2025                                 ("# messages discarded due to lack of neighbour record"),
2026                                 1, GNUNET_NO);
2027       *do_forward = GNUNET_NO;
2028       return GNUNET_TIME_UNIT_ZERO;
2029     }
2030   }
2031   if (!is_connected (n))
2032   {
2033     *do_forward = GNUNET_SYSERR;
2034     return GNUNET_TIME_UNIT_ZERO;
2035   }
2036   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
2037   {
2038     n->quota_violation_count++;
2039     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2040                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
2041                 n->in_tracker.available_bytes_per_s__,
2042                 n->quota_violation_count);
2043     /* Discount 32k per violation */
2044     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
2045   }
2046   else
2047   {
2048     if (n->quota_violation_count > 0)
2049     {
2050       /* try to add 32k back */
2051       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
2052       n->quota_violation_count--;
2053     }
2054   }
2055   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2056   {
2057     GNUNET_STATISTICS_update (GST_stats,
2058                               gettext_noop
2059                               ("# bandwidth quota violations by other peers"),
2060                               1, GNUNET_NO);
2061     *do_forward = GNUNET_NO;
2062     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
2063   }
2064   *do_forward = GNUNET_YES;
2065   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
2066   if (ret.rel_value > 0)
2067   {
2068     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2069                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
2070                 (unsigned long long) n->in_tracker.
2071                 consumption_since_last_update__,
2072                 (unsigned int) n->in_tracker.available_bytes_per_s__,
2073                 (unsigned long long) ret.rel_value);
2074     GNUNET_STATISTICS_update (GST_stats,
2075                               gettext_noop ("# ms throttling suggested"),
2076                               (int64_t) ret.rel_value, GNUNET_NO);
2077   }
2078   return ret;
2079 }
2080
2081
2082 /**
2083  * Keep the connection to the given neighbour alive longer,
2084  * we received a KEEPALIVE (or equivalent).
2085  *
2086  * @param neighbour neighbour to keep alive
2087  */
2088 void
2089 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
2090 {
2091   struct NeighbourMapEntry *n;
2092
2093   // This can happen during shutdown
2094   if (neighbours == NULL)
2095   {
2096     return;
2097   }
2098
2099   n = lookup_neighbour (neighbour);
2100   if (NULL == n)
2101   {
2102     GNUNET_STATISTICS_update (GST_stats,
2103                               gettext_noop
2104                               ("# KEEPALIVE messages discarded (not connected)"),
2105                               1, GNUNET_NO);
2106     return;
2107   }
2108   GNUNET_SCHEDULER_cancel (n->timeout_task);
2109   n->timeout_task =
2110       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2111                                     &neighbour_timeout_task, n);
2112
2113   /* send reply to measure latency */
2114   if (S_CONNECTED != n->state)
2115     return;
2116
2117   struct GNUNET_MessageHeader m;
2118
2119   m.size = htons (sizeof (struct GNUNET_MessageHeader));
2120   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
2121
2122   send_with_session(n,
2123       (const void *) &m, sizeof (m),
2124       UINT32_MAX,
2125       GNUNET_TIME_UNIT_FOREVER_REL,
2126       NULL, NULL);
2127 }
2128
2129 /**
2130  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
2131  * to this peer
2132  *
2133  * @param neighbour neighbour to keep alive
2134  * @param ats performance data
2135  * @param ats_count number of entries in ats
2136  */
2137 void
2138 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
2139                                    const struct GNUNET_ATS_Information *ats,
2140                                    uint32_t ats_count)
2141 {
2142   struct NeighbourMapEntry *n;
2143   struct GNUNET_ATS_Information *ats_new;
2144   uint32_t latency;
2145
2146   if (neighbours == NULL)
2147   {
2148     // This can happen during shutdown
2149     return;
2150   }
2151
2152   n = lookup_neighbour (neighbour);
2153   if ((NULL == n) || (n->state != S_CONNECTED))
2154   {
2155     GNUNET_STATISTICS_update (GST_stats,
2156                               gettext_noop
2157                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2158                               1, GNUNET_NO);
2159     return;
2160   }
2161   if (n->expect_latency_response != GNUNET_YES)
2162   {
2163     GNUNET_STATISTICS_update (GST_stats,
2164                               gettext_noop
2165                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2166                               1, GNUNET_NO);
2167     return;
2168   }
2169   n->expect_latency_response = GNUNET_NO;
2170
2171   GNUNET_assert (n->keep_alive_sent.abs_value !=
2172                  GNUNET_TIME_absolute_get_zero ().abs_value);
2173   n->latency =
2174       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2175                                            GNUNET_TIME_absolute_get ());
2176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2177               GNUNET_i2s (&n->id), n->latency.rel_value);
2178
2179   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2180   {
2181     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2182   }
2183   else
2184   {
2185     ats_new =
2186         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2187                        (ats_count + 1));
2188     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2189
2190     /* add latency */
2191     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2192     if (n->latency.rel_value > UINT32_MAX)
2193       latency = UINT32_MAX;
2194     else
2195       latency = n->latency.rel_value;
2196     ats_new[ats_count].value = htonl (latency);
2197
2198     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2199                                ats_count + 1);
2200     GNUNET_free (ats_new);
2201   }
2202 }
2203
2204
2205 /**
2206  * Change the incoming quota for the given peer.
2207  *
2208  * @param neighbour identity of peer to change qutoa for
2209  * @param quota new quota
2210  */
2211 void
2212 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2213                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2214 {
2215   struct NeighbourMapEntry *n;
2216
2217   // This can happen during shutdown
2218   if (neighbours == NULL)
2219   {
2220     return;
2221   }
2222
2223   n = lookup_neighbour (neighbour);
2224   if (n == NULL)
2225   {
2226     GNUNET_STATISTICS_update (GST_stats,
2227                               gettext_noop
2228                               ("# SET QUOTA messages ignored (no such peer)"),
2229                               1, GNUNET_NO);
2230     return;
2231   }
2232   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2233               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2234               ntohl (quota.value__), GNUNET_i2s (&n->id));
2235   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2236   if (0 != ntohl (quota.value__))
2237     return;
2238   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2239               GNUNET_i2s (&n->id), "SET_QUOTA");
2240   if (is_connected (n))
2241     GNUNET_STATISTICS_update (GST_stats,
2242                               gettext_noop ("# disconnects due to quota of 0"),
2243                               1, GNUNET_NO);
2244   disconnect_neighbour (n);
2245 }
2246
2247
2248 /**
2249  * Closure for the neighbours_iterate function.
2250  */
2251 struct IteratorContext
2252 {
2253   /**
2254    * Function to call on each connected neighbour.
2255    */
2256   GST_NeighbourIterator cb;
2257
2258   /**
2259    * Closure for 'cb'.
2260    */
2261   void *cb_cls;
2262 };
2263
2264
2265 /**
2266  * Call the callback from the closure for each connected neighbour.
2267  *
2268  * @param cls the 'struct IteratorContext'
2269  * @param key the hash of the public key of the neighbour
2270  * @param value the 'struct NeighbourMapEntry'
2271  * @return GNUNET_OK (continue to iterate)
2272  */
2273 static int
2274 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2275 {
2276   struct IteratorContext *ic = cls;
2277   struct NeighbourMapEntry *n = value;
2278
2279   if (!is_connected (n))
2280     return GNUNET_OK;
2281
2282   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2283   return GNUNET_OK;
2284 }
2285
2286
2287 /**
2288  * Iterate over all connected neighbours.
2289  *
2290  * @param cb function to call
2291  * @param cb_cls closure for cb
2292  */
2293 void
2294 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2295 {
2296   struct IteratorContext ic;
2297
2298   // This can happen during shutdown
2299   if (neighbours == NULL)
2300   {
2301     return;
2302   }
2303
2304   ic.cb = cb;
2305   ic.cb_cls = cb_cls;
2306   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2307 }
2308
2309 /**
2310  * If we have an active connection to the given target, it must be shutdown.
2311  *
2312  * @param target peer to disconnect from
2313  */
2314 void
2315 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2316 {
2317   struct NeighbourMapEntry *n;
2318
2319   // This can happen during shutdown
2320   if (neighbours == NULL)
2321   {
2322     return;
2323   }
2324
2325   n = lookup_neighbour (target);
2326   if (NULL == n)
2327     return;                     /* not active */
2328   disconnect_neighbour (n);
2329 }
2330
2331
2332 /**
2333  * We received a disconnect message from the given peer,
2334  * validate and process.
2335  *
2336  * @param peer sender of the message
2337  * @param msg the disconnect message
2338  */
2339 void
2340 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2341                                           *peer,
2342                                           const struct GNUNET_MessageHeader
2343                                           *msg)
2344 {
2345   struct NeighbourMapEntry *n;
2346   const struct SessionDisconnectMessage *sdm;
2347   GNUNET_HashCode hc;
2348
2349   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2350               "Received DISCONNECT message from peer `%s'\n",
2351               GNUNET_i2s (peer));
2352   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2353   {
2354     // GNUNET_break_op (0);
2355     GNUNET_STATISTICS_update (GST_stats,
2356                               gettext_noop
2357                               ("# disconnect messages ignored (old format)"), 1,
2358                               GNUNET_NO);
2359     return;
2360   }
2361   sdm = (const struct SessionDisconnectMessage *) msg;
2362   n = lookup_neighbour (peer);
2363   if (NULL == n)
2364     return;                     /* gone already */
2365   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2366       n->connect_ts.abs_value)
2367   {
2368     GNUNET_STATISTICS_update (GST_stats,
2369                               gettext_noop
2370                               ("# disconnect messages ignored (timestamp)"), 1,
2371                               GNUNET_NO);
2372     return;
2373   }
2374   GNUNET_CRYPTO_hash (&sdm->public_key,
2375                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2376                       &hc);
2377   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2378   {
2379     GNUNET_break_op (0);
2380     return;
2381   }
2382   if (ntohl (sdm->purpose.size) !=
2383       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2384       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2385       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2386   {
2387     GNUNET_break_op (0);
2388     return;
2389   }
2390   if (GNUNET_OK !=
2391       GNUNET_CRYPTO_rsa_verify
2392       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2393        &sdm->signature, &sdm->public_key))
2394   {
2395     GNUNET_break_op (0);
2396     return;
2397   }
2398   GST_neighbours_force_disconnect (peer);
2399 }
2400
2401
2402 /**
2403  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2404  * Consider switching to it.
2405  *
2406  * @param message possibly a 'struct SessionConnectMessage' (check format)
2407  * @param peer identity of the peer to switch the address for
2408  * @param address address of the other peer, NULL if other peer
2409  *                       connected to us
2410  * @param session session to use (or NULL)
2411  * @param ats performance data
2412  * @param ats_count number of entries in ats
2413  */
2414 void
2415 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2416                                    const struct GNUNET_PeerIdentity *peer,
2417                                    const struct GNUNET_HELLO_Address *address,
2418                                    struct Session *session,
2419                                    const struct GNUNET_ATS_Information *ats,
2420                                    uint32_t ats_count)
2421 {
2422   const struct SessionConnectMessage *scm;
2423   struct GNUNET_MessageHeader msg;
2424   struct NeighbourMapEntry *n;
2425   size_t msg_len;
2426   size_t ret;
2427
2428   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2429               "Received CONNECT_ACK message from peer `%s'\n",
2430               GNUNET_i2s (peer));
2431
2432   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2433   {
2434     GNUNET_break_op (0);
2435     return;
2436   }
2437   scm = (const struct SessionConnectMessage *) message;
2438   GNUNET_break_op (ntohl (scm->reserved) == 0);
2439   n = lookup_neighbour (peer);
2440   if (NULL == n)
2441   {
2442     /* we did not send 'CONNECT' -- at least not recently */
2443     GNUNET_STATISTICS_update (GST_stats,
2444                               gettext_noop
2445                               ("# unexpected CONNECT_ACK messages (no peer)"),
2446                               1, GNUNET_NO);
2447     return;
2448   }
2449
2450   /* Additional check
2451    *
2452    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2453    *
2454    * We also received an CONNECT message, switched from SENDT to RECV and
2455    * ATS already suggested us an address after a successful blacklist check
2456    */
2457
2458   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2459               "Received CONNECT_ACK message from peer `%s' in state `%s'\n",
2460               GNUNET_i2s (peer),
2461               print_state(n->state));
2462
2463   if ((n->address != NULL) && (n->state == S_CONNECTED))
2464   {
2465     /* After fast reconnect: send ACK (ACK) even when we are connected */
2466     msg_len = sizeof (msg);
2467     msg.size = htons (msg_len);
2468     msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2469
2470     ret = send_with_session(n,
2471               (const char *) &msg, msg_len,
2472               UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2473               NULL, NULL);
2474
2475     if (ret == GNUNET_SYSERR)
2476       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2477                   "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2478                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2479     return;
2480   }
2481
2482   if ((NULL == n->address) ||
2483        ((n->state != S_CONNECT_SENT) &&
2484       ((n->state != S_CONNECT_RECV) && (n->address != NULL))))
2485   {
2486     GNUNET_STATISTICS_update (GST_stats,
2487                               gettext_noop
2488                               ("# unexpected CONNECT_ACK messages"), 1,
2489                               GNUNET_NO);
2490     return;
2491   }
2492   if (n->state != S_CONNECTED)
2493     change_state (n, S_CONNECTED);
2494
2495   if (NULL != session)
2496   {
2497     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2498                      "transport-ats",
2499                      "Giving ATS session %p of plugin %s for peer %s\n",
2500                      session, address->transport_name, GNUNET_i2s (peer));
2501   }
2502   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2503   GNUNET_assert (NULL != n->address);
2504
2505   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2506   {
2507     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2508     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2509     n->address_state = USED;
2510   }
2511
2512   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2513
2514   /* send ACK (ACK) */
2515   msg_len = sizeof (msg);
2516   msg.size = htons (msg_len);
2517   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2518
2519   ret = send_with_session(n,
2520             (const char *) &msg, msg_len,
2521             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2522             NULL, NULL);
2523
2524   if (ret == GNUNET_SYSERR)
2525     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2526                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2527                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2528
2529
2530   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2531     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2532
2533   neighbours_connected++;
2534   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2535                             GNUNET_NO);
2536   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2537               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2538               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2539               __LINE__);
2540   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2541   send_outbound_quota (peer, n->bandwidth_out);
2542
2543 }
2544
2545
2546 void
2547 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2548                            const struct GNUNET_PeerIdentity *peer,
2549                            const struct GNUNET_HELLO_Address *address,
2550                            struct Session *session,
2551                            const struct GNUNET_ATS_Information *ats,
2552                            uint32_t ats_count)
2553 {
2554   struct NeighbourMapEntry *n;
2555
2556   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2557               GNUNET_i2s (peer));
2558   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2559   {
2560     GNUNET_break_op (0);
2561     return;
2562   }
2563   n = lookup_neighbour (peer);
2564   if (NULL == n)
2565   {
2566     GNUNET_break (0);
2567     return;
2568   }
2569   if (S_CONNECTED == n->state)
2570     return;
2571   if (!is_connecting (n))
2572   {
2573     GNUNET_STATISTICS_update (GST_stats,
2574                               gettext_noop ("# unexpected ACK messages"), 1,
2575                               GNUNET_NO);
2576     return;
2577   }
2578   change_state (n, S_CONNECTED);
2579   if (NULL != session)
2580     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2581                      "transport-ats",
2582                      "Giving ATS session %p of plugin %s for peer %s\n",
2583                      session, address->transport_name, GNUNET_i2s (peer));
2584   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2585   GNUNET_assert (n->address != NULL);
2586
2587   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2588   {
2589     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2590     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2591     n->address_state = USED;
2592   }
2593
2594
2595   neighbours_connected++;
2596   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2597                             GNUNET_NO);
2598
2599   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2600   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2601     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2602   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2603               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2604               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2605               __LINE__);
2606   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2607   send_outbound_quota (peer, n->bandwidth_out);
2608 }
2609
2610 struct BlackListCheckContext
2611 {
2612   struct GNUNET_ATS_Information *ats;
2613
2614   uint32_t ats_count;
2615
2616   struct Session *session;
2617
2618   struct GNUNET_HELLO_Address *address;
2619
2620   struct GNUNET_TIME_Absolute ts;
2621 };
2622
2623
2624 static void
2625 handle_connect_blacklist_cont (void *cls,
2626                                const struct GNUNET_PeerIdentity *peer,
2627                                int result)
2628 {
2629   struct NeighbourMapEntry *n;
2630   struct BlackListCheckContext *bcc = cls;
2631
2632   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2633               "Blacklist check due to CONNECT message: `%s'\n",
2634               GNUNET_i2s (peer),
2635               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2636   /* not allowed */
2637   if (GNUNET_OK != result)
2638   {
2639     GNUNET_HELLO_address_free (bcc->address);
2640     GNUNET_free (bcc);
2641     return;
2642   }
2643
2644   n = lookup_neighbour (peer);
2645   if (NULL == n)
2646     n = setup_neighbour (peer);
2647
2648   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2649   {
2650     if (NULL != bcc->session)
2651       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2652                        "transport-ats",
2653                        "Giving ATS session %p of address `%s' for peer %s\n",
2654                        bcc->session, GST_plugins_a2s (bcc->address),
2655                        GNUNET_i2s (peer));
2656     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2657
2658     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2659                                bcc->ats_count);
2660     n->connect_ts = bcc->ts;
2661   }
2662
2663   GNUNET_HELLO_address_free (bcc->address);
2664   GNUNET_free (bcc);
2665
2666   if (n->state != S_CONNECT_RECV)
2667     change_state (n, S_CONNECT_RECV);
2668
2669
2670   /* Ask ATS for an address to connect via that address */
2671   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2672     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2673   n->ats_suggest =
2674       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2675                                     n);
2676   GNUNET_ATS_suggest_address (GST_ats, peer);
2677 }
2678
2679 /**
2680  * We received a 'SESSION_CONNECT' message from the other peer.
2681  * Consider switching to it.
2682  *
2683  * @param message possibly a 'struct SessionConnectMessage' (check format)
2684  * @param peer identity of the peer to switch the address for
2685  * @param address address of the other peer, NULL if other peer
2686  *                       connected to us
2687  * @param session session to use (or NULL)
2688  * @param ats performance data
2689  * @param ats_count number of entries in ats (excluding 0-termination)
2690  */
2691 void
2692 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2693                                const struct GNUNET_PeerIdentity *peer,
2694                                const struct GNUNET_HELLO_Address *address,
2695                                struct Session *session,
2696                                const struct GNUNET_ATS_Information *ats,
2697                                uint32_t ats_count)
2698 {
2699   const struct SessionConnectMessage *scm;
2700   struct BlackListCheckContext *bcc = NULL;
2701   struct NeighbourMapEntry *n;
2702
2703   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2704               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2705   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2706   {
2707     GNUNET_break_op (0);
2708     return;
2709   }
2710
2711   scm = (const struct SessionConnectMessage *) message;
2712   GNUNET_break_op (ntohl (scm->reserved) == 0);
2713
2714   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2715
2716   n = lookup_neighbour (peer);
2717   if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state)))
2718   {
2719     /* connected peer switches addresses or is trying to do a fast reconnect*/
2720     return;
2721   }
2722
2723
2724   /* we are not connected to this peer */
2725   /* do blacklist check */
2726   bcc =
2727       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2728                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2729   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2730   bcc->ats_count = ats_count + 1;
2731   bcc->address = GNUNET_HELLO_address_copy (address);
2732   bcc->session = session;
2733   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2734   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2735   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2736   bcc->ats[ats_count].value =
2737       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2738   GST_blacklist_test_allowed (peer, address->transport_name,
2739                               &handle_connect_blacklist_cont, bcc);
2740 }
2741
2742
2743 /* end of file gnunet-service-transport_neighbours.c */