-fix
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
61
62 #define FAST_RECONNECT_RATE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100)
63
64 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
65
66 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
67
68 /**
69  * Entry in neighbours.
70  */
71 struct NeighbourMapEntry;
72
73 GNUNET_NETWORK_STRUCT_BEGIN
74
75 /**
76  * Message a peer sends to another to indicate its
77  * preference for communicating via a particular
78  * session (and the desire to establish a real
79  * connection).
80  */
81 struct SessionConnectMessage
82 {
83   /**
84    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
85    */
86   struct GNUNET_MessageHeader header;
87
88   /**
89    * Always zero.
90    */
91   uint32_t reserved GNUNET_PACKED;
92
93   /**
94    * Absolute time at the sender.  Only the most recent connect
95    * message implies which session is preferred by the sender.
96    */
97   struct GNUNET_TIME_AbsoluteNBO timestamp;
98
99 };
100
101
102 struct SessionDisconnectMessage
103 {
104   /**
105    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
106    */
107   struct GNUNET_MessageHeader header;
108
109   /**
110    * Always zero.
111    */
112   uint32_t reserved GNUNET_PACKED;
113
114   /**
115    * Purpose of the signature.  Extends over the timestamp.
116    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
117    */
118   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
119
120   /**
121    * Absolute time at the sender.  Only the most recent connect
122    * message implies which session is preferred by the sender.
123    */
124   struct GNUNET_TIME_AbsoluteNBO timestamp;
125
126   /**
127    * Public key of the sender.
128    */
129   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
130
131   /**
132    * Signature of the peer that sends us the disconnect.  Only
133    * valid if the timestamp is AFTER the timestamp from the
134    * corresponding 'CONNECT' message.
135    */
136   struct GNUNET_CRYPTO_RsaSignature signature;
137
138 };
139 GNUNET_NETWORK_STRUCT_END
140
141 /**
142  * For each neighbour we keep a list of messages
143  * that we still want to transmit to the neighbour.
144  */
145 struct MessageQueue
146 {
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *next;
152
153   /**
154    * This is a doubly linked list.
155    */
156   struct MessageQueue *prev;
157
158   /**
159    * Once this message is actively being transmitted, which
160    * neighbour is it associated with?
161    */
162   struct NeighbourMapEntry *n;
163
164   /**
165    * Function to call once we're done.
166    */
167   GST_NeighbourSendContinuation cont;
168
169   /**
170    * Closure for 'cont'
171    */
172   void *cont_cls;
173
174   /**
175    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
176    * stuck together in memory.  Allocated at the end of this struct.
177    */
178   const char *message_buf;
179
180   /**
181    * Size of the message buf
182    */
183   size_t message_buf_size;
184
185   /**
186    * At what time should we fail?
187    */
188   struct GNUNET_TIME_Absolute timeout;
189
190 };
191
192
193 enum State
194 {
195   /**
196    * fresh peer or completely disconnected
197    */
198   S_NOT_CONNECTED,
199
200   /**
201    * sent CONNECT message to other peer, waiting for CONNECT_ACK
202    */
203   S_CONNECT_SENT,
204
205   /**
206    * received CONNECT message to other peer, sending CONNECT_ACK
207    */
208   S_CONNECT_RECV,
209
210   /**
211    * received ACK or payload
212    */
213   S_CONNECTED,
214
215   /**
216    * connection ended, fast reconnect
217    */
218   S_FAST_RECONNECT,
219
220   /**
221    * Disconnect in progress
222    */
223   S_DISCONNECT
224 };
225
226 enum Address_State
227 {
228   USED,
229   UNUSED,
230   FRESH,
231 };
232
233
234 /**
235  * Entry in neighbours.
236  */
237 struct NeighbourMapEntry
238 {
239
240   /**
241    * Head of list of messages we would like to send to this peer;
242    * must contain at most one message per client.
243    */
244   struct MessageQueue *messages_head;
245
246   /**
247    * Tail of list of messages we would like to send to this peer; must
248    * contain at most one message per client.
249    */
250   struct MessageQueue *messages_tail;
251
252   /**
253    * Are we currently trying to send a message? If so, which one?
254    */
255   struct MessageQueue *is_active;
256
257   /**
258    * Active session for communicating with the peer.
259    */
260   struct Session *session;
261
262   /**
263    * Address we currently use.
264    */
265   struct GNUNET_HELLO_Address *address;
266
267   /**
268    * Address we currently use.
269    */
270   struct GNUNET_HELLO_Address *fast_reconnect_address;
271
272
273   /**
274    * Identity of this neighbour.
275    */
276   struct GNUNET_PeerIdentity id;
277
278   /**
279    * ID of task scheduled to run when this peer is about to
280    * time out (will free resources associated with the peer).
281    */
282   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
283
284   /**
285    * ID of task scheduled to send keepalives.
286    */
287   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
288
289   /**
290    * ID of task scheduled to run when we should try transmitting
291    * the head of the message queue.
292    */
293   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
294
295   /**
296    * Tracker for inbound bandwidth.
297    */
298   struct GNUNET_BANDWIDTH_Tracker in_tracker;
299
300   /**
301    * Inbound bandwidth from ATS, activated when connection is up
302    */
303   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
304
305   /**
306    * Inbound bandwidth from ATS, activated when connection is up
307    */
308   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
309
310   /**
311    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
312    */
313   struct GNUNET_TIME_Absolute connect_ts;
314
315   /**
316    * When did we sent the last keep-alive message?
317    */
318   struct GNUNET_TIME_Absolute keep_alive_sent;
319
320   /**
321    * Latest calculated latency value
322    */
323   struct GNUNET_TIME_Relative latency;
324
325   /**
326    * Timeout for ATS
327    * We asked ATS for a new address for this peer
328    */
329   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
330
331   /**
332    * Delay for ATS request
333    */
334   GNUNET_SCHEDULER_TaskIdentifier ats_request;
335
336   /**
337    * Task the resets the peer state after due to an pending
338    * unsuccessful connection setup
339    */
340   GNUNET_SCHEDULER_TaskIdentifier state_reset;
341
342
343   /**
344    * How often has the other peer (recently) violated the inbound
345    * traffic limit?  Incremented by 10 per violation, decremented by 1
346    * per non-violation (for each time interval).
347    */
348   unsigned int quota_violation_count;
349
350
351   /**
352    * The current state of the peer
353    * Element of enum State
354    */
355   int state;
356
357   /**
358    * Did we sent an KEEP_ALIVE message and are we expecting a response?
359    */
360   int expect_latency_response;
361
362   /**
363    * Was the address used successfully
364    */
365   int address_state;
366
367   /**
368    * Fast reconnect attempts for identical address
369    */
370   unsigned int fast_reconnect_attempts;
371 };
372
373
374 /**
375  * All known neighbours and their HELLOs.
376  */
377 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
378
379 /**
380  * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb
381  */
382 static void *callback_cls;
383
384 /**
385  * Function to call when we connected to a neighbour.
386  */
387 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
388
389 /**
390  * Function to call when we disconnected from a neighbour.
391  */
392 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
393
394 /**
395  * Function to call when we changed an active address of a neighbour.
396  */
397 static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb;
398
399 /**
400  * counter for connected neighbours
401  */
402 static int neighbours_connected;
403
404 /**
405  * Lookup a neighbour entry in the neighbours hash map.
406  *
407  * @param pid identity of the peer to look up
408  * @return the entry, NULL if there is no existing record
409  */
410 static struct NeighbourMapEntry *
411 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
412 {
413   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
414 }
415
416 /**
417  * Disconnect from the given neighbour, clean up the record.
418  *
419  * @param n neighbour to disconnect from
420  */
421 static void
422 disconnect_neighbour (struct NeighbourMapEntry *n);
423
424 #define change_state(n, state, ...) change (n, state, __LINE__)
425
426 static int
427 is_connecting (struct NeighbourMapEntry *n)
428 {
429   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
430     return GNUNET_YES;
431   return GNUNET_NO;
432 }
433
434 static int
435 is_connected (struct NeighbourMapEntry *n)
436 {
437   if (n->state == S_CONNECTED)
438     return GNUNET_YES;
439   return GNUNET_NO;
440 }
441
442 static int
443 is_disconnecting (struct NeighbourMapEntry *n)
444 {
445   if (n->state == S_DISCONNECT)
446     return GNUNET_YES;
447   return GNUNET_NO;
448 }
449
450 static const char *
451 print_state (int state)
452 {
453   switch (state)
454   {
455   case S_CONNECTED:
456     return "S_CONNECTED";
457     break;
458   case S_CONNECT_RECV:
459     return "S_CONNECT_RECV";
460     break;
461   case S_CONNECT_SENT:
462     return "S_CONNECT_SENT";
463     break;
464   case S_DISCONNECT:
465     return "S_DISCONNECT";
466     break;
467   case S_NOT_CONNECTED:
468     return "S_NOT_CONNECTED";
469     break;
470   case S_FAST_RECONNECT:
471     return "S_FAST_RECONNECT";
472     break;
473   default:
474     GNUNET_break (0);
475     break;
476   }
477   return NULL;
478 }
479
480 static int
481 change (struct NeighbourMapEntry *n, int state, int line);
482
483 static void
484 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
485
486
487 static void
488 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
489 {
490   struct NeighbourMapEntry *n = cls;
491
492   if (n == NULL)
493     return;
494
495   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
496   if (n->state == S_CONNECTED)
497     return;
498
499 #if DEBUG_TRANSPORT
500   GNUNET_STATISTICS_update (GST_stats,
501                             gettext_noop
502                             ("# failed connection attempts due to timeout"), 1,
503                             GNUNET_NO);
504 #endif
505
506   /* resetting state */
507
508   if (n->state == S_FAST_RECONNECT)
509   {
510     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511                 "Fast reconnect time out, disconnecting peer `%s'\n",
512                 GNUNET_i2s (&n->id));
513     disconnect_neighbour(n);
514     return;
515   }
516
517   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
519               GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__);
520
521   n->state = S_NOT_CONNECTED;
522
523   /* destroying address */
524   if (n->address != NULL)
525   {
526     GNUNET_assert (strlen (n->address->transport_name) > 0);
527     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
528   }
529
530   /* request new address */
531   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
532     GNUNET_SCHEDULER_cancel (n->ats_suggest);
533   n->ats_suggest =
534       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
535                                     n);
536   GNUNET_ATS_suggest_address (GST_ats, &n->id);
537 }
538
539 static int
540 change (struct NeighbourMapEntry *n, int state, int line)
541 {
542   int previous_state;
543   /* allowed transitions */
544   int allowed = GNUNET_NO;
545
546   previous_state = n->state;
547
548   switch (n->state)
549   {
550   case S_NOT_CONNECTED:
551     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
552         (state == S_DISCONNECT))
553       allowed = GNUNET_YES;
554     break;
555   case S_CONNECT_RECV:
556     allowed = GNUNET_YES;
557     break;
558   case S_CONNECT_SENT:
559     allowed = GNUNET_YES;
560     break;
561   case S_CONNECTED:
562     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
563       allowed = GNUNET_YES;
564     break;
565   case S_DISCONNECT:
566     break;
567   case S_FAST_RECONNECT:
568     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
569       allowed = GNUNET_YES;
570     break;
571   default:
572     GNUNET_break (0);
573     break;
574   }
575   if (allowed == GNUNET_NO)
576   {
577     char *old = GNUNET_strdup (print_state (n->state));
578     char *new = GNUNET_strdup (print_state (state));
579
580     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
581                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
582                 new, line);
583     GNUNET_break (0);
584     GNUNET_free (old);
585     GNUNET_free (new);
586     return GNUNET_SYSERR;
587   }
588   {
589     char *old = GNUNET_strdup (print_state (n->state));
590     char *new = GNUNET_strdup (print_state (state));
591
592     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
594                 GNUNET_i2s (&n->id), n, old, new, line);
595     GNUNET_free (old);
596     GNUNET_free (new);
597   }
598   n->state = state;
599
600   switch (n->state)
601   {
602   case S_FAST_RECONNECT:
603   case S_CONNECT_RECV:
604   case S_CONNECT_SENT:
605     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
606       GNUNET_SCHEDULER_cancel (n->state_reset);
607     n->state_reset =
608         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
609     break;
610   case S_CONNECTED:
611   case S_NOT_CONNECTED:
612   case S_DISCONNECT:
613     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
614     {
615 #if DEBUG_TRANSPORT
616       char *old = GNUNET_strdup (print_state (n->state));
617       char *new = GNUNET_strdup (print_state (state));
618
619       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
620                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
621                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
622       GNUNET_free (old);
623       GNUNET_free (new);
624 #endif
625       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
626       GNUNET_SCHEDULER_cancel (n->state_reset);
627       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
628     }
629     break;
630
631   default:
632     GNUNET_assert (0);
633   }
634
635   if (NULL != address_change_cb)
636   {
637     if (n->state == S_CONNECTED)
638       address_change_cb (callback_cls, &n->id, n->address);
639     else if (previous_state == S_CONNECTED)
640       address_change_cb (callback_cls, &n->id, NULL);
641   }
642
643   return GNUNET_OK;
644 }
645
646 static ssize_t
647 send_with_session (struct NeighbourMapEntry *n,
648                    const char *msgbuf, size_t msgbuf_size,
649                    uint32_t priority,
650                    struct GNUNET_TIME_Relative timeout,
651                    GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
652 {
653   struct GNUNET_TRANSPORT_PluginFunctions *papi;
654   size_t ret = GNUNET_SYSERR;
655
656   GNUNET_assert (n != NULL);
657   GNUNET_assert (n->session != NULL);
658
659   papi = GST_plugins_find (n->address->transport_name);
660   if (papi == NULL)
661   {
662     if (cont != NULL)
663       cont (cont_cls, &n->id, GNUNET_SYSERR);
664     return GNUNET_SYSERR;
665   }
666
667   ret = papi->send (papi->cls,
668                    n->session,
669                    msgbuf, msgbuf_size,
670                    0,
671                    timeout,
672                    cont, cont_cls);
673
674   if ((ret == -1) && (cont != NULL))
675       cont (cont_cls, &n->id, GNUNET_SYSERR);
676   return ret;
677 }
678
679 /**
680  * Task invoked to start a transmission to another peer.
681  *
682  * @param cls the 'struct NeighbourMapEntry'
683  * @param tc scheduler context
684  */
685 static void
686 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
687
688
689 /**
690  * We're done with our transmission attempt, continue processing.
691  *
692  * @param cls the 'struct MessageQueue' of the message
693  * @param receiver intended receiver
694  * @param success whether it worked or not
695  */
696 static void
697 transmit_send_continuation (void *cls,
698                             const struct GNUNET_PeerIdentity *receiver,
699                             int success)
700 {
701   struct MessageQueue *mq = cls;
702   struct NeighbourMapEntry *n;
703   struct NeighbourMapEntry *tmp;
704
705   tmp = lookup_neighbour (receiver);
706   n = mq->n;
707   if ((NULL != n) && (tmp != NULL) && (tmp == n))
708   {
709     GNUNET_assert (n->is_active == mq);
710     n->is_active = NULL;
711     if (success == GNUNET_YES)
712     {
713       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
714       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
715     }
716   }
717 #if DEBUG_TRANSPORT
718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
719               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
720               (success == GNUNET_OK) ? "successful" : "FAILED");
721 #endif
722   if (NULL != mq->cont)
723     mq->cont (mq->cont_cls, success);
724   GNUNET_free (mq);
725 }
726
727
728 /**
729  * Check the ready list for the given neighbour and if a plugin is
730  * ready for transmission (and if we have a message), do so!
731  *
732  * @param n target peer for which to transmit
733  */
734 static void
735 try_transmission_to_peer (struct NeighbourMapEntry *n)
736 {
737   struct MessageQueue *mq;
738   struct GNUNET_TIME_Relative timeout;
739   ssize_t ret;
740
741   if (n->is_active != NULL)
742   {
743     GNUNET_break (0);
744     return;                     /* transmission already pending */
745   }
746   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
747   {
748     GNUNET_break (0);
749     return;                     /* currently waiting for bandwidth */
750   }
751   while (NULL != (mq = n->messages_head))
752   {
753     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
754     if (timeout.rel_value > 0)
755       break;
756     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
757     n->is_active = mq;
758     mq->n = n;
759     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
760   }
761   if (NULL == mq)
762     return;                     /* no more messages */
763
764   if (n->address == NULL)
765   {
766 #if DEBUG_TRANSPORT
767     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
768                 GNUNET_i2s (&n->id));
769 #endif
770     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
771     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
772     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
773     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
774     return;
775   }
776
777   if (GST_plugins_find (n->address->transport_name) == NULL)
778   {
779     GNUNET_break (0);
780     return;
781   }
782   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
783   n->is_active = mq;
784   mq->n = n;
785
786   if ((n->address->address_length == 0) && (n->session == NULL))
787   {
788     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
789                 GNUNET_i2s (&n->id));
790     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
791     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
792     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
793     return;
794   }
795
796   ret = send_with_session(n,
797               mq->message_buf, mq->message_buf_size,
798               0, timeout,
799               &transmit_send_continuation, mq);
800
801   if (ret == -1)
802   {
803     /* failure, but 'send' would not call continuation in this case,
804      * so we need to do it here! */
805     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
806   }
807
808 }
809
810
811 /**
812  * Task invoked to start a transmission to another peer.
813  *
814  * @param cls the 'struct NeighbourMapEntry'
815  * @param tc scheduler context
816  */
817 static void
818 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
819 {
820   struct NeighbourMapEntry *n = cls;
821
822   GNUNET_assert (NULL != lookup_neighbour (&n->id));
823   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
824   try_transmission_to_peer (n);
825 }
826
827
828 /**
829  * Initialize the neighbours subsystem.
830  *
831  * @param cls closure for callbacks
832  * @param connect_cb function to call if we connect to a peer
833  * @param disconnect_cb function to call if we disconnect from a peer
834  * @param peer_address_cb function to call if we change an active address
835  *                   of a neighbour
836  */
837 void
838 GST_neighbours_start (void *cls,
839                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
840                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
841                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
842 {
843   callback_cls = cls;
844   connect_notify_cb = connect_cb;
845   disconnect_notify_cb = disconnect_cb;
846   address_change_cb = peer_address_cb;
847   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
848 }
849
850
851 static void
852 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
853                       int result)
854 {
855 #if DEBUG_TRANSPORT
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "Sending DISCONNECT message to peer `%4s': %i\n",
858               GNUNET_i2s (target), result);
859 #endif
860 }
861
862
863 static int
864 send_disconnect (struct NeighbourMapEntry * n)
865 {
866   size_t ret;
867   struct SessionDisconnectMessage disconnect_msg;
868
869 #if DEBUG_TRANSPORT
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "Sending DISCONNECT message to peer `%4s'\n",
872               GNUNET_i2s (&n->id));
873 #endif
874
875   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
876   disconnect_msg.header.type =
877       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
878   disconnect_msg.reserved = htonl (0);
879   disconnect_msg.purpose.size =
880       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
881              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
882              sizeof (struct GNUNET_TIME_AbsoluteNBO));
883   disconnect_msg.purpose.purpose =
884       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
885   disconnect_msg.timestamp =
886       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
887   disconnect_msg.public_key = GST_my_public_key;
888   GNUNET_assert (GNUNET_OK ==
889                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
890                                          &disconnect_msg.purpose,
891                                          &disconnect_msg.signature));
892
893   ret = send_with_session (n,
894             (const char *) &disconnect_msg, sizeof (disconnect_msg),
895             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
896             &send_disconnect_cont, NULL);
897
898   if (ret == GNUNET_SYSERR)
899     return GNUNET_SYSERR;
900
901   GNUNET_STATISTICS_update (GST_stats,
902                             gettext_noop
903                             ("# peers disconnected due to external request"), 1,
904                             GNUNET_NO);
905   return GNUNET_OK;
906 }
907
908
909 /**
910  * Disconnect from the given neighbour, clean up the record.
911  *
912  * @param n neighbour to disconnect from
913  */
914 static void
915 disconnect_neighbour (struct NeighbourMapEntry *n)
916 {
917   struct MessageQueue *mq;
918   int previous_state;
919
920   previous_state = n->state;
921
922   if (is_disconnecting (n))
923     return;
924
925   /* send DISCONNECT MESSAGE */
926   if (previous_state == S_CONNECTED)
927   {
928     if (GNUNET_OK == send_disconnect (n))
929       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
930                   GNUNET_i2s (&n->id));
931     else
932       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933                   "Could not send DISCONNECT_MSG to `%s'\n",
934                   GNUNET_i2s (&n->id));
935   }
936
937   change_state (n, S_DISCONNECT);
938
939   if (previous_state == S_CONNECTED)
940   {
941     GNUNET_assert (NULL != n->address);
942     if (n->address_state == USED)
943     {
944       GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__);
945       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
946       n->address_state = UNUSED;
947     }
948   }
949
950   if (n->address != NULL)
951   {
952     struct GNUNET_TRANSPORT_PluginFunctions *papi;
953
954     papi = GST_plugins_find (n->address->transport_name);
955     if (papi != NULL)
956       papi->disconnect (papi->cls, &n->id);
957   }
958   while (NULL != (mq = n->messages_head))
959   {
960     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
961     if (NULL != mq->cont)
962       mq->cont (mq->cont_cls, GNUNET_SYSERR);
963     GNUNET_free (mq);
964   }
965   if (NULL != n->is_active)
966   {
967     n->is_active->n = NULL;
968     n->is_active = NULL;
969   }
970
971   switch (previous_state)
972   {
973   case S_CONNECTED:
974     GNUNET_assert (neighbours_connected > 0);
975     neighbours_connected--;
976     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
977     GNUNET_SCHEDULER_cancel (n->keepalive_task);
978     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
979     n->expect_latency_response = GNUNET_NO;
980     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
981                               GNUNET_NO);
982     disconnect_notify_cb (callback_cls, &n->id);
983     break;
984   case S_FAST_RECONNECT:
985     GNUNET_STATISTICS_update (GST_stats,
986                               gettext_noop ("# fast reconnects failed"), 1,
987                               GNUNET_NO);
988     disconnect_notify_cb (callback_cls, &n->id);
989     break;
990   default:
991     break;
992   }
993
994   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
995
996   GNUNET_assert (GNUNET_YES ==
997                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
998                                                        &n->id.hashPubKey, n));
999   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
1000   {
1001     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1002     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1003   }
1004   if (n->ats_request != GNUNET_SCHEDULER_NO_TASK)
1005   {
1006     GNUNET_SCHEDULER_cancel (n->ats_request);
1007     n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1008   }
1009
1010   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
1011   {
1012     GNUNET_SCHEDULER_cancel (n->timeout_task);
1013     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1014   }
1015   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
1016   {
1017     GNUNET_SCHEDULER_cancel (n->transmission_task);
1018     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
1019   }
1020   if (NULL != n->fast_reconnect_address)
1021   {
1022     GNUNET_HELLO_address_free (n->fast_reconnect_address);
1023     n->fast_reconnect_address = NULL;
1024   }
1025   if (NULL != n->address)
1026   {
1027     GNUNET_HELLO_address_free (n->address);
1028     n->address = NULL;
1029   }
1030   n->session = NULL;
1031   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
1032               GNUNET_i2s (&n->id), n);
1033   GNUNET_free (n);
1034 }
1035
1036
1037 /**
1038  * Peer has been idle for too long. Disconnect.
1039  *
1040  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1041  * @param tc scheduler context
1042  */
1043 static void
1044 neighbour_timeout_task (void *cls,
1045                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1046 {
1047   struct NeighbourMapEntry *n = cls;
1048
1049   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1050   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer`%4s' disconnected due to timeout\n",
1051               GNUNET_i2s (&n->id));
1052
1053   GNUNET_STATISTICS_update (GST_stats,
1054                             gettext_noop
1055                             ("# peers disconnected due to timeout"), 1,
1056                             GNUNET_NO);
1057   disconnect_neighbour (n);
1058 }
1059
1060
1061 /**
1062  * Send another keepalive message.
1063  *
1064  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1065  * @param tc scheduler context
1066  */
1067 static void
1068 neighbour_keepalive_task (void *cls,
1069                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1070 {
1071   struct NeighbourMapEntry *n = cls;
1072   struct GNUNET_MessageHeader m;
1073   int ret;
1074
1075   GNUNET_assert (S_CONNECTED == n->state);
1076   n->keepalive_task =
1077       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1078                                     &neighbour_keepalive_task, n);
1079
1080   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1081                             GNUNET_NO);
1082   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1083   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1084
1085   ret = send_with_session (n,
1086             (const void *) &m, sizeof (m),
1087             UINT32_MAX /* priority */ ,
1088             GNUNET_TIME_UNIT_FOREVER_REL,
1089             NULL, NULL);
1090
1091   n->expect_latency_response = GNUNET_NO;
1092   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1093   if (ret != GNUNET_SYSERR)
1094   {
1095     n->expect_latency_response = GNUNET_YES;
1096     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1097   }
1098
1099 }
1100
1101
1102 /**
1103  * Disconnect from the given neighbour.
1104  *
1105  * @param cls unused
1106  * @param key hash of neighbour's public key (not used)
1107  * @param value the 'struct NeighbourMapEntry' of the neighbour
1108  */
1109 static int
1110 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1111 {
1112   struct NeighbourMapEntry *n = value;
1113
1114 #if DEBUG_TRANSPORT
1115   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1116               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1117 #endif
1118   if (S_CONNECTED == n->state)
1119     GNUNET_STATISTICS_update (GST_stats,
1120                               gettext_noop
1121                               ("# peers disconnected due to global disconnect"),
1122                               1, GNUNET_NO);
1123   disconnect_neighbour (n);
1124   return GNUNET_OK;
1125 }
1126
1127
1128 static void
1129 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1130 {
1131   struct NeighbourMapEntry *n = cls;
1132
1133   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1134
1135   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1136               "ATS did not suggested address to connect to peer `%s'\n",
1137               GNUNET_i2s (&n->id));
1138
1139   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# ATS address suggestions failed"), 1,
1140                             GNUNET_NO);
1141
1142
1143   disconnect_neighbour (n);
1144 }
1145
1146 /**
1147  * Cleanup the neighbours subsystem.
1148  */
1149 void
1150 GST_neighbours_stop ()
1151 {
1152   // This can happen during shutdown
1153   if (neighbours == NULL)
1154   {
1155     return;
1156   }
1157
1158   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1159                                          NULL);
1160   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1161 //  GNUNET_assert (neighbours_connected == 0);
1162   neighbours = NULL;
1163   callback_cls = NULL;
1164   connect_notify_cb = NULL;
1165   disconnect_notify_cb = NULL;
1166   address_change_cb = NULL;
1167 }
1168
1169 struct ContinutionContext
1170 {
1171   struct GNUNET_HELLO_Address *address;
1172
1173   struct Session *session;
1174 };
1175
1176 static void
1177 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1178                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1179 {
1180   struct QuotaSetMessage q_msg;
1181
1182 #if DEBUG_TRANSPORT
1183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1185               ntohl (quota.value__), GNUNET_i2s (target));
1186 #endif
1187   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1188   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1189   q_msg.quota = quota;
1190   q_msg.peer = (*target);
1191   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1192 }
1193
1194 /**
1195  * We tried to send a SESSION_CONNECT message to another peer.  If this
1196  * succeeded, we change the state.  If it failed, we should tell
1197  * ATS to not use this address anymore (until it is re-validated).
1198  *
1199  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1200  * @param target peer to send the message to
1201  * @param success GNUNET_OK on success
1202  */
1203 static void
1204 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1205                            int success)
1206 {
1207   struct ContinutionContext *cc = cls;
1208   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1209
1210   if (GNUNET_YES != success)
1211   {
1212     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1213     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1214   }
1215   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1216   {
1217     GNUNET_HELLO_address_free (cc->address);
1218     GNUNET_free (cc);
1219     return;
1220   }
1221
1222   if ((GNUNET_YES == success) &&
1223       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1224   {
1225     change_state (n, S_CONNECT_SENT);
1226     GNUNET_HELLO_address_free (cc->address);
1227     GNUNET_free (cc);
1228     return;
1229   }
1230
1231   if ((GNUNET_NO == success) &&
1232       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1233   {
1234 #if DEBUG_TRANSPORT
1235     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1236                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1237                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1238 #endif
1239     change_state (n, S_NOT_CONNECTED);
1240     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1241       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1242     n->ats_suggest =
1243         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1244                                       n);
1245     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1246   }
1247   GNUNET_HELLO_address_free (cc->address);
1248   GNUNET_free (cc);
1249 }
1250
1251
1252 static void
1253 ats_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1254 {
1255   struct NeighbourMapEntry *n = cls;
1256   n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1257
1258   n->ats_suggest =
1259     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1260                                   n);
1261 }
1262
1263
1264 /**
1265  * We tried to switch addresses with an peer already connected. If it failed,
1266  * we should tell ATS to not use this address anymore (until it is re-validated).
1267  *
1268  * @param cls the 'struct NeighbourMapEntry'
1269  * @param target peer to send the message to
1270  * @param success GNUNET_OK on success
1271  */
1272 static void
1273 send_switch_address_continuation (void *cls,
1274                                   const struct GNUNET_PeerIdentity *target,
1275                                   int success)
1276 {
1277   struct ContinutionContext *cc = cls;
1278   struct NeighbourMapEntry *n;
1279
1280   if (neighbours == NULL)
1281   {
1282     GNUNET_HELLO_address_free (cc->address);
1283     GNUNET_free (cc);
1284     return;                     /* neighbour is going away */
1285   }
1286
1287   n = lookup_neighbour (&cc->address->peer);
1288   if ((n == NULL) || (is_disconnecting (n)))
1289   {
1290     GNUNET_HELLO_address_free (cc->address);
1291     GNUNET_free (cc);
1292     return;                     /* neighbour is going away */
1293   }
1294
1295   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1296   if (GNUNET_YES != success)
1297   {
1298
1299     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300                 "Failed to switch connected peer `%s' in state %s to address '%s' session %X, asking ATS for new address \n",
1301                 GNUNET_i2s (&n->id), print_state(n->state), GST_plugins_a2s (cc->address), cc->session);
1302
1303     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1304     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1305
1306     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1307     {
1308       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1309       n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1310     }
1311
1312     if (n->state == S_FAST_RECONNECT)
1313     {
1314       if (GNUNET_SCHEDULER_NO_TASK == n->ats_request)
1315       {
1316         /* Throtteled ATS request for fast reconnect */
1317         struct GNUNET_TIME_Relative delay = GNUNET_TIME_relative_multiply(FAST_RECONNECT_RATE, n->fast_reconnect_attempts);
1318         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319                     "Fast reconnect attempt %u failed, delay ATS request for %llu ms\n", n->fast_reconnect_attempts, delay.rel_value);
1320         n->ats_request =
1321             GNUNET_SCHEDULER_add_delayed (delay,
1322                                           ats_request,
1323                                           n);
1324       }
1325     }
1326     else
1327     {
1328       /* Immediate ATS request for connected peers */
1329       n->ats_suggest =
1330         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1331                                       n);
1332       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1333     }
1334     GNUNET_HELLO_address_free (cc->address);
1335     GNUNET_free (cc);
1336     return;
1337   }
1338   /* Tell ATS that switching addresses was successful */
1339   switch (n->state)
1340   {
1341   case S_CONNECTED:
1342     if (n->address_state == FRESH)
1343     {
1344       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1345       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1346       if (cc->session != n->session)
1347         GNUNET_break (0);
1348       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1349       n->address_state = USED;
1350     }
1351     break;
1352   case S_FAST_RECONNECT:
1353 #if DEBUG_TRANSPORT
1354     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355                 "Successful fast reconnect to peer `%s'\n",
1356                 GNUNET_i2s (&n->id));
1357 #endif
1358     change_state (n, S_CONNECTED);
1359     neighbours_connected++;
1360     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1361                               GNUNET_NO);
1362
1363     if (n->address_state == FRESH)
1364     {
1365       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1366       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1367       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1368       n->address_state = USED;
1369     }
1370
1371     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1372       n->keepalive_task =
1373           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1374
1375     /* Updating quotas */
1376     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1377     send_outbound_quota (target, n->bandwidth_out);
1378
1379   default:
1380     break;
1381   }
1382   GNUNET_HELLO_address_free (cc->address);
1383   GNUNET_free (cc);
1384 }
1385
1386
1387 /**
1388  * We tried to send a SESSION_CONNECT message to another peer.  If this
1389  * succeeded, we change the state.  If it failed, we should tell
1390  * ATS to not use this address anymore (until it is re-validated).
1391  *
1392  * @param cls the 'struct NeighbourMapEntry'
1393  * @param target peer to send the message to
1394  * @param success GNUNET_OK on success
1395  */
1396 static void
1397 send_connect_ack_continuation (void *cls,
1398                                const struct GNUNET_PeerIdentity *target,
1399                                int success)
1400 {
1401   struct ContinutionContext *cc = cls;
1402   struct NeighbourMapEntry *n;
1403
1404   if (neighbours == NULL)
1405   {
1406     GNUNET_HELLO_address_free (cc->address);
1407     GNUNET_free (cc);
1408     return;                     /* neighbour is going away */
1409   }
1410
1411   n = lookup_neighbour (&cc->address->peer);
1412   if ((n == NULL) || (is_disconnecting (n)))
1413   {
1414     GNUNET_HELLO_address_free (cc->address);
1415     GNUNET_free (cc);
1416     return;                     /* neighbour is going away */
1417   }
1418
1419   if (GNUNET_YES == success)
1420   {
1421     GNUNET_HELLO_address_free (cc->address);
1422     GNUNET_free (cc);
1423     return;                     /* sending successful */
1424   }
1425
1426   /* sending failed, ask for next address  */
1427 #if DEBUG_TRANSPORT
1428   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1429               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1430               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1431 #endif
1432   if (n->state != S_NOT_CONNECTED)
1433     change_state (n, S_NOT_CONNECTED);
1434   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1435   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1436
1437   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1438     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1439   n->ats_suggest =
1440       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1441                                     n);
1442   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1443   GNUNET_HELLO_address_free (cc->address);
1444   GNUNET_free (cc);
1445 }
1446
1447
1448 /**
1449  * For an existing neighbour record, set the active connection to
1450  * use the given address.
1451  *
1452  * @param peer identity of the peer to switch the address for
1453  * @param address address of the other peer, NULL if other peer
1454  *                       connected to us
1455  * @param session session to use (or NULL)
1456  * @param ats performance data
1457  * @param ats_count number of entries in ats
1458  * @param bandwidth_in inbound quota to be used when connection is up
1459  * @param bandwidth_out outbound quota to be used when connection is up
1460  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1461  *         connection is not up (yet)
1462  */
1463 int
1464 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
1465                                        const struct GNUNET_HELLO_Address
1466                                        *address,
1467                                        struct Session *session,
1468                                        const struct GNUNET_ATS_Information *ats,
1469                                        uint32_t ats_count,
1470                                        struct GNUNET_BANDWIDTH_Value32NBO
1471                                        bandwidth_in,
1472                                        struct GNUNET_BANDWIDTH_Value32NBO
1473                                        bandwidth_out)
1474 {
1475   struct NeighbourMapEntry *n;
1476   struct SessionConnectMessage connect_msg;
1477   struct ContinutionContext *cc;
1478   size_t msg_len;
1479   size_t ret;
1480
1481   if (neighbours == NULL)
1482   {
1483     /* This can happen during shutdown */
1484     return GNUNET_NO;
1485   }
1486   n = lookup_neighbour (peer);
1487   if (NULL == n)
1488     return GNUNET_NO;
1489   if (n->state == S_DISCONNECT)
1490   {
1491     /* We are disconnecting, nothing to do here */
1492     return GNUNET_NO;
1493   }
1494   GNUNET_assert (address->transport_name != NULL);
1495   if ((session == NULL) && (0 == address->address_length))
1496   {
1497     GNUNET_break_op (0);
1498     /* FIXME: is this actually possible? When does this happen? */
1499     if (strlen (address->transport_name) > 0)
1500       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1501     GNUNET_ATS_suggest_address (GST_ats, peer);
1502     return GNUNET_NO;
1503   }
1504
1505   /* checks successful and neighbour != NULL */
1506   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1507               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1508               (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1509               session,
1510               GNUNET_i2s (peer),
1511               print_state (n->state));
1512
1513   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1514   {
1515     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1516     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1517   }
1518
1519
1520   if (n->state == S_FAST_RECONNECT)
1521   {
1522     /* Throtteled fast reconnect */
1523
1524     if (NULL == n->fast_reconnect_address)
1525     {
1526       n->fast_reconnect_address = GNUNET_HELLO_address_copy (address);
1527
1528     }
1529     else if (0 == GNUNET_HELLO_address_cmp(address, n->fast_reconnect_address))
1530     {
1531       n->fast_reconnect_attempts ++;
1532
1533       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534                 "FAST RECONNECT to peer `%s' and address '%s' with identical address attempt %u\n",
1535                 GNUNET_i2s (&n->id), GST_plugins_a2s (address), n->fast_reconnect_attempts);
1536     }
1537   }
1538   else
1539   {
1540     n->fast_reconnect_attempts = 0;
1541   }
1542
1543   /* do not switch addresses just update quotas */
1544   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1545       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1546       (n->session == session))
1547   {
1548     n->bandwidth_in = bandwidth_in;
1549     n->bandwidth_out = bandwidth_out;
1550     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1551     send_outbound_quota (peer, n->bandwidth_out);
1552     return GNUNET_NO;
1553   }
1554   if (n->state == S_CONNECTED)
1555   {
1556     /* mark old address as no longer used */
1557     GNUNET_assert (NULL != n->address);
1558     if (n->address_state == USED)
1559     {
1560       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1561       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1562       n->address_state = UNUSED;
1563     }
1564   }
1565
1566   /* set new address */
1567   if (NULL != n->address)
1568     GNUNET_HELLO_address_free (n->address);
1569   n->address = GNUNET_HELLO_address_copy (address);
1570   n->address_state = FRESH;
1571   n->bandwidth_in = bandwidth_in;
1572   n->bandwidth_out = bandwidth_out;
1573   GNUNET_SCHEDULER_cancel (n->timeout_task);
1574   n->timeout_task =
1575       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1576                                     &neighbour_timeout_task, n);
1577
1578   if (NULL != address_change_cb && n->state == S_CONNECTED)
1579     address_change_cb (callback_cls, &n->id, n->address); 
1580
1581   /* Obtain an session for this address from plugin */
1582   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1583   papi = GST_plugins_find (address->transport_name);
1584
1585   if (papi == NULL)
1586   {
1587     /* we don't have the plugin for this address */
1588     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1589
1590     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1591       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1592     n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1593                                       ats_suggest_cancel,
1594                                       n);
1595     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1596     GNUNET_HELLO_address_free (n->address);
1597     n->address = NULL;
1598     n->session = NULL;
1599     return GNUNET_NO;
1600   }
1601
1602   if (session == NULL)
1603   {
1604     n->session = papi->get_session (papi->cls, address);
1605     /* Session could not be initiated */
1606     if (n->session == NULL)
1607     {
1608       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1609                   "Failed to obtain new session %p for peer `%s' and  address '%s'\n",
1610                   n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1611
1612       GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1613
1614       if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1615         GNUNET_SCHEDULER_cancel (n->ats_suggest);
1616       n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1617                                         ats_suggest_cancel,
1618                                         n);
1619       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1620       GNUNET_HELLO_address_free (n->address);
1621       n->address = NULL;
1622       n->session = NULL;
1623       return GNUNET_NO;
1624     }
1625
1626     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1627                 "Obtained new session %p for peer `%s' and  address '%s'\n",
1628                  n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1629     /* Telling ATS about new session */
1630     GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0);
1631   }
1632   else
1633   {
1634     n->session = session;
1635     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636                 "Using existing session %p for peer `%s' and  address '%s'\n",
1637                 n->session,
1638                 GNUNET_i2s (&n->id),
1639                 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>");
1640   }
1641
1642   switch (n->state)
1643   {
1644   case S_NOT_CONNECTED:
1645   case S_CONNECT_SENT:
1646     msg_len = sizeof (struct SessionConnectMessage);
1647     connect_msg.header.size = htons (msg_len);
1648     connect_msg.header.type =
1649         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1650     connect_msg.reserved = htonl (0);
1651     connect_msg.timestamp =
1652         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1653
1654     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1655     cc->session = n->session;
1656     cc->address = GNUNET_HELLO_address_copy (address);
1657
1658     ret = send_with_session (n,
1659       (const char *) &connect_msg, msg_len,
1660       UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1661       &send_connect_continuation, cc);
1662
1663     return GNUNET_NO;
1664   case S_CONNECT_RECV:
1665     /* We received a CONNECT message and asked ATS for an address */
1666     msg_len = sizeof (struct SessionConnectMessage);
1667     connect_msg.header.size = htons (msg_len);
1668     connect_msg.header.type =
1669         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1670     connect_msg.reserved = htonl (0);
1671     connect_msg.timestamp =
1672         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1673     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1674     cc->session = n->session;
1675     cc->address = GNUNET_HELLO_address_copy (address);
1676
1677     ret = send_with_session(n,
1678                             (const void *) &connect_msg, msg_len,
1679                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1680                             &send_connect_ack_continuation,
1681                             cc);
1682     return GNUNET_NO;
1683   case S_CONNECTED:
1684   case S_FAST_RECONNECT:
1685     /* connected peer is switching addresses or tries fast reconnect */
1686     msg_len = sizeof (struct SessionConnectMessage);
1687     connect_msg.header.size = htons (msg_len);
1688     connect_msg.header.type =
1689         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1690     connect_msg.reserved = htonl (0);
1691     connect_msg.timestamp =
1692         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1693     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1694     cc->session = n->session;
1695     cc->address = GNUNET_HELLO_address_copy (address);
1696     ret = send_with_session(n,
1697                             (const void *) &connect_msg, msg_len,
1698                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1699                             &send_switch_address_continuation, cc);
1700     if (ret == GNUNET_SYSERR)
1701     {
1702       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1703                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1704                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1705     }
1706     return GNUNET_NO;
1707   default:
1708     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1709                 "Invalid connection state to switch addresses %u \n", n->state);
1710     GNUNET_break_op (0);
1711     return GNUNET_NO;
1712   }
1713 }
1714
1715
1716 /**
1717  * Obtain current latency information for the given neighbour.
1718  *
1719  * @param peer
1720  * @return observed latency of the address, FOREVER if the address was
1721  *         never successfully validated
1722  */
1723 struct GNUNET_TIME_Relative
1724 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1725 {
1726   struct NeighbourMapEntry *n;
1727
1728   n = lookup_neighbour (peer);
1729   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1730     return GNUNET_TIME_UNIT_FOREVER_REL;
1731
1732   return n->latency;
1733 }
1734
1735 /**
1736  * Obtain current address information for the given neighbour.
1737  *
1738  * @param peer
1739  * @return address currently used
1740  */
1741 struct GNUNET_HELLO_Address *
1742 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1743 {
1744   struct NeighbourMapEntry *n;
1745
1746   n = lookup_neighbour (peer);
1747   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1748     return NULL;
1749
1750   return n->address;
1751 }
1752
1753
1754
1755 /**
1756  * Create an entry in the neighbour map for the given peer
1757  *
1758  * @param peer peer to create an entry for
1759  * @return new neighbour map entry
1760  */
1761 static struct NeighbourMapEntry *
1762 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1763 {
1764   struct NeighbourMapEntry *n;
1765
1766 #if DEBUG_TRANSPORT
1767   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1768               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1769 #endif
1770   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1771   n->id = *peer;
1772   n->state = S_NOT_CONNECTED;
1773   n->latency = GNUNET_TIME_relative_get_forever ();
1774   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1775                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1776                                  MAX_BANDWIDTH_CARRY_S);
1777   n->timeout_task =
1778       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1779                                     &neighbour_timeout_task, n);
1780   GNUNET_assert (GNUNET_OK ==
1781                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1782                                                     &n->id.hashPubKey, n,
1783                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1784   return n;
1785 }
1786
1787
1788 /**
1789  * Try to create a connection to the given target (eventually).
1790  *
1791  * @param target peer to try to connect to
1792  */
1793 void
1794 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1795 {
1796   struct NeighbourMapEntry *n;
1797
1798   // This can happen during shutdown
1799   if (neighbours == NULL)
1800   {
1801     return;
1802   }
1803 #if DEBUG_TRANSPORT
1804   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1805               GNUNET_i2s (target));
1806 #endif
1807   if (0 ==
1808       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1809   {
1810     /* my own hello */
1811     return;
1812   }
1813   n = lookup_neighbour (target);
1814
1815   if (NULL != n)
1816   {
1817     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1818       return;                   /* already connecting or connected */
1819     if (is_disconnecting (n))
1820       change_state (n, S_NOT_CONNECTED);
1821   }
1822
1823
1824   if (n == NULL)
1825     n = setup_neighbour (target);
1826 #if DEBUG_TRANSPORT
1827   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1828               "Asking ATS for suggested address to connect to peer `%s'\n",
1829               GNUNET_i2s (&n->id));
1830 #endif
1831
1832   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1833 }
1834
1835 /**
1836  * Test if we're connected to the given peer.
1837  *
1838  * @param target peer to test
1839  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1840  */
1841 int
1842 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1843 {
1844   struct NeighbourMapEntry *n;
1845
1846   // This can happen during shutdown
1847   if (neighbours == NULL)
1848   {
1849     return GNUNET_NO;
1850   }
1851
1852   n = lookup_neighbour (target);
1853
1854   if ((NULL == n) || (S_CONNECTED != n->state))
1855     return GNUNET_NO;           /* not connected */
1856   return GNUNET_YES;
1857 }
1858
1859 /**
1860  * A session was terminated. Take note.
1861  *
1862  * @param peer identity of the peer where the session died
1863  * @param session session that is gone
1864  */
1865 void
1866 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1867                                    struct Session *session)
1868 {
1869   struct NeighbourMapEntry *n;
1870
1871   if (neighbours == NULL)
1872   {
1873     /* This can happen during shutdown */
1874     return;
1875   }
1876
1877 #if DEBUG_TRANSPORT
1878   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1879               session, GNUNET_i2s (peer));
1880 #endif
1881
1882   n = lookup_neighbour (peer);
1883   if (NULL == n)
1884     return;
1885   if (session != n->session)
1886     return;                     /* doesn't affect us */
1887   if (n->state == S_CONNECTED)
1888   {
1889     if (n->address_state == USED)
1890     {
1891       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1892       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1893       n->address_state = UNUSED;
1894
1895     }
1896   }
1897
1898   if (NULL != n->address)
1899   {
1900     GNUNET_HELLO_address_free (n->address);
1901     n->address = NULL;
1902   }
1903   n->session = NULL;
1904
1905   /* not connected anymore anyway, shouldn't matter */
1906   if (S_CONNECTED != n->state)
1907     return;
1908
1909   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1910   {
1911     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1912     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1913     n->expect_latency_response = GNUNET_NO;
1914   }
1915
1916   /* connected, try fast reconnect */
1917   /* statistics "transport" : "# peers connected" -= 1
1918    * neighbours_connected -= 1
1919    * BUT: no disconnect_cb to notify clients about disconnect
1920    */
1921
1922   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1923               GNUNET_i2s (peer));
1924
1925   GNUNET_assert (neighbours_connected > 0);
1926   change_state (n, S_FAST_RECONNECT);
1927   neighbours_connected--;
1928   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
1929                             GNUNET_NO);
1930
1931
1932   /* We are connected, so ask ATS to switch addresses */
1933   GNUNET_SCHEDULER_cancel (n->timeout_task);
1934   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1935                                     &neighbour_timeout_task, n);
1936   /* try QUICKLY to re-establish a connection, reduce timeout! */
1937   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1938     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1939   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1940                                     &ats_suggest_cancel,
1941                                     n);
1942   GNUNET_ATS_suggest_address (GST_ats, peer);
1943 }
1944
1945
1946 /**
1947  * Transmit a message to the given target using the active connection.
1948  *
1949  * @param target destination
1950  * @param msg message to send
1951  * @param msg_size number of bytes in msg
1952  * @param timeout when to fail with timeout
1953  * @param cont function to call when done
1954  * @param cont_cls closure for 'cont'
1955  */
1956 void
1957 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1958                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1959                      GST_NeighbourSendContinuation cont, void *cont_cls)
1960 {
1961   struct NeighbourMapEntry *n;
1962   struct MessageQueue *mq;
1963
1964   // This can happen during shutdown
1965   if (neighbours == NULL)
1966   {
1967     return;
1968   }
1969
1970   n = lookup_neighbour (target);
1971   if ((n == NULL) || (!is_connected (n)))
1972   {
1973     GNUNET_STATISTICS_update (GST_stats,
1974                               gettext_noop
1975                               ("# messages not sent (no such peer or not connected)"),
1976                               1, GNUNET_NO);
1977 #if DEBUG_TRANSPORT
1978     if (n == NULL)
1979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1980                   "Could not send message to peer `%s': unknown neighbour",
1981                   GNUNET_i2s (target));
1982     else if (!is_connected (n))
1983       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984                   "Could not send message to peer `%s': not connected\n",
1985                   GNUNET_i2s (target));
1986 #endif
1987     if (NULL != cont)
1988       cont (cont_cls, GNUNET_SYSERR);
1989     return;
1990   }
1991
1992   if ((n->session == NULL) && (n->address == NULL))
1993   {
1994     GNUNET_STATISTICS_update (GST_stats,
1995                               gettext_noop
1996                               ("# messages not sent (no such peer or not connected)"),
1997                               1, GNUNET_NO);
1998 #if DEBUG_TRANSPORT
1999     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2000                 "Could not send message to peer `%s': no address available\n",
2001                 GNUNET_i2s (target));
2002 #endif
2003
2004     if (NULL != cont)
2005       cont (cont_cls, GNUNET_SYSERR);
2006     return;
2007   }
2008
2009   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
2010   GNUNET_STATISTICS_update (GST_stats,
2011                             gettext_noop
2012                             ("# bytes in message queue for other peers"),
2013                             msg_size, GNUNET_NO);
2014   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
2015   mq->cont = cont;
2016   mq->cont_cls = cont_cls;
2017   /* FIXME: this memcpy can be up to 7% of our total runtime! */
2018   memcpy (&mq[1], msg, msg_size);
2019   mq->message_buf = (const char *) &mq[1];
2020   mq->message_buf_size = msg_size;
2021   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
2022   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
2023
2024   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
2025       (NULL == n->is_active))
2026     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
2027 }
2028
2029
2030 /**
2031  * We have received a message from the given sender.  How long should
2032  * we delay before receiving more?  (Also used to keep the peer marked
2033  * as live).
2034  *
2035  * @param sender sender of the message
2036  * @param size size of the message
2037  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
2038  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
2039  *                   GNUNET_SYSERR if the connection is not fully up yet
2040  * @return how long to wait before reading more from this sender
2041  */
2042 struct GNUNET_TIME_Relative
2043 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
2044                                         *sender, ssize_t size, int *do_forward)
2045 {
2046   struct NeighbourMapEntry *n;
2047   struct GNUNET_TIME_Relative ret;
2048
2049   // This can happen during shutdown
2050   if (neighbours == NULL)
2051   {
2052     return GNUNET_TIME_UNIT_FOREVER_REL;
2053   }
2054
2055   n = lookup_neighbour (sender);
2056   if (n == NULL)
2057   {
2058     GST_neighbours_try_connect (sender);
2059     n = lookup_neighbour (sender);
2060     if (NULL == n)
2061     {
2062       GNUNET_STATISTICS_update (GST_stats,
2063                                 gettext_noop
2064                                 ("# messages discarded due to lack of neighbour record"),
2065                                 1, GNUNET_NO);
2066       *do_forward = GNUNET_NO;
2067       return GNUNET_TIME_UNIT_ZERO;
2068     }
2069   }
2070   if (!is_connected (n))
2071   {
2072     *do_forward = GNUNET_SYSERR;
2073     return GNUNET_TIME_UNIT_ZERO;
2074   }
2075   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
2076   {
2077     n->quota_violation_count++;
2078 #if DEBUG_TRANSPORT
2079     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2080                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
2081                 n->in_tracker.available_bytes_per_s__,
2082                 n->quota_violation_count);
2083 #endif
2084     /* Discount 32k per violation */
2085     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
2086   }
2087   else
2088   {
2089     if (n->quota_violation_count > 0)
2090     {
2091       /* try to add 32k back */
2092       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
2093       n->quota_violation_count--;
2094     }
2095   }
2096   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2097   {
2098     GNUNET_STATISTICS_update (GST_stats,
2099                               gettext_noop
2100                               ("# bandwidth quota violations by other peers"),
2101                               1, GNUNET_NO);
2102     *do_forward = GNUNET_NO;
2103     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
2104   }
2105   *do_forward = GNUNET_YES;
2106   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
2107   if (ret.rel_value > 0)
2108   {
2109 #if DEBUG_TRANSPORT
2110     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2111                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
2112                 (unsigned long long) n->in_tracker.
2113                 consumption_since_last_update__,
2114                 (unsigned int) n->in_tracker.available_bytes_per_s__,
2115                 (unsigned long long) ret.rel_value);
2116 #endif
2117     GNUNET_STATISTICS_update (GST_stats,
2118                               gettext_noop ("# ms throttling suggested"),
2119                               (int64_t) ret.rel_value, GNUNET_NO);
2120   }
2121   return ret;
2122 }
2123
2124
2125 /**
2126  * Keep the connection to the given neighbour alive longer,
2127  * we received a KEEPALIVE (or equivalent).
2128  *
2129  * @param neighbour neighbour to keep alive
2130  */
2131 void
2132 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
2133 {
2134   struct NeighbourMapEntry *n;
2135
2136   // This can happen during shutdown
2137   if (neighbours == NULL)
2138   {
2139     return;
2140   }
2141
2142   n = lookup_neighbour (neighbour);
2143   if (NULL == n)
2144   {
2145     GNUNET_STATISTICS_update (GST_stats,
2146                               gettext_noop
2147                               ("# KEEPALIVE messages discarded (not connected)"),
2148                               1, GNUNET_NO);
2149     return;
2150   }
2151   GNUNET_SCHEDULER_cancel (n->timeout_task);
2152   n->timeout_task =
2153       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2154                                     &neighbour_timeout_task, n);
2155
2156   /* send reply to measure latency */
2157   if (S_CONNECTED != n->state)
2158     return;
2159
2160   struct GNUNET_MessageHeader m;
2161
2162   m.size = htons (sizeof (struct GNUNET_MessageHeader));
2163   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
2164
2165   send_with_session(n,
2166       (const void *) &m, sizeof (m),
2167       UINT32_MAX,
2168       GNUNET_TIME_UNIT_FOREVER_REL,
2169       NULL, NULL);
2170 }
2171
2172 /**
2173  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
2174  * to this peer
2175  *
2176  * @param neighbour neighbour to keep alive
2177  * @param ats performance data
2178  * @param ats_count number of entries in ats
2179  */
2180 void
2181 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
2182                                    const struct GNUNET_ATS_Information *ats,
2183                                    uint32_t ats_count)
2184 {
2185   struct NeighbourMapEntry *n;
2186   struct GNUNET_ATS_Information *ats_new;
2187   uint32_t latency;
2188
2189   if (neighbours == NULL)
2190   {
2191     // This can happen during shutdown
2192     return;
2193   }
2194
2195   n = lookup_neighbour (neighbour);
2196   if ((NULL == n) || (n->state != S_CONNECTED))
2197   {
2198     GNUNET_STATISTICS_update (GST_stats,
2199                               gettext_noop
2200                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2201                               1, GNUNET_NO);
2202     return;
2203   }
2204   if (n->expect_latency_response != GNUNET_YES)
2205   {
2206     GNUNET_STATISTICS_update (GST_stats,
2207                               gettext_noop
2208                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2209                               1, GNUNET_NO);
2210     return;
2211   }
2212   n->expect_latency_response = GNUNET_NO;
2213
2214   GNUNET_assert (n->keep_alive_sent.abs_value !=
2215                  GNUNET_TIME_absolute_get_zero ().abs_value);
2216   n->latency =
2217       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2218                                            GNUNET_TIME_absolute_get ());
2219 #if DEBUG_TRANSPORT
2220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2221               GNUNET_i2s (&n->id), n->latency.rel_value);
2222 #endif
2223
2224
2225   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2226   {
2227     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2228   }
2229   else
2230   {
2231     ats_new =
2232         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2233                        (ats_count + 1));
2234     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2235
2236     /* add latency */
2237     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2238     if (n->latency.rel_value > UINT32_MAX)
2239       latency = UINT32_MAX;
2240     else
2241       latency = n->latency.rel_value;
2242     ats_new[ats_count].value = htonl (latency);
2243
2244     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2245                                ats_count + 1);
2246     GNUNET_free (ats_new);
2247   }
2248 }
2249
2250
2251 /**
2252  * Change the incoming quota for the given peer.
2253  *
2254  * @param neighbour identity of peer to change qutoa for
2255  * @param quota new quota
2256  */
2257 void
2258 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2259                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2260 {
2261   struct NeighbourMapEntry *n;
2262
2263   // This can happen during shutdown
2264   if (neighbours == NULL)
2265   {
2266     return;
2267   }
2268
2269   n = lookup_neighbour (neighbour);
2270   if (n == NULL)
2271   {
2272     GNUNET_STATISTICS_update (GST_stats,
2273                               gettext_noop
2274                               ("# SET QUOTA messages ignored (no such peer)"),
2275                               1, GNUNET_NO);
2276     return;
2277   }
2278 #if DEBUG_TRANSPORT
2279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2280               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2281               ntohl (quota.value__), GNUNET_i2s (&n->id));
2282 #endif
2283   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2284   if (0 != ntohl (quota.value__))
2285     return;
2286 #if DEBUG_TRANSPORT
2287   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2288               GNUNET_i2s (&n->id), "SET_QUOTA");
2289 #endif
2290   if (is_connected (n))
2291     GNUNET_STATISTICS_update (GST_stats,
2292                               gettext_noop ("# disconnects due to quota of 0"),
2293                               1, GNUNET_NO);
2294   disconnect_neighbour (n);
2295 }
2296
2297
2298 /**
2299  * Closure for the neighbours_iterate function.
2300  */
2301 struct IteratorContext
2302 {
2303   /**
2304    * Function to call on each connected neighbour.
2305    */
2306   GST_NeighbourIterator cb;
2307
2308   /**
2309    * Closure for 'cb'.
2310    */
2311   void *cb_cls;
2312 };
2313
2314
2315 /**
2316  * Call the callback from the closure for each connected neighbour.
2317  *
2318  * @param cls the 'struct IteratorContext'
2319  * @param key the hash of the public key of the neighbour
2320  * @param value the 'struct NeighbourMapEntry'
2321  * @return GNUNET_OK (continue to iterate)
2322  */
2323 static int
2324 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2325 {
2326   struct IteratorContext *ic = cls;
2327   struct NeighbourMapEntry *n = value;
2328
2329   if (!is_connected (n))
2330     return GNUNET_OK;
2331
2332   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2333   return GNUNET_OK;
2334 }
2335
2336
2337 /**
2338  * Iterate over all connected neighbours.
2339  *
2340  * @param cb function to call
2341  * @param cb_cls closure for cb
2342  */
2343 void
2344 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2345 {
2346   struct IteratorContext ic;
2347
2348   // This can happen during shutdown
2349   if (neighbours == NULL)
2350   {
2351     return;
2352   }
2353
2354   ic.cb = cb;
2355   ic.cb_cls = cb_cls;
2356   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2357 }
2358
2359 /**
2360  * If we have an active connection to the given target, it must be shutdown.
2361  *
2362  * @param target peer to disconnect from
2363  */
2364 void
2365 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2366 {
2367   struct NeighbourMapEntry *n;
2368
2369   // This can happen during shutdown
2370   if (neighbours == NULL)
2371   {
2372     return;
2373   }
2374
2375   n = lookup_neighbour (target);
2376   if (NULL == n)
2377     return;                     /* not active */
2378   disconnect_neighbour (n);
2379 }
2380
2381
2382 /**
2383  * We received a disconnect message from the given peer,
2384  * validate and process.
2385  *
2386  * @param peer sender of the message
2387  * @param msg the disconnect message
2388  */
2389 void
2390 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2391                                           *peer,
2392                                           const struct GNUNET_MessageHeader
2393                                           *msg)
2394 {
2395   struct NeighbourMapEntry *n;
2396   const struct SessionDisconnectMessage *sdm;
2397   GNUNET_HashCode hc;
2398
2399 #if DEBUG_TRANSPORT
2400   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2401               "Received DISCONNECT message from peer `%s'\n",
2402               GNUNET_i2s (peer));
2403 #endif
2404
2405   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2406   {
2407     // GNUNET_break_op (0);
2408     GNUNET_STATISTICS_update (GST_stats,
2409                               gettext_noop
2410                               ("# disconnect messages ignored (old format)"), 1,
2411                               GNUNET_NO);
2412     return;
2413   }
2414   sdm = (const struct SessionDisconnectMessage *) msg;
2415   n = lookup_neighbour (peer);
2416   if (NULL == n)
2417     return;                     /* gone already */
2418   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2419       n->connect_ts.abs_value)
2420   {
2421     GNUNET_STATISTICS_update (GST_stats,
2422                               gettext_noop
2423                               ("# disconnect messages ignored (timestamp)"), 1,
2424                               GNUNET_NO);
2425     return;
2426   }
2427   GNUNET_CRYPTO_hash (&sdm->public_key,
2428                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2429                       &hc);
2430   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2431   {
2432     GNUNET_break_op (0);
2433     return;
2434   }
2435   if (ntohl (sdm->purpose.size) !=
2436       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2437       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2438       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2439   {
2440     GNUNET_break_op (0);
2441     return;
2442   }
2443   if (GNUNET_OK !=
2444       GNUNET_CRYPTO_rsa_verify
2445       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2446        &sdm->signature, &sdm->public_key))
2447   {
2448     GNUNET_break_op (0);
2449     return;
2450   }
2451   GST_neighbours_force_disconnect (peer);
2452 }
2453
2454
2455 /**
2456  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2457  * Consider switching to it.
2458  *
2459  * @param message possibly a 'struct SessionConnectMessage' (check format)
2460  * @param peer identity of the peer to switch the address for
2461  * @param address address of the other peer, NULL if other peer
2462  *                       connected to us
2463  * @param session session to use (or NULL)
2464  * @param ats performance data
2465  * @param ats_count number of entries in ats
2466  */
2467 void
2468 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2469                                    const struct GNUNET_PeerIdentity *peer,
2470                                    const struct GNUNET_HELLO_Address *address,
2471                                    struct Session *session,
2472                                    const struct GNUNET_ATS_Information *ats,
2473                                    uint32_t ats_count)
2474 {
2475   const struct SessionConnectMessage *scm;
2476   struct GNUNET_MessageHeader msg;
2477   struct NeighbourMapEntry *n;
2478   size_t msg_len;
2479   size_t ret;
2480
2481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482               "Received CONNECT_ACK message from peer `%s'\n",
2483               GNUNET_i2s (peer));
2484
2485   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2486   {
2487     GNUNET_break_op (0);
2488     return;
2489   }
2490   scm = (const struct SessionConnectMessage *) message;
2491   GNUNET_break_op (ntohl (scm->reserved) == 0);
2492   n = lookup_neighbour (peer);
2493   if (NULL == n)
2494   {
2495     /* we did not send 'CONNECT' -- at least not recently */
2496     GNUNET_STATISTICS_update (GST_stats,
2497                               gettext_noop
2498                               ("# unexpected CONNECT_ACK messages (no peer)"),
2499                               1, GNUNET_NO);
2500     return;
2501   }
2502
2503   /* Additional check
2504    *
2505    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2506    *
2507    * We also received an CONNECT message, switched from SENDT to RECV and
2508    * ATS already suggested us an address after a successful blacklist check
2509    */
2510
2511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2512               "Received CONNECT_ACK message from peer `%s' in state `%s'\n",
2513               GNUNET_i2s (peer),
2514               print_state(n->state));
2515
2516   if ((n->address != NULL) && (n->state == S_CONNECTED))
2517   {
2518     /* After fast reconnect: send ACK (ACK) even when we are connected */
2519     msg_len = sizeof (msg);
2520     msg.size = htons (msg_len);
2521     msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2522
2523     ret = send_with_session(n,
2524               (const char *) &msg, msg_len,
2525               UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2526               NULL, NULL);
2527
2528     if (ret == GNUNET_SYSERR)
2529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2530                   "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2531                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2532     return;
2533   }
2534
2535   if ((n->state != S_CONNECT_SENT) &&
2536       ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2537   {
2538     GNUNET_STATISTICS_update (GST_stats,
2539                               gettext_noop
2540                               ("# unexpected CONNECT_ACK messages"), 1,
2541                               GNUNET_NO);
2542     return;
2543   }
2544   if (n->state != S_CONNECTED)
2545     change_state (n, S_CONNECTED);
2546
2547   if (NULL != session)
2548   {
2549     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2550                      "transport-ats",
2551                      "Giving ATS session %p of plugin %s for peer %s\n",
2552                      session, address->transport_name, GNUNET_i2s (peer));
2553   }
2554   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2555   GNUNET_assert (NULL != n->address);
2556
2557   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2558   {
2559     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2560     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2561     n->address_state = USED;
2562   }
2563
2564   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2565
2566   /* send ACK (ACK) */
2567   msg_len = sizeof (msg);
2568   msg.size = htons (msg_len);
2569   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2570
2571   ret = send_with_session(n,
2572             (const char *) &msg, msg_len,
2573             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2574             NULL, NULL);
2575
2576   if (ret == GNUNET_SYSERR)
2577     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2578                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2579                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2580
2581
2582   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2583     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2584
2585   neighbours_connected++;
2586   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2587                             GNUNET_NO);
2588 #if DEBUG_TRANSPORT
2589   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2590               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2591               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2592               __LINE__);
2593 #endif
2594   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2595   send_outbound_quota (peer, n->bandwidth_out);
2596
2597 }
2598
2599
2600 void
2601 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2602                            const struct GNUNET_PeerIdentity *peer,
2603                            const struct GNUNET_HELLO_Address *address,
2604                            struct Session *session,
2605                            const struct GNUNET_ATS_Information *ats,
2606                            uint32_t ats_count)
2607 {
2608   struct NeighbourMapEntry *n;
2609
2610 #if DEBUG_TRANSPORT
2611   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2612               GNUNET_i2s (peer));
2613 #endif
2614
2615   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2616   {
2617     GNUNET_break_op (0);
2618     return;
2619   }
2620   n = lookup_neighbour (peer);
2621   if (NULL == n)
2622   {
2623     GNUNET_break (0);
2624     return;
2625   }
2626   if (S_CONNECTED == n->state)
2627     return;
2628   if (!is_connecting (n))
2629   {
2630     GNUNET_STATISTICS_update (GST_stats,
2631                               gettext_noop ("# unexpected ACK messages"), 1,
2632                               GNUNET_NO);
2633     return;
2634   }
2635   change_state (n, S_CONNECTED);
2636   if (NULL != session)
2637     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2638                      "transport-ats",
2639                      "Giving ATS session %p of plugin %s for peer %s\n",
2640                      session, address->transport_name, GNUNET_i2s (peer));
2641   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2642   GNUNET_assert (n->address != NULL);
2643
2644   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2645   {
2646     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2647     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2648     n->address_state = USED;
2649   }
2650
2651
2652   neighbours_connected++;
2653   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2654                             GNUNET_NO);
2655
2656   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2657   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2658     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2659 #if DEBUG_TRANSPORT
2660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2661               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2662               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2663               __LINE__);
2664 #endif
2665   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2666   send_outbound_quota (peer, n->bandwidth_out);
2667 }
2668
2669 struct BlackListCheckContext
2670 {
2671   struct GNUNET_ATS_Information *ats;
2672
2673   uint32_t ats_count;
2674
2675   struct Session *session;
2676
2677   struct GNUNET_HELLO_Address *address;
2678
2679   struct GNUNET_TIME_Absolute ts;
2680 };
2681
2682
2683 static void
2684 handle_connect_blacklist_cont (void *cls,
2685                                const struct GNUNET_PeerIdentity *peer,
2686                                int result)
2687 {
2688   struct NeighbourMapEntry *n;
2689   struct BlackListCheckContext *bcc = cls;
2690
2691 #if DEBUG_TRANSPORT
2692   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2693               "Blacklist check due to CONNECT message: `%s'\n",
2694               GNUNET_i2s (peer),
2695               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2696 #endif
2697
2698   /* not allowed */
2699   if (GNUNET_OK != result)
2700   {
2701     GNUNET_HELLO_address_free (bcc->address);
2702     GNUNET_free (bcc);
2703     return;
2704   }
2705
2706   n = lookup_neighbour (peer);
2707   if (NULL == n)
2708     n = setup_neighbour (peer);
2709
2710   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2711   {
2712     if (NULL != bcc->session)
2713       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2714                        "transport-ats",
2715                        "Giving ATS session %p of address `%s' for peer %s\n",
2716                        bcc->session, GST_plugins_a2s (bcc->address),
2717                        GNUNET_i2s (peer));
2718     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2719
2720     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2721                                bcc->ats_count);
2722     n->connect_ts = bcc->ts;
2723   }
2724
2725   GNUNET_HELLO_address_free (bcc->address);
2726   GNUNET_free (bcc);
2727
2728   if (n->state != S_CONNECT_RECV)
2729     change_state (n, S_CONNECT_RECV);
2730
2731
2732   /* Ask ATS for an address to connect via that address */
2733   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2734     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2735   n->ats_suggest =
2736       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2737                                     n);
2738   GNUNET_ATS_suggest_address (GST_ats, peer);
2739 }
2740
2741 /**
2742  * We received a 'SESSION_CONNECT' message from the other peer.
2743  * Consider switching to it.
2744  *
2745  * @param message possibly a 'struct SessionConnectMessage' (check format)
2746  * @param peer identity of the peer to switch the address for
2747  * @param address address of the other peer, NULL if other peer
2748  *                       connected to us
2749  * @param session session to use (or NULL)
2750  * @param ats performance data
2751  * @param ats_count number of entries in ats (excluding 0-termination)
2752  */
2753 void
2754 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2755                                const struct GNUNET_PeerIdentity *peer,
2756                                const struct GNUNET_HELLO_Address *address,
2757                                struct Session *session,
2758                                const struct GNUNET_ATS_Information *ats,
2759                                uint32_t ats_count)
2760 {
2761   const struct SessionConnectMessage *scm;
2762   struct BlackListCheckContext *bcc = NULL;
2763   struct NeighbourMapEntry *n;
2764
2765 #if DEBUG_TRANSPORT
2766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2767               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2768 #endif
2769
2770   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2771   {
2772     GNUNET_break_op (0);
2773     return;
2774   }
2775
2776   scm = (const struct SessionConnectMessage *) message;
2777   GNUNET_break_op (ntohl (scm->reserved) == 0);
2778
2779   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2780
2781   n = lookup_neighbour (peer);
2782   if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state)))
2783   {
2784     /* connected peer switches addresses or is trying to do a fast reconnect*/
2785     return;
2786   }
2787
2788
2789   /* we are not connected to this peer */
2790   /* do blacklist check */
2791   bcc =
2792       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2793                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2794   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2795   bcc->ats_count = ats_count + 1;
2796   bcc->address = GNUNET_HELLO_address_copy (address);
2797   bcc->session = session;
2798   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2799   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2800   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2801   bcc->ats[ats_count].value =
2802       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2803   GST_blacklist_test_allowed (peer, address->transport_name,
2804                               &handle_connect_blacklist_cont, bcc);
2805 }
2806
2807
2808 /* end of file gnunet-service-transport_neighbours.c */