bdb67dbca18f6b2e06ec6aac8a3d73cfc332cfae
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
61
62 #define FAST_RECONNECT_RATE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100)
63
64 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
65
66 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
67
68 /**
69  * Entry in neighbours.
70  */
71 struct NeighbourMapEntry;
72
73 GNUNET_NETWORK_STRUCT_BEGIN
74
75 /**
76  * Message a peer sends to another to indicate its
77  * preference for communicating via a particular
78  * session (and the desire to establish a real
79  * connection).
80  */
81 struct SessionConnectMessage
82 {
83   /**
84    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
85    */
86   struct GNUNET_MessageHeader header;
87
88   /**
89    * Always zero.
90    */
91   uint32_t reserved GNUNET_PACKED;
92
93   /**
94    * Absolute time at the sender.  Only the most recent connect
95    * message implies which session is preferred by the sender.
96    */
97   struct GNUNET_TIME_AbsoluteNBO timestamp;
98
99 };
100
101
102 struct SessionDisconnectMessage
103 {
104   /**
105    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
106    */
107   struct GNUNET_MessageHeader header;
108
109   /**
110    * Always zero.
111    */
112   uint32_t reserved GNUNET_PACKED;
113
114   /**
115    * Purpose of the signature.  Extends over the timestamp.
116    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
117    */
118   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
119
120   /**
121    * Absolute time at the sender.  Only the most recent connect
122    * message implies which session is preferred by the sender.
123    */
124   struct GNUNET_TIME_AbsoluteNBO timestamp;
125
126   /**
127    * Public key of the sender.
128    */
129   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
130
131   /**
132    * Signature of the peer that sends us the disconnect.  Only
133    * valid if the timestamp is AFTER the timestamp from the
134    * corresponding 'CONNECT' message.
135    */
136   struct GNUNET_CRYPTO_RsaSignature signature;
137
138 };
139 GNUNET_NETWORK_STRUCT_END
140
141 /**
142  * For each neighbour we keep a list of messages
143  * that we still want to transmit to the neighbour.
144  */
145 struct MessageQueue
146 {
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *next;
152
153   /**
154    * This is a doubly linked list.
155    */
156   struct MessageQueue *prev;
157
158   /**
159    * Once this message is actively being transmitted, which
160    * neighbour is it associated with?
161    */
162   struct NeighbourMapEntry *n;
163
164   /**
165    * Function to call once we're done.
166    */
167   GST_NeighbourSendContinuation cont;
168
169   /**
170    * Closure for 'cont'
171    */
172   void *cont_cls;
173
174   /**
175    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
176    * stuck together in memory.  Allocated at the end of this struct.
177    */
178   const char *message_buf;
179
180   /**
181    * Size of the message buf
182    */
183   size_t message_buf_size;
184
185   /**
186    * At what time should we fail?
187    */
188   struct GNUNET_TIME_Absolute timeout;
189
190 };
191
192
193 enum State
194 {
195   /**
196    * fresh peer or completely disconnected
197    */
198   S_NOT_CONNECTED,
199
200   /**
201    * sent CONNECT message to other peer, waiting for CONNECT_ACK
202    */
203   S_CONNECT_SENT,
204
205   /**
206    * received CONNECT message to other peer, sending CONNECT_ACK
207    */
208   S_CONNECT_RECV,
209
210   /**
211    * received ACK or payload
212    */
213   S_CONNECTED,
214
215   /**
216    * connection ended, fast reconnect
217    */
218   S_FAST_RECONNECT,
219
220   /**
221    * Disconnect in progress
222    */
223   S_DISCONNECT
224 };
225
226 enum Address_State
227 {
228   USED,
229   UNUSED,
230   FRESH,
231 };
232
233
234 /**
235  * Entry in neighbours.
236  */
237 struct NeighbourMapEntry
238 {
239
240   /**
241    * Head of list of messages we would like to send to this peer;
242    * must contain at most one message per client.
243    */
244   struct MessageQueue *messages_head;
245
246   /**
247    * Tail of list of messages we would like to send to this peer; must
248    * contain at most one message per client.
249    */
250   struct MessageQueue *messages_tail;
251
252   /**
253    * Are we currently trying to send a message? If so, which one?
254    */
255   struct MessageQueue *is_active;
256
257   /**
258    * Active session for communicating with the peer.
259    */
260   struct Session *session;
261
262   /**
263    * Address we currently use.
264    */
265   struct GNUNET_HELLO_Address *address;
266
267   /**
268    * Address we currently use.
269    */
270   struct GNUNET_HELLO_Address *fast_reconnect_address;
271
272
273   /**
274    * Identity of this neighbour.
275    */
276   struct GNUNET_PeerIdentity id;
277
278   /**
279    * ID of task scheduled to run when this peer is about to
280    * time out (will free resources associated with the peer).
281    */
282   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
283
284   /**
285    * ID of task scheduled to send keepalives.
286    */
287   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
288
289   /**
290    * ID of task scheduled to run when we should try transmitting
291    * the head of the message queue.
292    */
293   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
294
295   /**
296    * Tracker for inbound bandwidth.
297    */
298   struct GNUNET_BANDWIDTH_Tracker in_tracker;
299
300   /**
301    * Inbound bandwidth from ATS, activated when connection is up
302    */
303   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
304
305   /**
306    * Inbound bandwidth from ATS, activated when connection is up
307    */
308   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
309
310   /**
311    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
312    */
313   struct GNUNET_TIME_Absolute connect_ts;
314
315   /**
316    * When did we sent the last keep-alive message?
317    */
318   struct GNUNET_TIME_Absolute keep_alive_sent;
319
320   /**
321    * Latest calculated latency value
322    */
323   struct GNUNET_TIME_Relative latency;
324
325   /**
326    * Timeout for ATS
327    * We asked ATS for a new address for this peer
328    */
329   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
330
331   /**
332    * Delay for ATS request
333    */
334   GNUNET_SCHEDULER_TaskIdentifier ats_request;
335
336   /**
337    * Task the resets the peer state after due to an pending
338    * unsuccessful connection setup
339    */
340   GNUNET_SCHEDULER_TaskIdentifier state_reset;
341
342
343   /**
344    * How often has the other peer (recently) violated the inbound
345    * traffic limit?  Incremented by 10 per violation, decremented by 1
346    * per non-violation (for each time interval).
347    */
348   unsigned int quota_violation_count;
349
350
351   /**
352    * The current state of the peer
353    * Element of enum State
354    */
355   int state;
356
357   /**
358    * Did we sent an KEEP_ALIVE message and are we expecting a response?
359    */
360   int expect_latency_response;
361
362   /**
363    * Was the address used successfully
364    */
365   int address_state;
366
367   /**
368    * Fast reconnect attempts for identical address
369    */
370   unsigned int fast_reconnect_attempts;
371 };
372
373
374 /**
375  * All known neighbours and their HELLOs.
376  */
377 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
378
379 /**
380  * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb
381  */
382 static void *callback_cls;
383
384 /**
385  * Function to call when we connected to a neighbour.
386  */
387 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
388
389 /**
390  * Function to call when we disconnected from a neighbour.
391  */
392 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
393
394 /**
395  * Function to call when we changed an active address of a neighbour.
396  */
397 static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb;
398
399 /**
400  * counter for connected neighbours
401  */
402 static int neighbours_connected;
403
404 static unsigned int bytes_in_send_queue;
405 static unsigned int bytes_received;
406
407 /**
408  * Lookup a neighbour entry in the neighbours hash map.
409  *
410  * @param pid identity of the peer to look up
411  * @return the entry, NULL if there is no existing record
412  */
413 static struct NeighbourMapEntry *
414 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
415 {
416   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
417 }
418
419 /**
420  * Disconnect from the given neighbour, clean up the record.
421  *
422  * @param n neighbour to disconnect from
423  */
424 static void
425 disconnect_neighbour (struct NeighbourMapEntry *n);
426
427 #define change_state(n, state, ...) change (n, state, __LINE__)
428
429 static int
430 is_connecting (struct NeighbourMapEntry *n)
431 {
432   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
433     return GNUNET_YES;
434   return GNUNET_NO;
435 }
436
437 static int
438 is_connected (struct NeighbourMapEntry *n)
439 {
440   if (n->state == S_CONNECTED)
441     return GNUNET_YES;
442   return GNUNET_NO;
443 }
444
445 static int
446 is_disconnecting (struct NeighbourMapEntry *n)
447 {
448   if (n->state == S_DISCONNECT)
449     return GNUNET_YES;
450   return GNUNET_NO;
451 }
452
453 static const char *
454 print_state (int state)
455 {
456   switch (state)
457   {
458   case S_CONNECTED:
459     return "S_CONNECTED";
460     break;
461   case S_CONNECT_RECV:
462     return "S_CONNECT_RECV";
463     break;
464   case S_CONNECT_SENT:
465     return "S_CONNECT_SENT";
466     break;
467   case S_DISCONNECT:
468     return "S_DISCONNECT";
469     break;
470   case S_NOT_CONNECTED:
471     return "S_NOT_CONNECTED";
472     break;
473   case S_FAST_RECONNECT:
474     return "S_FAST_RECONNECT";
475     break;
476   default:
477     GNUNET_break (0);
478     break;
479   }
480   return NULL;
481 }
482
483 static int
484 change (struct NeighbourMapEntry *n, int state, int line);
485
486 static void
487 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
488
489
490 static void
491 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
492 {
493   struct NeighbourMapEntry *n = cls;
494
495   if (n == NULL)
496     return;
497
498   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
499   if (n->state == S_CONNECTED)
500     return;
501   GNUNET_STATISTICS_update (GST_stats,
502                             gettext_noop
503                             ("# failed connection attempts due to timeout"), 1,
504                             GNUNET_NO);
505   /* resetting state */
506
507   if (n->state == S_FAST_RECONNECT)
508   {
509     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510                 "Fast reconnect time out, disconnecting peer `%s'\n",
511                 GNUNET_i2s (&n->id));
512     disconnect_neighbour(n);
513     return;
514   }
515
516   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
517               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
518               GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__);
519
520   n->state = S_NOT_CONNECTED;
521
522   /* destroying address */
523   if (n->address != NULL)
524   {
525     GNUNET_assert (strlen (n->address->transport_name) > 0);
526     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
527   }
528
529   /* request new address */
530   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
531     GNUNET_SCHEDULER_cancel (n->ats_suggest);
532   n->ats_suggest =
533       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
534                                     n);
535   GNUNET_ATS_suggest_address (GST_ats, &n->id);
536 }
537
538 static int
539 change (struct NeighbourMapEntry *n, int state, int line)
540 {
541   int previous_state;
542   /* allowed transitions */
543   int allowed = GNUNET_NO;
544
545   previous_state = n->state;
546
547   switch (n->state)
548   {
549   case S_NOT_CONNECTED:
550     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
551         (state == S_DISCONNECT))
552       allowed = GNUNET_YES;
553     break;
554   case S_CONNECT_RECV:
555     allowed = GNUNET_YES;
556     break;
557   case S_CONNECT_SENT:
558     allowed = GNUNET_YES;
559     break;
560   case S_CONNECTED:
561     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
562       allowed = GNUNET_YES;
563     break;
564   case S_DISCONNECT:
565     break;
566   case S_FAST_RECONNECT:
567     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
568       allowed = GNUNET_YES;
569     break;
570   default:
571     GNUNET_break (0);
572     break;
573   }
574   if (allowed == GNUNET_NO)
575   {
576     char *old = GNUNET_strdup (print_state (n->state));
577     char *new = GNUNET_strdup (print_state (state));
578
579     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
580                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
581                 new, line);
582     GNUNET_break (0);
583     GNUNET_free (old);
584     GNUNET_free (new);
585     return GNUNET_SYSERR;
586   }
587   {
588     char *old = GNUNET_strdup (print_state (n->state));
589     char *new = GNUNET_strdup (print_state (state));
590
591     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
592                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
593                 GNUNET_i2s (&n->id), n, old, new, line);
594     GNUNET_free (old);
595     GNUNET_free (new);
596   }
597   n->state = state;
598
599   switch (n->state)
600   {
601   case S_FAST_RECONNECT:
602   case S_CONNECT_RECV:
603   case S_CONNECT_SENT:
604     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
605       GNUNET_SCHEDULER_cancel (n->state_reset);
606     n->state_reset =
607         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
608     break;
609   case S_CONNECTED:
610   case S_NOT_CONNECTED:
611   case S_DISCONNECT:
612     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
613     {
614 #if DEBUG_TRANSPORT
615       char *old = GNUNET_strdup (print_state (n->state));
616       char *new = GNUNET_strdup (print_state (state));
617
618       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
619                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
620                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
621       GNUNET_free (old);
622       GNUNET_free (new);
623 #endif
624       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
625       GNUNET_SCHEDULER_cancel (n->state_reset);
626       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
627     }
628     break;
629
630   default:
631     GNUNET_assert (0);
632   }
633
634   if (NULL != address_change_cb)
635   {
636     if (n->state == S_CONNECTED)
637       address_change_cb (callback_cls, &n->id, n->address);
638     else if (previous_state == S_CONNECTED)
639       address_change_cb (callback_cls, &n->id, NULL);
640   }
641
642   return GNUNET_OK;
643 }
644
645 static ssize_t
646 send_with_session (struct NeighbourMapEntry *n,
647                    const char *msgbuf, size_t msgbuf_size,
648                    uint32_t priority,
649                    struct GNUNET_TIME_Relative timeout,
650                    GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
651 {
652   struct GNUNET_TRANSPORT_PluginFunctions *papi;
653   size_t ret = GNUNET_SYSERR;
654
655   GNUNET_assert (n != NULL);
656   GNUNET_assert (n->session != NULL);
657
658   papi = GST_plugins_find (n->address->transport_name);
659   if (papi == NULL)
660   {
661     if (cont != NULL)
662       cont (cont_cls, &n->id, GNUNET_SYSERR);
663     return GNUNET_SYSERR;
664   }
665
666   ret = papi->send (papi->cls,
667                    n->session,
668                    msgbuf, msgbuf_size,
669                    0,
670                    timeout,
671                    cont, cont_cls);
672
673   if ((ret == -1) && (cont != NULL))
674       cont (cont_cls, &n->id, GNUNET_SYSERR);
675   return ret;
676 }
677
678 /**
679  * Task invoked to start a transmission to another peer.
680  *
681  * @param cls the 'struct NeighbourMapEntry'
682  * @param tc scheduler context
683  */
684 static void
685 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
686
687
688 /**
689  * We're done with our transmission attempt, continue processing.
690  *
691  * @param cls the 'struct MessageQueue' of the message
692  * @param receiver intended receiver
693  * @param success whether it worked or not
694  */
695 static void
696 transmit_send_continuation (void *cls,
697                             const struct GNUNET_PeerIdentity *receiver,
698                             int success)
699 {
700   struct MessageQueue *mq = cls;
701   struct NeighbourMapEntry *n;
702   struct NeighbourMapEntry *tmp;
703
704   tmp = lookup_neighbour (receiver);
705   n = mq->n;
706   if ((NULL != n) && (tmp != NULL) && (tmp == n))
707   {
708     GNUNET_assert (n->is_active == mq);
709     n->is_active = NULL;
710     if (success == GNUNET_YES)
711     {
712       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
713       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
714     }
715   }
716
717   GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size);
718   bytes_in_send_queue -= mq->message_buf_size;
719   GNUNET_STATISTICS_set (GST_stats,
720                         gettext_noop
721                         ("# bytes in message queue for other peers"),
722                         bytes_in_send_queue, GNUNET_NO);
723
724   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
725               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
726               (success == GNUNET_OK) ? "successful" : "FAILED");
727   if (NULL != mq->cont)
728     mq->cont (mq->cont_cls, success);
729   GNUNET_free (mq);
730 }
731
732
733 /**
734  * Check the ready list for the given neighbour and if a plugin is
735  * ready for transmission (and if we have a message), do so!
736  *
737  * @param n target peer for which to transmit
738  */
739 static void
740 try_transmission_to_peer (struct NeighbourMapEntry *n)
741 {
742   struct MessageQueue *mq;
743   struct GNUNET_TIME_Relative timeout;
744   ssize_t ret;
745
746   if (n->is_active != NULL)
747   {
748     GNUNET_break (0);
749     return;                     /* transmission already pending */
750   }
751   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
752   {
753     GNUNET_break (0);
754     return;                     /* currently waiting for bandwidth */
755   }
756   while (NULL != (mq = n->messages_head))
757   {
758     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
759     if (timeout.rel_value > 0)
760       break;
761     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
762     n->is_active = mq;
763     mq->n = n;
764     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
765   }
766   if (NULL == mq)
767     return;                     /* no more messages */
768
769   if (n->address == NULL)
770   {
771     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
772                 GNUNET_i2s (&n->id));
773     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
774     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
775     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
776     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
777     return;
778   }
779
780   if (GST_plugins_find (n->address->transport_name) == NULL)
781   {
782     GNUNET_break (0);
783     return;
784   }
785   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
786   n->is_active = mq;
787   mq->n = n;
788
789   if ((n->address->address_length == 0) && (n->session == NULL))
790   {
791     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
792                 GNUNET_i2s (&n->id));
793     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
794     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
795     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
796     return;
797   }
798
799   ret = send_with_session(n,
800               mq->message_buf, mq->message_buf_size,
801               0, timeout,
802               &transmit_send_continuation, mq);
803
804   if (ret == -1)
805   {
806     /* failure, but 'send' would not call continuation in this case,
807      * so we need to do it here! */
808     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
809   }
810
811 }
812
813
814 /**
815  * Task invoked to start a transmission to another peer.
816  *
817  * @param cls the 'struct NeighbourMapEntry'
818  * @param tc scheduler context
819  */
820 static void
821 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
822 {
823   struct NeighbourMapEntry *n = cls;
824
825   GNUNET_assert (NULL != lookup_neighbour (&n->id));
826   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
827   try_transmission_to_peer (n);
828 }
829
830
831 /**
832  * Initialize the neighbours subsystem.
833  *
834  * @param cls closure for callbacks
835  * @param connect_cb function to call if we connect to a peer
836  * @param disconnect_cb function to call if we disconnect from a peer
837  * @param peer_address_cb function to call if we change an active address
838  *                   of a neighbour
839  */
840 void
841 GST_neighbours_start (void *cls,
842                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
843                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
844                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
845 {
846   callback_cls = cls;
847   connect_notify_cb = connect_cb;
848   disconnect_notify_cb = disconnect_cb;
849   address_change_cb = peer_address_cb;
850   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
851   neighbours_connected = 0;
852   bytes_in_send_queue = 0;
853   bytes_received = 0;
854 }
855
856
857 static void
858 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
859                       int result)
860 {
861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862               "Sending DISCONNECT message to peer `%4s': %i\n",
863               GNUNET_i2s (target), result);
864 }
865
866
867 static int
868 send_disconnect (struct NeighbourMapEntry * n)
869 {
870   size_t ret;
871   struct SessionDisconnectMessage disconnect_msg;
872
873   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874               "Sending DISCONNECT message to peer `%4s'\n",
875               GNUNET_i2s (&n->id));
876   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
877   disconnect_msg.header.type =
878       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
879   disconnect_msg.reserved = htonl (0);
880   disconnect_msg.purpose.size =
881       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
882              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
883              sizeof (struct GNUNET_TIME_AbsoluteNBO));
884   disconnect_msg.purpose.purpose =
885       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
886   disconnect_msg.timestamp =
887       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
888   disconnect_msg.public_key = GST_my_public_key;
889   GNUNET_assert (GNUNET_OK ==
890                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
891                                          &disconnect_msg.purpose,
892                                          &disconnect_msg.signature));
893
894   ret = send_with_session (n,
895             (const char *) &disconnect_msg, sizeof (disconnect_msg),
896             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
897             &send_disconnect_cont, NULL);
898
899   if (ret == GNUNET_SYSERR)
900     return GNUNET_SYSERR;
901
902   GNUNET_STATISTICS_update (GST_stats,
903                             gettext_noop
904                             ("# peers disconnected due to external request"), 1,
905                             GNUNET_NO);
906   return GNUNET_OK;
907 }
908
909
910 /**
911  * Disconnect from the given neighbour, clean up the record.
912  *
913  * @param n neighbour to disconnect from
914  */
915 static void
916 disconnect_neighbour (struct NeighbourMapEntry *n)
917 {
918   struct MessageQueue *mq;
919   int previous_state;
920
921   previous_state = n->state;
922
923   if (is_disconnecting (n))
924     return;
925
926   /* send DISCONNECT MESSAGE */
927   if (previous_state == S_CONNECTED)
928   {
929     if (GNUNET_OK == send_disconnect (n))
930       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
931                   GNUNET_i2s (&n->id));
932     else
933       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934                   "Could not send DISCONNECT_MSG to `%s'\n",
935                   GNUNET_i2s (&n->id));
936   }
937
938   change_state (n, S_DISCONNECT);
939
940   if (previous_state == S_CONNECTED)
941   {
942     GNUNET_assert (NULL != n->address);
943     if (n->address_state == USED)
944     {
945       GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__);
946       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
947       n->address_state = UNUSED;
948     }
949   }
950
951   if (n->address != NULL)
952   {
953     struct GNUNET_TRANSPORT_PluginFunctions *papi;
954
955     papi = GST_plugins_find (n->address->transport_name);
956     if (papi != NULL)
957       papi->disconnect (papi->cls, &n->id);
958   }
959   while (NULL != (mq = n->messages_head))
960   {
961     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
962     if (NULL != mq->cont)
963       mq->cont (mq->cont_cls, GNUNET_SYSERR);
964     GNUNET_free (mq);
965   }
966   if (NULL != n->is_active)
967   {
968     n->is_active->n = NULL;
969     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
970                 "Failing transmission of active message due to disconnect\n");
971     if (NULL != n->is_active->cont)
972       n->is_active->cont (n->is_active->cont_cls, GNUNET_SYSERR);
973     GNUNET_free (n->is_active);
974     n->is_active = NULL;
975   }
976
977   switch (previous_state)
978   {
979   case S_CONNECTED:
980     GNUNET_assert (neighbours_connected > 0);
981     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
982     GNUNET_SCHEDULER_cancel (n->keepalive_task);
983     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
984     n->expect_latency_response = GNUNET_NO;
985     neighbours_connected--;
986     GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
987                               GNUNET_NO);
988     disconnect_notify_cb (callback_cls, &n->id);
989     break;
990   case S_FAST_RECONNECT:
991     GNUNET_STATISTICS_update (GST_stats,
992                               gettext_noop ("# fast reconnects failed"), 1,
993                               GNUNET_NO);
994     disconnect_notify_cb (callback_cls, &n->id);
995     break;
996   default:
997     break;
998   }
999
1000   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
1001
1002   GNUNET_assert (GNUNET_YES ==
1003                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
1004                                                        &n->id.hashPubKey, n));
1005   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
1006   {
1007     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1008     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1009   }
1010   if (n->ats_request != GNUNET_SCHEDULER_NO_TASK)
1011   {
1012     GNUNET_SCHEDULER_cancel (n->ats_request);
1013     n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1014   }
1015
1016   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
1017   {
1018     GNUNET_SCHEDULER_cancel (n->timeout_task);
1019     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1020   }
1021   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
1022   {
1023     GNUNET_SCHEDULER_cancel (n->transmission_task);
1024     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
1025   }
1026   if (NULL != n->fast_reconnect_address)
1027   {
1028     GNUNET_HELLO_address_free (n->fast_reconnect_address);
1029     n->fast_reconnect_address = NULL;
1030   }
1031   if (NULL != n->address)
1032   {
1033     GNUNET_HELLO_address_free (n->address);
1034     n->address = NULL;
1035   }
1036   n->session = NULL;
1037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
1038               GNUNET_i2s (&n->id), n);
1039   GNUNET_free (n);
1040 }
1041
1042
1043 /**
1044  * Peer has been idle for too long. Disconnect.
1045  *
1046  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1047  * @param tc scheduler context
1048  */
1049 static void
1050 neighbour_timeout_task (void *cls,
1051                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1052 {
1053   struct NeighbourMapEntry *n = cls;
1054
1055   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer`%4s' disconnected due to timeout\n",
1057               GNUNET_i2s (&n->id));
1058
1059   GNUNET_STATISTICS_update (GST_stats,
1060                             gettext_noop
1061                             ("# peers disconnected due to timeout"), 1,
1062                             GNUNET_NO);
1063   disconnect_neighbour (n);
1064 }
1065
1066
1067 /**
1068  * Send another keepalive message.
1069  *
1070  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1071  * @param tc scheduler context
1072  */
1073 static void
1074 neighbour_keepalive_task (void *cls,
1075                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1076 {
1077   struct NeighbourMapEntry *n = cls;
1078   struct GNUNET_MessageHeader m;
1079   int ret;
1080
1081   GNUNET_assert (S_CONNECTED == n->state);
1082   n->keepalive_task =
1083       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1084                                     &neighbour_keepalive_task, n);
1085
1086   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1087                             GNUNET_NO);
1088   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1089   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1090
1091   ret = send_with_session (n,
1092             (const void *) &m, sizeof (m),
1093             UINT32_MAX /* priority */ ,
1094             GNUNET_TIME_UNIT_FOREVER_REL,
1095             NULL, NULL);
1096
1097   n->expect_latency_response = GNUNET_NO;
1098   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1099   if (ret != GNUNET_SYSERR)
1100   {
1101     n->expect_latency_response = GNUNET_YES;
1102     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1103   }
1104
1105 }
1106
1107
1108 /**
1109  * Disconnect from the given neighbour.
1110  *
1111  * @param cls unused
1112  * @param key hash of neighbour's public key (not used)
1113  * @param value the 'struct NeighbourMapEntry' of the neighbour
1114  */
1115 static int
1116 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1117 {
1118   struct NeighbourMapEntry *n = value;
1119
1120   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1121               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1122   if (S_CONNECTED == n->state)
1123     GNUNET_STATISTICS_update (GST_stats,
1124                               gettext_noop
1125                               ("# peers disconnected due to global disconnect"),
1126                               1, GNUNET_NO);
1127   disconnect_neighbour (n);
1128   return GNUNET_OK;
1129 }
1130
1131
1132 static void
1133 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1134 {
1135   struct NeighbourMapEntry *n = cls;
1136
1137   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1138
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "ATS did not suggested address to connect to peer `%s'\n",
1141               GNUNET_i2s (&n->id));
1142
1143   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# ATS address suggestions failed"), 1,
1144                             GNUNET_NO);
1145
1146
1147   disconnect_neighbour (n);
1148 }
1149
1150 /**
1151  * Cleanup the neighbours subsystem.
1152  */
1153 void
1154 GST_neighbours_stop ()
1155 {
1156   // This can happen during shutdown
1157   if (neighbours == NULL)
1158   {
1159     return;
1160   }
1161
1162   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1163                                          NULL);
1164   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1165 //  GNUNET_assert (neighbours_connected == 0);
1166   neighbours = NULL;
1167   callback_cls = NULL;
1168   connect_notify_cb = NULL;
1169   disconnect_notify_cb = NULL;
1170   address_change_cb = NULL;
1171 }
1172
1173 struct ContinutionContext
1174 {
1175   struct GNUNET_HELLO_Address *address;
1176
1177   struct Session *session;
1178 };
1179
1180 static void
1181 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1182                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1183 {
1184   struct QuotaSetMessage q_msg;
1185
1186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1188               ntohl (quota.value__), GNUNET_i2s (target));
1189   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1190   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1191   q_msg.quota = quota;
1192   q_msg.peer = (*target);
1193   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1194 }
1195
1196 /**
1197  * We tried to send a SESSION_CONNECT message to another peer.  If this
1198  * succeeded, we change the state.  If it failed, we should tell
1199  * ATS to not use this address anymore (until it is re-validated).
1200  *
1201  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1202  * @param target peer to send the message to
1203  * @param success GNUNET_OK on success
1204  */
1205 static void
1206 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1207                            int success)
1208 {
1209   struct ContinutionContext *cc = cls;
1210   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1211
1212   if (GNUNET_YES != success)
1213   {
1214     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1215     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1216   }
1217   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1218   {
1219     GNUNET_HELLO_address_free (cc->address);
1220     GNUNET_free (cc);
1221     return;
1222   }
1223
1224   if ((GNUNET_YES == success) &&
1225       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1226   {
1227     change_state (n, S_CONNECT_SENT);
1228     GNUNET_HELLO_address_free (cc->address);
1229     GNUNET_free (cc);
1230     return;
1231   }
1232
1233   if ((GNUNET_NO == success) &&
1234       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1235   {
1236     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1237                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1238                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1239     change_state (n, S_NOT_CONNECTED);
1240     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1241       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1242     n->ats_suggest =
1243         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1244                                       n);
1245     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1246   }
1247   GNUNET_HELLO_address_free (cc->address);
1248   GNUNET_free (cc);
1249 }
1250
1251
1252 static void
1253 ats_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1254 {
1255   struct NeighbourMapEntry *n = cls;
1256   n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1257
1258   n->ats_suggest =
1259     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1260                                   n);
1261 }
1262
1263
1264 /**
1265  * We tried to switch addresses with an peer already connected. If it failed,
1266  * we should tell ATS to not use this address anymore (until it is re-validated).
1267  *
1268  * @param cls the 'struct NeighbourMapEntry'
1269  * @param target peer to send the message to
1270  * @param success GNUNET_OK on success
1271  */
1272 static void
1273 send_switch_address_continuation (void *cls,
1274                                   const struct GNUNET_PeerIdentity *target,
1275                                   int success)
1276 {
1277   struct ContinutionContext *cc = cls;
1278   struct NeighbourMapEntry *n;
1279
1280   if (neighbours == NULL)
1281   {
1282     GNUNET_HELLO_address_free (cc->address);
1283     GNUNET_free (cc);
1284     return;                     /* neighbour is going away */
1285   }
1286
1287   n = lookup_neighbour (&cc->address->peer);
1288   if ((n == NULL) || (is_disconnecting (n)))
1289   {
1290     GNUNET_HELLO_address_free (cc->address);
1291     GNUNET_free (cc);
1292     return;                     /* neighbour is going away */
1293   }
1294
1295   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1296   if (GNUNET_YES != success)
1297   {
1298
1299     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300                 "Failed to switch connected peer `%s' in state %s to address '%s' session %X, asking ATS for new address \n",
1301                 GNUNET_i2s (&n->id), print_state(n->state), GST_plugins_a2s (cc->address), cc->session);
1302
1303     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1304     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1305
1306     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1307     {
1308       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1309       n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1310     }
1311
1312     if (n->state == S_FAST_RECONNECT)
1313     {
1314       if (GNUNET_SCHEDULER_NO_TASK == n->ats_request)
1315       {
1316         /* Throtteled ATS request for fast reconnect */
1317         struct GNUNET_TIME_Relative delay = GNUNET_TIME_relative_multiply(FAST_RECONNECT_RATE, n->fast_reconnect_attempts);
1318         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319                     "Fast reconnect attempt %u failed, delay ATS request for %llu ms\n", n->fast_reconnect_attempts, delay.rel_value);
1320         n->ats_request =
1321             GNUNET_SCHEDULER_add_delayed (delay,
1322                                           ats_request,
1323                                           n);
1324       }
1325     }
1326     else
1327     {
1328       /* Immediate ATS request for connected peers */
1329       n->ats_suggest =
1330         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1331                                       n);
1332       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1333     }
1334     GNUNET_HELLO_address_free (cc->address);
1335     GNUNET_free (cc);
1336     return;
1337   }
1338   /* Tell ATS that switching addresses was successful */
1339   switch (n->state)
1340   {
1341   case S_CONNECTED:
1342     if (n->address_state == FRESH)
1343     {
1344       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1345       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1346       GNUNET_break (cc->session == n->session);
1347       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1348       n->address_state = USED;
1349     }
1350     break;
1351   case S_FAST_RECONNECT:
1352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353                 "Successful fast reconnect to peer `%s'\n",
1354                 GNUNET_i2s (&n->id));
1355     change_state (n, S_CONNECTED);
1356     neighbours_connected++;
1357     GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1358                               GNUNET_NO);
1359
1360     if (n->address_state == FRESH)
1361     {
1362       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1363       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1364       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1365       n->address_state = USED;
1366     }
1367
1368     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1369       n->keepalive_task =
1370           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1371
1372     /* Updating quotas */
1373     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1374     send_outbound_quota (target, n->bandwidth_out);
1375
1376   default:
1377     break;
1378   }
1379   GNUNET_HELLO_address_free (cc->address);
1380   GNUNET_free (cc);
1381 }
1382
1383
1384 /**
1385  * We tried to send a SESSION_CONNECT message to another peer.  If this
1386  * succeeded, we change the state.  If it failed, we should tell
1387  * ATS to not use this address anymore (until it is re-validated).
1388  *
1389  * @param cls the 'struct NeighbourMapEntry'
1390  * @param target peer to send the message to
1391  * @param success GNUNET_OK on success
1392  */
1393 static void
1394 send_connect_ack_continuation (void *cls,
1395                                const struct GNUNET_PeerIdentity *target,
1396                                int success)
1397 {
1398   struct ContinutionContext *cc = cls;
1399   struct NeighbourMapEntry *n;
1400
1401   if (neighbours == NULL)
1402   {
1403     GNUNET_HELLO_address_free (cc->address);
1404     GNUNET_free (cc);
1405     return;                     /* neighbour is going away */
1406   }
1407
1408   n = lookup_neighbour (&cc->address->peer);
1409   if ((n == NULL) || (is_disconnecting (n)))
1410   {
1411     GNUNET_HELLO_address_free (cc->address);
1412     GNUNET_free (cc);
1413     return;                     /* neighbour is going away */
1414   }
1415
1416   if (GNUNET_YES == success)
1417   {
1418     GNUNET_HELLO_address_free (cc->address);
1419     GNUNET_free (cc);
1420     return;                     /* sending successful */
1421   }
1422
1423   /* sending failed, ask for next address  */
1424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1426               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1427   if (n->state != S_NOT_CONNECTED)
1428     change_state (n, S_NOT_CONNECTED);
1429   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1430   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1431
1432   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1433     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1434   n->ats_suggest =
1435       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1436                                     n);
1437   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1438   GNUNET_HELLO_address_free (cc->address);
1439   GNUNET_free (cc);
1440 }
1441
1442
1443 /**
1444  * For an existing neighbour record, set the active connection to
1445  * use the given address.
1446  *
1447  * @param peer identity of the peer to switch the address for
1448  * @param address address of the other peer, NULL if other peer
1449  *                       connected to us
1450  * @param session session to use (or NULL)
1451  * @param ats performance data
1452  * @param ats_count number of entries in ats
1453  * @param bandwidth_in inbound quota to be used when connection is up
1454  * @param bandwidth_out outbound quota to be used when connection is up
1455  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1456  *         connection is not up (yet)
1457  */
1458 int
1459 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
1460                                        const struct GNUNET_HELLO_Address
1461                                        *address,
1462                                        struct Session *session,
1463                                        const struct GNUNET_ATS_Information *ats,
1464                                        uint32_t ats_count,
1465                                        struct GNUNET_BANDWIDTH_Value32NBO
1466                                        bandwidth_in,
1467                                        struct GNUNET_BANDWIDTH_Value32NBO
1468                                        bandwidth_out)
1469 {
1470   struct NeighbourMapEntry *n;
1471   struct SessionConnectMessage connect_msg;
1472   struct ContinutionContext *cc;
1473   size_t msg_len;
1474   size_t ret;
1475
1476   if (neighbours == NULL)
1477   {
1478     /* This can happen during shutdown */
1479     return GNUNET_NO;
1480   }
1481   n = lookup_neighbour (peer);
1482   if (NULL == n)
1483     return GNUNET_NO;
1484   if (n->state == S_DISCONNECT)
1485   {
1486     /* We are disconnecting, nothing to do here */
1487     return GNUNET_NO;
1488   }
1489   GNUNET_assert (address->transport_name != NULL);
1490   if ((session == NULL) && (0 == address->address_length))
1491   {
1492     GNUNET_break_op (0);
1493     /* FIXME: is this actually possible? When does this happen? */
1494     if (strlen (address->transport_name) > 0)
1495       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1496     GNUNET_ATS_suggest_address (GST_ats, peer);
1497     return GNUNET_NO;
1498   }
1499
1500   /* checks successful and neighbour != NULL */
1501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1502               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1503               (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1504               session,
1505               GNUNET_i2s (peer),
1506               print_state (n->state));
1507
1508   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1509   {
1510     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1511     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1512   }
1513
1514
1515   if (n->state == S_FAST_RECONNECT)
1516   {
1517     /* Throtteled fast reconnect */
1518
1519     if (NULL == n->fast_reconnect_address)
1520     {
1521       n->fast_reconnect_address = GNUNET_HELLO_address_copy (address);
1522
1523     }
1524     else if (0 == GNUNET_HELLO_address_cmp(address, n->fast_reconnect_address))
1525     {
1526       n->fast_reconnect_attempts ++;
1527
1528       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1529                 "FAST RECONNECT to peer `%s' and address '%s' with identical address attempt %u\n",
1530                 GNUNET_i2s (&n->id), GST_plugins_a2s (address), n->fast_reconnect_attempts);
1531     }
1532   }
1533   else
1534   {
1535     n->fast_reconnect_attempts = 0;
1536   }
1537
1538   /* do not switch addresses just update quotas */
1539   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1540       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1541       (n->session == session))
1542   {
1543     n->bandwidth_in = bandwidth_in;
1544     n->bandwidth_out = bandwidth_out;
1545     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1546     send_outbound_quota (peer, n->bandwidth_out);
1547     return GNUNET_NO;
1548   }
1549   if (n->state == S_CONNECTED)
1550   {
1551     /* mark old address as no longer used */
1552     GNUNET_assert (NULL != n->address);
1553     if (n->address_state == USED)
1554     {
1555       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1556       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1557       n->address_state = UNUSED;
1558     }
1559   }
1560
1561   /* set new address */
1562   if (NULL != n->address)
1563     GNUNET_HELLO_address_free (n->address);
1564   n->address = GNUNET_HELLO_address_copy (address);
1565   n->address_state = FRESH;
1566   n->bandwidth_in = bandwidth_in;
1567   n->bandwidth_out = bandwidth_out;
1568   GNUNET_SCHEDULER_cancel (n->timeout_task);
1569   n->timeout_task =
1570       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1571                                     &neighbour_timeout_task, n);
1572
1573   if (NULL != address_change_cb && n->state == S_CONNECTED)
1574     address_change_cb (callback_cls, &n->id, n->address); 
1575
1576   /* Obtain an session for this address from plugin */
1577   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1578   papi = GST_plugins_find (address->transport_name);
1579
1580   if (papi == NULL)
1581   {
1582     /* we don't have the plugin for this address */
1583     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1584
1585     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1586       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1587     n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1588                                       ats_suggest_cancel,
1589                                       n);
1590     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1591     GNUNET_HELLO_address_free (n->address);
1592     n->address = NULL;
1593     n->session = NULL;
1594     return GNUNET_NO;
1595   }
1596
1597   if (session == NULL)
1598   {
1599     n->session = papi->get_session (papi->cls, address);
1600     /* Session could not be initiated */
1601     if (n->session == NULL)
1602     {
1603       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604                   "Failed to obtain new session %p for peer `%s' and  address '%s'\n",
1605                   n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1606
1607       GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1608
1609       if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1610         GNUNET_SCHEDULER_cancel (n->ats_suggest);
1611       n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1612                                         ats_suggest_cancel,
1613                                         n);
1614       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1615       GNUNET_HELLO_address_free (n->address);
1616       n->address = NULL;
1617       n->session = NULL;
1618       return GNUNET_NO;
1619     }
1620
1621     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1622                 "Obtained new session %p for peer `%s' and  address '%s'\n",
1623                  n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1624     /* Telling ATS about new session */
1625     GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0);
1626   }
1627   else
1628   {
1629     n->session = session;
1630     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1631                 "Using existing session %p for peer `%s' and  address '%s'\n",
1632                 n->session,
1633                 GNUNET_i2s (&n->id),
1634                 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>");
1635   }
1636
1637   switch (n->state)
1638   {
1639   case S_NOT_CONNECTED:
1640   case S_CONNECT_SENT:
1641     msg_len = sizeof (struct SessionConnectMessage);
1642     connect_msg.header.size = htons (msg_len);
1643     connect_msg.header.type =
1644         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1645     connect_msg.reserved = htonl (0);
1646     connect_msg.timestamp =
1647         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1648
1649     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1650     cc->session = n->session;
1651     cc->address = GNUNET_HELLO_address_copy (address);
1652     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1653                 "Sending CONNECT message to %s\n",
1654                 GNUNET_i2s (&n->id));
1655     if (n->state != S_CONNECT_RECV)
1656       change_state (n, S_CONNECT_RECV);
1657     ret = send_with_session (n,
1658       (const char *) &connect_msg, msg_len,
1659       UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1660       &send_connect_continuation, cc);
1661
1662     return GNUNET_NO;
1663   case S_CONNECT_RECV:
1664     /* We received a CONNECT message and asked ATS for an address */
1665     msg_len = sizeof (struct SessionConnectMessage);
1666     connect_msg.header.size = htons (msg_len);
1667     connect_msg.header.type =
1668         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1669     connect_msg.reserved = htonl (0);
1670     connect_msg.timestamp =
1671         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1672     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1673     cc->session = n->session;
1674     cc->address = GNUNET_HELLO_address_copy (address);
1675
1676     ret = send_with_session(n,
1677                             (const void *) &connect_msg, msg_len,
1678                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1679                             &send_connect_ack_continuation,
1680                             cc);
1681     return GNUNET_NO;
1682   case S_CONNECTED:
1683   case S_FAST_RECONNECT:
1684     /* connected peer is switching addresses or tries fast reconnect */
1685     msg_len = sizeof (struct SessionConnectMessage);
1686     connect_msg.header.size = htons (msg_len);
1687     connect_msg.header.type =
1688         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1689     connect_msg.reserved = htonl (0);
1690     connect_msg.timestamp =
1691         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1692     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1693     cc->session = n->session;
1694     cc->address = GNUNET_HELLO_address_copy (address);
1695     ret = send_with_session(n,
1696                             (const void *) &connect_msg, msg_len,
1697                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1698                             &send_switch_address_continuation, cc);
1699     if (ret == GNUNET_SYSERR)
1700     {
1701       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1702                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1703                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1704     }
1705     return GNUNET_NO;
1706   default:
1707     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1708                 "Invalid connection state to switch addresses %u \n", n->state);
1709     GNUNET_break_op (0);
1710     return GNUNET_NO;
1711   }
1712 }
1713
1714
1715 /**
1716  * Obtain current latency information for the given neighbour.
1717  *
1718  * @param peer
1719  * @return observed latency of the address, FOREVER if the address was
1720  *         never successfully validated
1721  */
1722 struct GNUNET_TIME_Relative
1723 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1724 {
1725   struct NeighbourMapEntry *n;
1726
1727   n = lookup_neighbour (peer);
1728   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1729     return GNUNET_TIME_UNIT_FOREVER_REL;
1730
1731   return n->latency;
1732 }
1733
1734 /**
1735  * Obtain current address information for the given neighbour.
1736  *
1737  * @param peer
1738  * @return address currently used
1739  */
1740 struct GNUNET_HELLO_Address *
1741 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1742 {
1743   struct NeighbourMapEntry *n;
1744
1745   n = lookup_neighbour (peer);
1746   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1747     return NULL;
1748
1749   return n->address;
1750 }
1751
1752
1753
1754 /**
1755  * Create an entry in the neighbour map for the given peer
1756  *
1757  * @param peer peer to create an entry for
1758  * @return new neighbour map entry
1759  */
1760 static struct NeighbourMapEntry *
1761 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1762 {
1763   struct NeighbourMapEntry *n;
1764
1765   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1766               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1767   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1768   n->id = *peer;
1769   n->state = S_NOT_CONNECTED;
1770   n->latency = GNUNET_TIME_relative_get_forever ();
1771   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1772                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1773                                  MAX_BANDWIDTH_CARRY_S);
1774   n->timeout_task =
1775       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1776                                     &neighbour_timeout_task, n);
1777   GNUNET_assert (GNUNET_OK ==
1778                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1779                                                     &n->id.hashPubKey, n,
1780                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1781   return n;
1782 }
1783
1784
1785 /**
1786  * Try to create a connection to the given target (eventually).
1787  *
1788  * @param target peer to try to connect to
1789  */
1790 void
1791 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1792 {
1793   struct NeighbourMapEntry *n;
1794
1795   // This can happen during shutdown
1796   if (neighbours == NULL)
1797   {
1798     return;
1799   }
1800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1801               GNUNET_i2s (target));
1802   if (0 ==
1803       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1804   {
1805     /* my own hello */
1806     return;
1807   }
1808   n = lookup_neighbour (target);
1809
1810   if (NULL != n)
1811   {
1812     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1813       return;                   /* already connecting or connected */
1814     if (is_disconnecting (n))
1815       change_state (n, S_NOT_CONNECTED);
1816   }
1817
1818
1819   if (n == NULL)
1820     n = setup_neighbour (target);
1821   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1822               "Asking ATS for suggested address to connect to peer `%s'\n",
1823               GNUNET_i2s (&n->id));
1824   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1825 }
1826
1827 /**
1828  * Test if we're connected to the given peer.
1829  *
1830  * @param target peer to test
1831  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1832  */
1833 int
1834 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1835 {
1836   struct NeighbourMapEntry *n;
1837
1838   // This can happen during shutdown
1839   if (neighbours == NULL)
1840   {
1841     return GNUNET_NO;
1842   }
1843
1844   n = lookup_neighbour (target);
1845
1846   if ((NULL == n) || (S_CONNECTED != n->state))
1847     return GNUNET_NO;           /* not connected */
1848   return GNUNET_YES;
1849 }
1850
1851 /**
1852  * A session was terminated. Take note.
1853  *
1854  * @param peer identity of the peer where the session died
1855  * @param session session that is gone
1856  */
1857 void
1858 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1859                                    struct Session *session)
1860 {
1861   struct NeighbourMapEntry *n;
1862
1863   if (neighbours == NULL)
1864   {
1865     /* This can happen during shutdown */
1866     return;
1867   }
1868   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1869               session, GNUNET_i2s (peer));
1870   n = lookup_neighbour (peer);
1871   if (NULL == n)
1872     return;
1873   if (session != n->session)
1874     return;                     /* doesn't affect us */
1875   if (n->state == S_CONNECTED)
1876   {
1877     if (n->address_state == USED)
1878     {
1879       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1880       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1881       n->address_state = UNUSED;
1882
1883     }
1884   }
1885
1886   if (NULL != n->address)
1887   {
1888     GNUNET_HELLO_address_free (n->address);
1889     n->address = NULL;
1890   }
1891   n->session = NULL;
1892
1893   /* not connected anymore anyway, shouldn't matter */
1894   if (S_CONNECTED != n->state)
1895     return;
1896
1897   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1898   {
1899     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1900     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1901     n->expect_latency_response = GNUNET_NO;
1902   }
1903
1904   /* connected, try fast reconnect */
1905   /* statistics "transport" : "# peers connected" -= 1
1906    * neighbours_connected -= 1
1907    * BUT: no disconnect_cb to notify clients about disconnect
1908    */
1909
1910   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1911               GNUNET_i2s (peer));
1912
1913   GNUNET_assert (neighbours_connected > 0);
1914   change_state (n, S_FAST_RECONNECT);
1915   neighbours_connected--;
1916   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1917                             GNUNET_NO);
1918
1919
1920   /* We are connected, so ask ATS to switch addresses */
1921   GNUNET_SCHEDULER_cancel (n->timeout_task);
1922   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1923                                     &neighbour_timeout_task, n);
1924   /* try QUICKLY to re-establish a connection, reduce timeout! */
1925   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1926     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1927   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1928                                     &ats_suggest_cancel,
1929                                     n);
1930   GNUNET_ATS_suggest_address (GST_ats, peer);
1931 }
1932
1933
1934 /**
1935  * Transmit a message to the given target using the active connection.
1936  *
1937  * @param target destination
1938  * @param msg message to send
1939  * @param msg_size number of bytes in msg
1940  * @param timeout when to fail with timeout
1941  * @param cont function to call when done
1942  * @param cont_cls closure for 'cont'
1943  */
1944 void
1945 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1946                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1947                      GST_NeighbourSendContinuation cont, void *cont_cls)
1948 {
1949   struct NeighbourMapEntry *n;
1950   struct MessageQueue *mq;
1951
1952   // This can happen during shutdown
1953   if (neighbours == NULL)
1954   {
1955     return;
1956   }
1957
1958   n = lookup_neighbour (target);
1959   if ((n == NULL) || (!is_connected (n)))
1960   {
1961     GNUNET_STATISTICS_update (GST_stats,
1962                               gettext_noop
1963                               ("# messages not sent (no such peer or not connected)"),
1964                               1, GNUNET_NO);
1965     if (n == NULL)
1966       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967                   "Could not send message to peer `%s': unknown neighbour",
1968                   GNUNET_i2s (target));
1969     else if (!is_connected (n))
1970       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1971                   "Could not send message to peer `%s': not connected\n",
1972                   GNUNET_i2s (target));
1973     if (NULL != cont)
1974       cont (cont_cls, GNUNET_SYSERR);
1975     return;
1976   }
1977
1978   if ((n->session == NULL) && (n->address == NULL))
1979   {
1980     GNUNET_STATISTICS_update (GST_stats,
1981                               gettext_noop
1982                               ("# messages not sent (no such peer or not connected)"),
1983                               1, GNUNET_NO);
1984     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1985                 "Could not send message to peer `%s': no address available\n",
1986                 GNUNET_i2s (target));
1987     if (NULL != cont)
1988       cont (cont_cls, GNUNET_SYSERR);
1989     return;
1990   }
1991
1992   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1993   bytes_in_send_queue += msg_size;
1994   GNUNET_STATISTICS_set (GST_stats,
1995                         gettext_noop
1996                         ("# bytes in message queue for other peers"),
1997                         bytes_in_send_queue, GNUNET_NO);
1998   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1999   mq->cont = cont;
2000   mq->cont_cls = cont_cls;
2001   /* FIXME: this memcpy can be up to 7% of our total runtime! */
2002   memcpy (&mq[1], msg, msg_size);
2003   mq->message_buf = (const char *) &mq[1];
2004   mq->message_buf_size = msg_size;
2005   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
2006   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
2007
2008   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
2009       (NULL == n->is_active))
2010     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
2011 }
2012
2013
2014 /**
2015  * We have received a message from the given sender.  How long should
2016  * we delay before receiving more?  (Also used to keep the peer marked
2017  * as live).
2018  *
2019  * @param sender sender of the message
2020  * @param size size of the message
2021  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
2022  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
2023  *                   GNUNET_SYSERR if the connection is not fully up yet
2024  * @return how long to wait before reading more from this sender
2025  */
2026 struct GNUNET_TIME_Relative
2027 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
2028                                         *sender, ssize_t size, int *do_forward)
2029 {
2030   struct NeighbourMapEntry *n;
2031   struct GNUNET_TIME_Relative ret;
2032
2033   // This can happen during shutdown
2034   if (neighbours == NULL)
2035   {
2036     return GNUNET_TIME_UNIT_FOREVER_REL;
2037   }
2038
2039   n = lookup_neighbour (sender);
2040   if (n == NULL)
2041   {
2042     GST_neighbours_try_connect (sender);
2043     n = lookup_neighbour (sender);
2044     if (NULL == n)
2045     {
2046       GNUNET_STATISTICS_update (GST_stats,
2047                                 gettext_noop
2048                                 ("# messages discarded due to lack of neighbour record"),
2049                                 1, GNUNET_NO);
2050       *do_forward = GNUNET_NO;
2051       return GNUNET_TIME_UNIT_ZERO;
2052     }
2053   }
2054   if (!is_connected (n))
2055   {
2056     *do_forward = GNUNET_SYSERR;
2057     return GNUNET_TIME_UNIT_ZERO;
2058   }
2059   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
2060   {
2061     n->quota_violation_count++;
2062     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2063                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
2064                 n->in_tracker.available_bytes_per_s__,
2065                 n->quota_violation_count);
2066     /* Discount 32k per violation */
2067     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
2068   }
2069   else
2070   {
2071     if (n->quota_violation_count > 0)
2072     {
2073       /* try to add 32k back */
2074       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
2075       n->quota_violation_count--;
2076     }
2077   }
2078   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2079   {
2080     GNUNET_STATISTICS_update (GST_stats,
2081                               gettext_noop
2082                               ("# bandwidth quota violations by other peers"),
2083                               1, GNUNET_NO);
2084     *do_forward = GNUNET_NO;
2085     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
2086   }
2087   *do_forward = GNUNET_YES;
2088   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
2089   if (ret.rel_value > 0)
2090   {
2091     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2092                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
2093                 (unsigned long long) n->in_tracker.
2094                 consumption_since_last_update__,
2095                 (unsigned int) n->in_tracker.available_bytes_per_s__,
2096                 (unsigned long long) ret.rel_value);
2097     GNUNET_STATISTICS_update (GST_stats,
2098                               gettext_noop ("# ms throttling suggested"),
2099                               (int64_t) ret.rel_value, GNUNET_NO);
2100   }
2101   return ret;
2102 }
2103
2104
2105 /**
2106  * Keep the connection to the given neighbour alive longer,
2107  * we received a KEEPALIVE (or equivalent).
2108  *
2109  * @param neighbour neighbour to keep alive
2110  */
2111 void
2112 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
2113 {
2114   struct NeighbourMapEntry *n;
2115
2116   // This can happen during shutdown
2117   if (neighbours == NULL)
2118   {
2119     return;
2120   }
2121
2122   n = lookup_neighbour (neighbour);
2123   if (NULL == n)
2124   {
2125     GNUNET_STATISTICS_update (GST_stats,
2126                               gettext_noop
2127                               ("# KEEPALIVE messages discarded (not connected)"),
2128                               1, GNUNET_NO);
2129     return;
2130   }
2131   GNUNET_SCHEDULER_cancel (n->timeout_task);
2132   n->timeout_task =
2133       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2134                                     &neighbour_timeout_task, n);
2135
2136   /* send reply to measure latency */
2137   if (S_CONNECTED != n->state)
2138     return;
2139
2140   struct GNUNET_MessageHeader m;
2141
2142   m.size = htons (sizeof (struct GNUNET_MessageHeader));
2143   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
2144
2145   send_with_session(n,
2146       (const void *) &m, sizeof (m),
2147       UINT32_MAX,
2148       GNUNET_TIME_UNIT_FOREVER_REL,
2149       NULL, NULL);
2150 }
2151
2152 /**
2153  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
2154  * to this peer
2155  *
2156  * @param neighbour neighbour to keep alive
2157  * @param ats performance data
2158  * @param ats_count number of entries in ats
2159  */
2160 void
2161 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
2162                                    const struct GNUNET_ATS_Information *ats,
2163                                    uint32_t ats_count)
2164 {
2165   struct NeighbourMapEntry *n;
2166   struct GNUNET_ATS_Information *ats_new;
2167   uint32_t latency;
2168
2169   if (neighbours == NULL)
2170   {
2171     // This can happen during shutdown
2172     return;
2173   }
2174
2175   n = lookup_neighbour (neighbour);
2176   if ((NULL == n) || (n->state != S_CONNECTED))
2177   {
2178     GNUNET_STATISTICS_update (GST_stats,
2179                               gettext_noop
2180                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2181                               1, GNUNET_NO);
2182     return;
2183   }
2184   if (n->expect_latency_response != GNUNET_YES)
2185   {
2186     GNUNET_STATISTICS_update (GST_stats,
2187                               gettext_noop
2188                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2189                               1, GNUNET_NO);
2190     return;
2191   }
2192   n->expect_latency_response = GNUNET_NO;
2193
2194   GNUNET_assert (n->keep_alive_sent.abs_value !=
2195                  GNUNET_TIME_absolute_get_zero ().abs_value);
2196   n->latency =
2197       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2198                                            GNUNET_TIME_absolute_get ());
2199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2200               GNUNET_i2s (&n->id), n->latency.rel_value);
2201
2202   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2203   {
2204     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2205   }
2206   else
2207   {
2208     ats_new =
2209         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2210                        (ats_count + 1));
2211     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2212
2213     /* add latency */
2214     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2215     if (n->latency.rel_value > UINT32_MAX)
2216       latency = UINT32_MAX;
2217     else
2218       latency = n->latency.rel_value;
2219     ats_new[ats_count].value = htonl (latency);
2220
2221     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2222                                ats_count + 1);
2223     GNUNET_free (ats_new);
2224   }
2225 }
2226
2227
2228 /**
2229  * Change the incoming quota for the given peer.
2230  *
2231  * @param neighbour identity of peer to change qutoa for
2232  * @param quota new quota
2233  */
2234 void
2235 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2236                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2237 {
2238   struct NeighbourMapEntry *n;
2239
2240   // This can happen during shutdown
2241   if (neighbours == NULL)
2242   {
2243     return;
2244   }
2245
2246   n = lookup_neighbour (neighbour);
2247   if (n == NULL)
2248   {
2249     GNUNET_STATISTICS_update (GST_stats,
2250                               gettext_noop
2251                               ("# SET QUOTA messages ignored (no such peer)"),
2252                               1, GNUNET_NO);
2253     return;
2254   }
2255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2256               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2257               ntohl (quota.value__), GNUNET_i2s (&n->id));
2258   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2259   if (0 != ntohl (quota.value__))
2260     return;
2261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2262               GNUNET_i2s (&n->id), "SET_QUOTA");
2263   if (is_connected (n))
2264     GNUNET_STATISTICS_update (GST_stats,
2265                               gettext_noop ("# disconnects due to quota of 0"),
2266                               1, GNUNET_NO);
2267   disconnect_neighbour (n);
2268 }
2269
2270
2271 /**
2272  * Closure for the neighbours_iterate function.
2273  */
2274 struct IteratorContext
2275 {
2276   /**
2277    * Function to call on each connected neighbour.
2278    */
2279   GST_NeighbourIterator cb;
2280
2281   /**
2282    * Closure for 'cb'.
2283    */
2284   void *cb_cls;
2285 };
2286
2287
2288 /**
2289  * Call the callback from the closure for each connected neighbour.
2290  *
2291  * @param cls the 'struct IteratorContext'
2292  * @param key the hash of the public key of the neighbour
2293  * @param value the 'struct NeighbourMapEntry'
2294  * @return GNUNET_OK (continue to iterate)
2295  */
2296 static int
2297 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2298 {
2299   struct IteratorContext *ic = cls;
2300   struct NeighbourMapEntry *n = value;
2301
2302   if (!is_connected (n))
2303     return GNUNET_OK;
2304
2305   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2306   return GNUNET_OK;
2307 }
2308
2309
2310 /**
2311  * Iterate over all connected neighbours.
2312  *
2313  * @param cb function to call
2314  * @param cb_cls closure for cb
2315  */
2316 void
2317 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2318 {
2319   struct IteratorContext ic;
2320
2321   // This can happen during shutdown
2322   if (neighbours == NULL)
2323   {
2324     return;
2325   }
2326
2327   ic.cb = cb;
2328   ic.cb_cls = cb_cls;
2329   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2330 }
2331
2332 /**
2333  * If we have an active connection to the given target, it must be shutdown.
2334  *
2335  * @param target peer to disconnect from
2336  */
2337 void
2338 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2339 {
2340   struct NeighbourMapEntry *n;
2341
2342   // This can happen during shutdown
2343   if (neighbours == NULL)
2344   {
2345     return;
2346   }
2347
2348   n = lookup_neighbour (target);
2349   if (NULL == n)
2350     return;                     /* not active */
2351   disconnect_neighbour (n);
2352 }
2353
2354
2355 /**
2356  * We received a disconnect message from the given peer,
2357  * validate and process.
2358  *
2359  * @param peer sender of the message
2360  * @param msg the disconnect message
2361  */
2362 void
2363 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2364                                           *peer,
2365                                           const struct GNUNET_MessageHeader
2366                                           *msg)
2367 {
2368   struct NeighbourMapEntry *n;
2369   const struct SessionDisconnectMessage *sdm;
2370   GNUNET_HashCode hc;
2371
2372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2373               "Received DISCONNECT message from peer `%s'\n",
2374               GNUNET_i2s (peer));
2375   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2376   {
2377     // GNUNET_break_op (0);
2378     GNUNET_STATISTICS_update (GST_stats,
2379                               gettext_noop
2380                               ("# disconnect messages ignored (old format)"), 1,
2381                               GNUNET_NO);
2382     return;
2383   }
2384   sdm = (const struct SessionDisconnectMessage *) msg;
2385   n = lookup_neighbour (peer);
2386   if (NULL == n)
2387     return;                     /* gone already */
2388   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2389       n->connect_ts.abs_value)
2390   {
2391     GNUNET_STATISTICS_update (GST_stats,
2392                               gettext_noop
2393                               ("# disconnect messages ignored (timestamp)"), 1,
2394                               GNUNET_NO);
2395     return;
2396   }
2397   GNUNET_CRYPTO_hash (&sdm->public_key,
2398                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2399                       &hc);
2400   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2401   {
2402     GNUNET_break_op (0);
2403     return;
2404   }
2405   if (ntohl (sdm->purpose.size) !=
2406       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2407       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2408       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2409   {
2410     GNUNET_break_op (0);
2411     return;
2412   }
2413   if (GNUNET_OK !=
2414       GNUNET_CRYPTO_rsa_verify
2415       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2416        &sdm->signature, &sdm->public_key))
2417   {
2418     GNUNET_break_op (0);
2419     return;
2420   }
2421   GST_neighbours_force_disconnect (peer);
2422 }
2423
2424
2425 /**
2426  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2427  * Consider switching to it.
2428  *
2429  * @param message possibly a 'struct SessionConnectMessage' (check format)
2430  * @param peer identity of the peer to switch the address for
2431  * @param address address of the other peer, NULL if other peer
2432  *                       connected to us
2433  * @param session session to use (or NULL)
2434  * @param ats performance data
2435  * @param ats_count number of entries in ats
2436  */
2437 void
2438 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2439                                    const struct GNUNET_PeerIdentity *peer,
2440                                    const struct GNUNET_HELLO_Address *address,
2441                                    struct Session *session,
2442                                    const struct GNUNET_ATS_Information *ats,
2443                                    uint32_t ats_count)
2444 {
2445   const struct SessionConnectMessage *scm;
2446   struct GNUNET_MessageHeader msg;
2447   struct NeighbourMapEntry *n;
2448   size_t msg_len;
2449   size_t ret;
2450
2451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2452               "Received CONNECT_ACK message from peer `%s'\n",
2453               GNUNET_i2s (peer));
2454
2455   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2456   {
2457     GNUNET_break_op (0);
2458     return;
2459   }
2460   scm = (const struct SessionConnectMessage *) message;
2461   GNUNET_break_op (ntohl (scm->reserved) == 0);
2462   n = lookup_neighbour (peer);
2463   if (NULL == n)
2464   {
2465     /* we did not send 'CONNECT' -- at least not recently */
2466     GNUNET_STATISTICS_update (GST_stats,
2467                               gettext_noop
2468                               ("# unexpected CONNECT_ACK messages (no peer)"),
2469                               1, GNUNET_NO);
2470     return;
2471   }
2472
2473   /* Additional check
2474    *
2475    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2476    *
2477    * We also received an CONNECT message, switched from SENDT to RECV and
2478    * ATS already suggested us an address after a successful blacklist check
2479    */
2480
2481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482               "Received CONNECT_ACK message from peer `%s' in state `%s'\n",
2483               GNUNET_i2s (peer),
2484               print_state(n->state));
2485
2486   if ((n->address != NULL) && (n->state == S_CONNECTED))
2487   {
2488     /* After fast reconnect: send ACK (ACK) even when we are connected */
2489     msg_len = sizeof (msg);
2490     msg.size = htons (msg_len);
2491     msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2492
2493     ret = send_with_session(n,
2494               (const char *) &msg, msg_len,
2495               UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2496               NULL, NULL);
2497
2498     if (ret == GNUNET_SYSERR)
2499       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2500                   "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2501                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2502     return;
2503   }
2504
2505   if ((NULL == n->address) ||
2506        ((n->state != S_CONNECT_SENT) &&
2507       ((n->state != S_CONNECT_RECV) && (n->address != NULL))))
2508   {
2509     GNUNET_STATISTICS_update (GST_stats,
2510                               gettext_noop
2511                               ("# unexpected CONNECT_ACK messages"), 1,
2512                               GNUNET_NO);
2513     return;
2514   }
2515   if (n->state != S_CONNECTED)
2516     change_state (n, S_CONNECTED);
2517
2518   if (NULL != session)
2519   {
2520     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2521                      "transport-ats",
2522                      "Giving ATS session %p of plugin %s for peer %s\n",
2523                      session, address->transport_name, GNUNET_i2s (peer));
2524   }
2525   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2526   GNUNET_assert (NULL != n->address);
2527
2528   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2529   {
2530     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2531     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2532     n->address_state = USED;
2533   }
2534
2535   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2536
2537   /* send ACK (ACK) */
2538   msg_len = sizeof (msg);
2539   msg.size = htons (msg_len);
2540   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2541
2542   ret = send_with_session(n,
2543             (const char *) &msg, msg_len,
2544             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2545             NULL, NULL);
2546
2547   if (ret == GNUNET_SYSERR)
2548     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2549                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2550                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2551
2552
2553   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2554     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2555
2556   neighbours_connected++;
2557   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2558                             GNUNET_NO);
2559   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2560               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2561               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2562               __LINE__);
2563   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2564   send_outbound_quota (peer, n->bandwidth_out);
2565
2566 }
2567
2568
2569 void
2570 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2571                            const struct GNUNET_PeerIdentity *peer,
2572                            const struct GNUNET_HELLO_Address *address,
2573                            struct Session *session,
2574                            const struct GNUNET_ATS_Information *ats,
2575                            uint32_t ats_count)
2576 {
2577   struct NeighbourMapEntry *n;
2578
2579   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2580               GNUNET_i2s (peer));
2581   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2582   {
2583     GNUNET_break_op (0);
2584     return;
2585   }
2586   n = lookup_neighbour (peer);
2587   if (NULL == n)
2588   {
2589     GNUNET_break (0);
2590     return;
2591   }
2592   if (S_CONNECTED == n->state)
2593     return;
2594   if (!is_connecting (n))
2595   {
2596     GNUNET_STATISTICS_update (GST_stats,
2597                               gettext_noop ("# unexpected ACK messages"), 1,
2598                               GNUNET_NO);
2599     return;
2600   }
2601   change_state (n, S_CONNECTED);
2602   if (NULL != session)
2603     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2604                      "transport-ats",
2605                      "Giving ATS session %p of plugin %s for peer %s\n",
2606                      session, address->transport_name, GNUNET_i2s (peer));
2607   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2608   GNUNET_assert (n->address != NULL);
2609
2610   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2611   {
2612     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2613     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2614     n->address_state = USED;
2615   }
2616
2617
2618   neighbours_connected++;
2619   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2620                             GNUNET_NO);
2621
2622   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2623   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2624     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2625   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2626               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2627               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2628               __LINE__);
2629   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2630   send_outbound_quota (peer, n->bandwidth_out);
2631 }
2632
2633 struct BlackListCheckContext
2634 {
2635   struct GNUNET_ATS_Information *ats;
2636
2637   uint32_t ats_count;
2638
2639   struct Session *session;
2640
2641   struct GNUNET_HELLO_Address *address;
2642
2643   struct GNUNET_TIME_Absolute ts;
2644 };
2645
2646
2647 static void
2648 handle_connect_blacklist_cont (void *cls,
2649                                const struct GNUNET_PeerIdentity *peer,
2650                                int result)
2651 {
2652   struct NeighbourMapEntry *n;
2653   struct BlackListCheckContext *bcc = cls;
2654
2655   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2656               "Blacklist check due to CONNECT message: `%s'\n",
2657               GNUNET_i2s (peer),
2658               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2659   /* not allowed */
2660   if (GNUNET_OK != result)
2661   {
2662     GNUNET_HELLO_address_free (bcc->address);
2663     GNUNET_free (bcc);
2664     return;
2665   }
2666
2667   n = lookup_neighbour (peer);
2668   if (NULL == n)
2669     n = setup_neighbour (peer);
2670
2671   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2672   {
2673     if (NULL != bcc->session)
2674       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2675                        "transport-ats",
2676                        "Giving ATS session %p of address `%s' for peer %s\n",
2677                        bcc->session, GST_plugins_a2s (bcc->address),
2678                        GNUNET_i2s (peer));
2679     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2680
2681     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2682                                bcc->ats_count);
2683     n->connect_ts = bcc->ts;
2684   }
2685
2686   GNUNET_HELLO_address_free (bcc->address);
2687   GNUNET_free (bcc);
2688
2689   if (n->state != S_CONNECT_RECV)
2690     change_state (n, S_CONNECT_RECV);
2691
2692
2693   /* Ask ATS for an address to connect via that address */
2694   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2695     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2696   n->ats_suggest =
2697       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2698                                     n);
2699   GNUNET_ATS_suggest_address (GST_ats, peer);
2700 }
2701
2702 /**
2703  * We received a 'SESSION_CONNECT' message from the other peer.
2704  * Consider switching to it.
2705  *
2706  * @param message possibly a 'struct SessionConnectMessage' (check format)
2707  * @param peer identity of the peer to switch the address for
2708  * @param address address of the other peer, NULL if other peer
2709  *                       connected to us
2710  * @param session session to use (or NULL)
2711  * @param ats performance data
2712  * @param ats_count number of entries in ats (excluding 0-termination)
2713  */
2714 void
2715 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2716                                const struct GNUNET_PeerIdentity *peer,
2717                                const struct GNUNET_HELLO_Address *address,
2718                                struct Session *session,
2719                                const struct GNUNET_ATS_Information *ats,
2720                                uint32_t ats_count)
2721 {
2722   const struct SessionConnectMessage *scm;
2723   struct BlackListCheckContext *bcc = NULL;
2724   struct NeighbourMapEntry *n;
2725
2726   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2727               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2728   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2729   {
2730     GNUNET_break_op (0);
2731     return;
2732   }
2733
2734   scm = (const struct SessionConnectMessage *) message;
2735   GNUNET_break_op (ntohl (scm->reserved) == 0);
2736
2737   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2738
2739   n = lookup_neighbour (peer);
2740   if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state)))
2741   {
2742     /* connected peer switches addresses or is trying to do a fast reconnect*/
2743     return;
2744   }
2745
2746
2747   /* we are not connected to this peer */
2748   /* do blacklist check */
2749   bcc =
2750       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2751                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2752   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2753   bcc->ats_count = ats_count + 1;
2754   bcc->address = GNUNET_HELLO_address_copy (address);
2755   bcc->session = session;
2756   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2757   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2758   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2759   bcc->ats[ats_count].value =
2760       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2761   GST_blacklist_test_allowed (peer, address->transport_name,
2762                               &handle_connect_blacklist_cont, bcc);
2763 }
2764
2765
2766 /* end of file gnunet-service-transport_neighbours.c */