(no commit message)
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
61
62 #define FAST_RECONNECT_RATE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100)
63
64 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
65
66 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
67
68 #define  TEST_NEW_CODE GNUNET_NO
69
70 /**
71  * Entry in neighbours.
72  */
73 struct NeighbourMapEntry;
74
75 GNUNET_NETWORK_STRUCT_BEGIN
76
77 /**
78  * Message a peer sends to another to indicate its
79  * preference for communicating via a particular
80  * session (and the desire to establish a real
81  * connection).
82  */
83 struct SessionConnectMessage
84 {
85   /**
86    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
87    */
88   struct GNUNET_MessageHeader header;
89
90   /**
91    * Always zero.
92    */
93   uint32_t reserved GNUNET_PACKED;
94
95   /**
96    * Absolute time at the sender.  Only the most recent connect
97    * message implies which session is preferred by the sender.
98    */
99   struct GNUNET_TIME_AbsoluteNBO timestamp;
100
101 };
102
103
104 struct SessionDisconnectMessage
105 {
106   /**
107    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
108    */
109   struct GNUNET_MessageHeader header;
110
111   /**
112    * Always zero.
113    */
114   uint32_t reserved GNUNET_PACKED;
115
116   /**
117    * Purpose of the signature.  Extends over the timestamp.
118    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
119    */
120   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
121
122   /**
123    * Absolute time at the sender.  Only the most recent connect
124    * message implies which session is preferred by the sender.
125    */
126   struct GNUNET_TIME_AbsoluteNBO timestamp;
127
128   /**
129    * Public key of the sender.
130    */
131   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
132
133   /**
134    * Signature of the peer that sends us the disconnect.  Only
135    * valid if the timestamp is AFTER the timestamp from the
136    * corresponding 'CONNECT' message.
137    */
138   struct GNUNET_CRYPTO_RsaSignature signature;
139
140 };
141 GNUNET_NETWORK_STRUCT_END
142
143 /**
144  * For each neighbour we keep a list of messages
145  * that we still want to transmit to the neighbour.
146  */
147 struct MessageQueue
148 {
149
150   /**
151    * This is a doubly linked list.
152    */
153   struct MessageQueue *next;
154
155   /**
156    * This is a doubly linked list.
157    */
158   struct MessageQueue *prev;
159
160   /**
161    * Once this message is actively being transmitted, which
162    * neighbour is it associated with?
163    */
164   struct NeighbourMapEntry *n;
165
166   /**
167    * Function to call once we're done.
168    */
169   GST_NeighbourSendContinuation cont;
170
171   /**
172    * Closure for 'cont'
173    */
174   void *cont_cls;
175
176   /**
177    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
178    * stuck together in memory.  Allocated at the end of this struct.
179    */
180   const char *message_buf;
181
182   /**
183    * Size of the message buf
184    */
185   size_t message_buf_size;
186
187   /**
188    * At what time should we fail?
189    */
190   struct GNUNET_TIME_Absolute timeout;
191
192 };
193
194
195 enum State
196 {
197   /**
198    * fresh peer or completely disconnected
199    */
200   S_NOT_CONNECTED,
201
202   /**
203    * sent CONNECT message to other peer, waiting for CONNECT_ACK
204    */
205   S_CONNECT_SENT,
206
207   /**
208    * received CONNECT message to other peer, sending CONNECT_ACK
209    */
210   S_CONNECT_RECV,
211
212   /**
213    * received ACK or payload
214    */
215   S_CONNECTED,
216
217   /**
218    * connection ended, fast reconnect
219    */
220   S_FAST_RECONNECT,
221
222   /**
223    * Disconnect in progress
224    */
225   S_DISCONNECT
226 };
227
228 enum Address_State
229 {
230   USED,
231   UNUSED,
232   FRESH,
233 };
234
235
236 /**
237  * Entry in neighbours.
238  */
239 struct NeighbourMapEntry
240 {
241
242   /**
243    * Head of list of messages we would like to send to this peer;
244    * must contain at most one message per client.
245    */
246   struct MessageQueue *messages_head;
247
248   /**
249    * Tail of list of messages we would like to send to this peer; must
250    * contain at most one message per client.
251    */
252   struct MessageQueue *messages_tail;
253
254   /**
255    * Are we currently trying to send a message? If so, which one?
256    */
257   struct MessageQueue *is_active;
258
259   /**
260    * Active session for communicating with the peer.
261    */
262   struct Session *session;
263
264   /**
265    * Address we currently use.
266    */
267   struct GNUNET_HELLO_Address *address;
268
269   /**
270    * Address we currently use.
271    */
272   struct GNUNET_HELLO_Address *fast_reconnect_address;
273
274
275   /**
276    * Identity of this neighbour.
277    */
278   struct GNUNET_PeerIdentity id;
279
280   /**
281    * ID of task scheduled to run when this peer is about to
282    * time out (will free resources associated with the peer).
283    */
284   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
285
286   /**
287    * ID of task scheduled to send keepalives.
288    */
289   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
290
291   /**
292    * ID of task scheduled to run when we should try transmitting
293    * the head of the message queue.
294    */
295   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
296
297   /**
298    * Tracker for inbound bandwidth.
299    */
300   struct GNUNET_BANDWIDTH_Tracker in_tracker;
301
302   /**
303    * Inbound bandwidth from ATS, activated when connection is up
304    */
305   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
306
307   /**
308    * Inbound bandwidth from ATS, activated when connection is up
309    */
310   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
311
312   /**
313    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
314    */
315   struct GNUNET_TIME_Absolute connect_ts;
316
317   /**
318    * When did we sent the last keep-alive message?
319    */
320   struct GNUNET_TIME_Absolute keep_alive_sent;
321
322   /**
323    * Latest calculated latency value
324    */
325   struct GNUNET_TIME_Relative latency;
326
327   /**
328    * Timeout for ATS
329    * We asked ATS for a new address for this peer
330    */
331   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
332
333   /**
334    * Delay for ATS request
335    */
336   GNUNET_SCHEDULER_TaskIdentifier ats_request;
337
338   /**
339    * Task the resets the peer state after due to an pending
340    * unsuccessful connection setup
341    */
342   GNUNET_SCHEDULER_TaskIdentifier state_reset;
343
344
345   /**
346    * How often has the other peer (recently) violated the inbound
347    * traffic limit?  Incremented by 10 per violation, decremented by 1
348    * per non-violation (for each time interval).
349    */
350   unsigned int quota_violation_count;
351
352
353   /**
354    * The current state of the peer
355    * Element of enum State
356    */
357   int state;
358
359   /**
360    * Did we sent an KEEP_ALIVE message and are we expecting a response?
361    */
362   int expect_latency_response;
363
364   /**
365    * Was the address used successfully
366    */
367   int address_state;
368
369   /**
370    * Fast reconnect attempts for identical address
371    */
372   unsigned int fast_reconnect_attempts;
373 };
374
375
376 /**
377  * All known neighbours and their HELLOs.
378  */
379 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
380
381 /**
382  * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb
383  */
384 static void *callback_cls;
385
386 /**
387  * Function to call when we connected to a neighbour.
388  */
389 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
390
391 /**
392  * Function to call when we disconnected from a neighbour.
393  */
394 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
395
396 /**
397  * Function to call when we changed an active address of a neighbour.
398  */
399 static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb;
400
401 /**
402  * counter for connected neighbours
403  */
404 static int neighbours_connected;
405
406 /**
407  * Lookup a neighbour entry in the neighbours hash map.
408  *
409  * @param pid identity of the peer to look up
410  * @return the entry, NULL if there is no existing record
411  */
412 static struct NeighbourMapEntry *
413 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
414 {
415   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
416 }
417
418 /**
419  * Disconnect from the given neighbour, clean up the record.
420  *
421  * @param n neighbour to disconnect from
422  */
423 static void
424 disconnect_neighbour (struct NeighbourMapEntry *n);
425
426 #define change_state(n, state, ...) change (n, state, __LINE__)
427
428 static int
429 is_connecting (struct NeighbourMapEntry *n)
430 {
431   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
432     return GNUNET_YES;
433   return GNUNET_NO;
434 }
435
436 static int
437 is_connected (struct NeighbourMapEntry *n)
438 {
439   if (n->state == S_CONNECTED)
440     return GNUNET_YES;
441   return GNUNET_NO;
442 }
443
444 static int
445 is_disconnecting (struct NeighbourMapEntry *n)
446 {
447   if (n->state == S_DISCONNECT)
448     return GNUNET_YES;
449   return GNUNET_NO;
450 }
451
452 static const char *
453 print_state (int state)
454 {
455   switch (state)
456   {
457   case S_CONNECTED:
458     return "S_CONNECTED";
459     break;
460   case S_CONNECT_RECV:
461     return "S_CONNECT_RECV";
462     break;
463   case S_CONNECT_SENT:
464     return "S_CONNECT_SENT";
465     break;
466   case S_DISCONNECT:
467     return "S_DISCONNECT";
468     break;
469   case S_NOT_CONNECTED:
470     return "S_NOT_CONNECTED";
471     break;
472   case S_FAST_RECONNECT:
473     return "S_FAST_RECONNECT";
474     break;
475   default:
476     GNUNET_break (0);
477     break;
478   }
479   return NULL;
480 }
481
482 static int
483 change (struct NeighbourMapEntry *n, int state, int line);
484
485 static void
486 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
487
488
489 static void
490 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
491 {
492   struct NeighbourMapEntry *n = cls;
493
494   if (n == NULL)
495     return;
496
497   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
498   if (n->state == S_CONNECTED)
499     return;
500
501 #if DEBUG_TRANSPORT
502   GNUNET_STATISTICS_update (GST_stats,
503                             gettext_noop
504                             ("# failed connection attempts due to timeout"), 1,
505                             GNUNET_NO);
506 #endif
507
508   /* resetting state */
509
510   if (n->state == S_FAST_RECONNECT)
511   {
512     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
513                 "Fast reconnect time out, disconnecting peer `%s'\n",
514                 GNUNET_i2s (&n->id));
515     disconnect_neighbour(n);
516     return;
517   }
518
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
520               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
521               GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__);
522
523   n->state = S_NOT_CONNECTED;
524
525   /* destroying address */
526   if (n->address != NULL)
527   {
528     GNUNET_assert (strlen (n->address->transport_name) > 0);
529     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
530   }
531
532   /* request new address */
533   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
534     GNUNET_SCHEDULER_cancel (n->ats_suggest);
535   n->ats_suggest =
536       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
537                                     n);
538   GNUNET_ATS_suggest_address (GST_ats, &n->id);
539 }
540
541 static int
542 change (struct NeighbourMapEntry *n, int state, int line)
543 {
544   int previous_state;
545   /* allowed transitions */
546   int allowed = GNUNET_NO;
547
548   previous_state = n->state;
549
550   switch (n->state)
551   {
552   case S_NOT_CONNECTED:
553     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
554         (state == S_DISCONNECT))
555       allowed = GNUNET_YES;
556     break;
557   case S_CONNECT_RECV:
558     allowed = GNUNET_YES;
559     break;
560   case S_CONNECT_SENT:
561     allowed = GNUNET_YES;
562     break;
563   case S_CONNECTED:
564     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
565       allowed = GNUNET_YES;
566     break;
567   case S_DISCONNECT:
568     break;
569   case S_FAST_RECONNECT:
570     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
571       allowed = GNUNET_YES;
572     break;
573   default:
574     GNUNET_break (0);
575     break;
576   }
577   if (allowed == GNUNET_NO)
578   {
579     char *old = GNUNET_strdup (print_state (n->state));
580     char *new = GNUNET_strdup (print_state (state));
581
582     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
583                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
584                 new, line);
585     GNUNET_break (0);
586     GNUNET_free (old);
587     GNUNET_free (new);
588     return GNUNET_SYSERR;
589   }
590   {
591     char *old = GNUNET_strdup (print_state (n->state));
592     char *new = GNUNET_strdup (print_state (state));
593
594     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
595                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
596                 GNUNET_i2s (&n->id), n, old, new, line);
597     GNUNET_free (old);
598     GNUNET_free (new);
599   }
600   n->state = state;
601
602   switch (n->state)
603   {
604   case S_FAST_RECONNECT:
605   case S_CONNECT_RECV:
606   case S_CONNECT_SENT:
607     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
608       GNUNET_SCHEDULER_cancel (n->state_reset);
609     n->state_reset =
610         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
611     break;
612   case S_CONNECTED:
613   case S_NOT_CONNECTED:
614   case S_DISCONNECT:
615     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
616     {
617 #if DEBUG_TRANSPORT
618       char *old = GNUNET_strdup (print_state (n->state));
619       char *new = GNUNET_strdup (print_state (state));
620
621       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
622                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
623                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
624       GNUNET_free (old);
625       GNUNET_free (new);
626 #endif
627       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
628       GNUNET_SCHEDULER_cancel (n->state_reset);
629       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
630     }
631     break;
632
633   default:
634     GNUNET_assert (0);
635   }
636
637   if (NULL != address_change_cb)
638   {
639     if (n->state == S_CONNECTED)
640       address_change_cb (callback_cls, &n->id, n->address);
641     else if (previous_state == S_CONNECTED)
642       address_change_cb (callback_cls, &n->id, NULL);
643   }
644
645   return GNUNET_OK;
646 }
647
648 static ssize_t
649 send_with_session (struct NeighbourMapEntry *n,
650                    const char *msgbuf, size_t msgbuf_size,
651                    uint32_t priority,
652                    struct GNUNET_TIME_Relative timeout,
653                    GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
654 {
655   struct GNUNET_TRANSPORT_PluginFunctions *papi;
656   size_t ret = GNUNET_SYSERR;
657
658   GNUNET_assert (n != NULL);
659   GNUNET_assert (n->session != NULL);
660
661   papi = GST_plugins_find (n->address->transport_name);
662   if (papi == NULL)
663   {
664     if (cont != NULL)
665       cont (cont_cls, &n->id, GNUNET_SYSERR);
666     return GNUNET_SYSERR;
667   }
668
669   ret = papi->send (papi->cls,
670                    n->session,
671                    msgbuf, msgbuf_size,
672                    0,
673                    timeout,
674                    cont, cont_cls);
675
676   if ((ret == -1) && (cont != NULL))
677       cont (cont_cls, &n->id, GNUNET_SYSERR);
678   return ret;
679 }
680
681 /**
682  * Task invoked to start a transmission to another peer.
683  *
684  * @param cls the 'struct NeighbourMapEntry'
685  * @param tc scheduler context
686  */
687 static void
688 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
689
690
691 /**
692  * We're done with our transmission attempt, continue processing.
693  *
694  * @param cls the 'struct MessageQueue' of the message
695  * @param receiver intended receiver
696  * @param success whether it worked or not
697  */
698 static void
699 transmit_send_continuation (void *cls,
700                             const struct GNUNET_PeerIdentity *receiver,
701                             int success)
702 {
703   struct MessageQueue *mq = cls;
704   struct NeighbourMapEntry *n;
705   struct NeighbourMapEntry *tmp;
706
707   tmp = lookup_neighbour (receiver);
708   n = mq->n;
709   if ((NULL != n) && (tmp != NULL) && (tmp == n))
710   {
711     GNUNET_assert (n->is_active == mq);
712     n->is_active = NULL;
713     if (success == GNUNET_YES)
714     {
715       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
716       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
717     }
718   }
719 #if DEBUG_TRANSPORT
720   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
721               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
722               (success == GNUNET_OK) ? "successful" : "FAILED");
723 #endif
724   if (NULL != mq->cont)
725     mq->cont (mq->cont_cls, success);
726   GNUNET_free (mq);
727 }
728
729
730 /**
731  * Check the ready list for the given neighbour and if a plugin is
732  * ready for transmission (and if we have a message), do so!
733  *
734  * @param n target peer for which to transmit
735  */
736 static void
737 try_transmission_to_peer (struct NeighbourMapEntry *n)
738 {
739   struct MessageQueue *mq;
740   struct GNUNET_TIME_Relative timeout;
741   ssize_t ret;
742
743   if (n->is_active != NULL)
744   {
745     GNUNET_break (0);
746     return;                     /* transmission already pending */
747   }
748   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
749   {
750     GNUNET_break (0);
751     return;                     /* currently waiting for bandwidth */
752   }
753   while (NULL != (mq = n->messages_head))
754   {
755     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
756     if (timeout.rel_value > 0)
757       break;
758     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
759     n->is_active = mq;
760     mq->n = n;
761     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
762   }
763   if (NULL == mq)
764     return;                     /* no more messages */
765
766   if (n->address == NULL)
767   {
768 #if DEBUG_TRANSPORT
769     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
770                 GNUNET_i2s (&n->id));
771 #endif
772     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
773     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
774     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
775     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
776     return;
777   }
778
779   if (GST_plugins_find (n->address->transport_name) == NULL)
780   {
781     GNUNET_break (0);
782     return;
783   }
784   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
785   n->is_active = mq;
786   mq->n = n;
787
788   if ((n->address->address_length == 0) && (n->session == NULL))
789   {
790     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
791                 GNUNET_i2s (&n->id));
792     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
793     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
794     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
795     return;
796   }
797
798   ret = send_with_session(n,
799               mq->message_buf, mq->message_buf_size,
800               0, timeout,
801               &transmit_send_continuation, mq);
802
803   if (ret == -1)
804   {
805     /* failure, but 'send' would not call continuation in this case,
806      * so we need to do it here! */
807     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
808   }
809
810 }
811
812
813 /**
814  * Task invoked to start a transmission to another peer.
815  *
816  * @param cls the 'struct NeighbourMapEntry'
817  * @param tc scheduler context
818  */
819 static void
820 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
821 {
822   struct NeighbourMapEntry *n = cls;
823
824   GNUNET_assert (NULL != lookup_neighbour (&n->id));
825   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
826   try_transmission_to_peer (n);
827 }
828
829
830 /**
831  * Initialize the neighbours subsystem.
832  *
833  * @param cls closure for callbacks
834  * @param connect_cb function to call if we connect to a peer
835  * @param disconnect_cb function to call if we disconnect from a peer
836  * @param peer_address_cb function to call if we change an active address
837  *                   of a neighbour
838  */
839 void
840 GST_neighbours_start (void *cls,
841                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
842                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
843                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
844 {
845   callback_cls = cls;
846   connect_notify_cb = connect_cb;
847   disconnect_notify_cb = disconnect_cb;
848   address_change_cb = peer_address_cb;
849   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
850 }
851
852
853 static void
854 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
855                       int result)
856 {
857 #if DEBUG_TRANSPORT
858   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859               "Sending DISCONNECT message to peer `%4s': %i\n",
860               GNUNET_i2s (target), result);
861 #endif
862 }
863
864
865 static int
866 send_disconnect (struct NeighbourMapEntry * n)
867 {
868   size_t ret;
869   struct SessionDisconnectMessage disconnect_msg;
870
871 #if DEBUG_TRANSPORT
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873               "Sending DISCONNECT message to peer `%4s'\n",
874               GNUNET_i2s (&n->id));
875 #endif
876
877   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
878   disconnect_msg.header.type =
879       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
880   disconnect_msg.reserved = htonl (0);
881   disconnect_msg.purpose.size =
882       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
883              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
884              sizeof (struct GNUNET_TIME_AbsoluteNBO));
885   disconnect_msg.purpose.purpose =
886       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
887   disconnect_msg.timestamp =
888       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
889   disconnect_msg.public_key = GST_my_public_key;
890   GNUNET_assert (GNUNET_OK ==
891                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
892                                          &disconnect_msg.purpose,
893                                          &disconnect_msg.signature));
894
895   ret = send_with_session (n,
896             (const char *) &disconnect_msg, sizeof (disconnect_msg),
897             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
898             &send_disconnect_cont, NULL);
899
900   if (ret == GNUNET_SYSERR)
901     return GNUNET_SYSERR;
902
903   GNUNET_STATISTICS_update (GST_stats,
904                             gettext_noop
905                             ("# peers disconnected due to external request"), 1,
906                             GNUNET_NO);
907   return GNUNET_OK;
908 }
909
910
911 /**
912  * Disconnect from the given neighbour, clean up the record.
913  *
914  * @param n neighbour to disconnect from
915  */
916 static void
917 disconnect_neighbour (struct NeighbourMapEntry *n)
918 {
919   struct MessageQueue *mq;
920   int previous_state;
921
922   previous_state = n->state;
923
924   if (is_disconnecting (n))
925     return;
926
927   /* send DISCONNECT MESSAGE */
928   if (previous_state == S_CONNECTED)
929   {
930     if (GNUNET_OK == send_disconnect (n))
931       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
932                   GNUNET_i2s (&n->id));
933     else
934       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935                   "Could not send DISCONNECT_MSG to `%s'\n",
936                   GNUNET_i2s (&n->id));
937   }
938
939   change_state (n, S_DISCONNECT);
940
941   if (previous_state == S_CONNECTED)
942   {
943     GNUNET_assert (NULL != n->address);
944     if (n->address_state == USED)
945     {
946       GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__);
947       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
948       n->address_state = UNUSED;
949     }
950   }
951
952   if (n->address != NULL)
953   {
954     struct GNUNET_TRANSPORT_PluginFunctions *papi;
955
956     papi = GST_plugins_find (n->address->transport_name);
957     if (papi != NULL)
958       papi->disconnect (papi->cls, &n->id);
959   }
960   while (NULL != (mq = n->messages_head))
961   {
962     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
963     if (NULL != mq->cont)
964       mq->cont (mq->cont_cls, GNUNET_SYSERR);
965     GNUNET_free (mq);
966   }
967   if (NULL != n->is_active)
968   {
969     n->is_active->n = NULL;
970     n->is_active = NULL;
971   }
972
973   switch (previous_state)
974   {
975   case S_CONNECTED:
976     GNUNET_assert (neighbours_connected > 0);
977     neighbours_connected--;
978     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
979     GNUNET_SCHEDULER_cancel (n->keepalive_task);
980     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
981     n->expect_latency_response = GNUNET_NO;
982     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
983                               GNUNET_NO);
984     disconnect_notify_cb (callback_cls, &n->id);
985     break;
986   case S_FAST_RECONNECT:
987     GNUNET_STATISTICS_update (GST_stats,
988                               gettext_noop ("# fast reconnects failed"), 1,
989                               GNUNET_NO);
990     disconnect_notify_cb (callback_cls, &n->id);
991     break;
992   default:
993     break;
994   }
995
996   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
997
998   GNUNET_assert (GNUNET_YES ==
999                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
1000                                                        &n->id.hashPubKey, n));
1001   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
1002   {
1003     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1004     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1005   }
1006   if (n->ats_request != GNUNET_SCHEDULER_NO_TASK)
1007   {
1008     GNUNET_SCHEDULER_cancel (n->ats_request);
1009     n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1010   }
1011
1012   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
1013   {
1014     GNUNET_SCHEDULER_cancel (n->timeout_task);
1015     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1016   }
1017   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
1018   {
1019     GNUNET_SCHEDULER_cancel (n->transmission_task);
1020     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
1021   }
1022   if (NULL != n->fast_reconnect_address)
1023   {
1024     GNUNET_HELLO_address_free (n->fast_reconnect_address);
1025     n->fast_reconnect_address = NULL;
1026   }
1027   if (NULL != n->address)
1028   {
1029     GNUNET_HELLO_address_free (n->address);
1030     n->address = NULL;
1031   }
1032   n->session = NULL;
1033   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
1034               GNUNET_i2s (&n->id), n);
1035   GNUNET_free (n);
1036 }
1037
1038
1039 /**
1040  * Peer has been idle for too long. Disconnect.
1041  *
1042  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1043  * @param tc scheduler context
1044  */
1045 static void
1046 neighbour_timeout_task (void *cls,
1047                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1048 {
1049   struct NeighbourMapEntry *n = cls;
1050
1051   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1052   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer`%4s' disconnected due to timeout\n",
1053               GNUNET_i2s (&n->id));
1054
1055   GNUNET_STATISTICS_update (GST_stats,
1056                             gettext_noop
1057                             ("# peers disconnected due to timeout"), 1,
1058                             GNUNET_NO);
1059   disconnect_neighbour (n);
1060 }
1061
1062
1063 /**
1064  * Send another keepalive message.
1065  *
1066  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1067  * @param tc scheduler context
1068  */
1069 static void
1070 neighbour_keepalive_task (void *cls,
1071                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1072 {
1073   struct NeighbourMapEntry *n = cls;
1074   struct GNUNET_MessageHeader m;
1075   int ret;
1076
1077   GNUNET_assert (S_CONNECTED == n->state);
1078   n->keepalive_task =
1079       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1080                                     &neighbour_keepalive_task, n);
1081
1082   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1083                             GNUNET_NO);
1084   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1085   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1086
1087   ret = send_with_session (n,
1088             (const void *) &m, sizeof (m),
1089             UINT32_MAX /* priority */ ,
1090             GNUNET_TIME_UNIT_FOREVER_REL,
1091             NULL, NULL);
1092
1093   n->expect_latency_response = GNUNET_NO;
1094   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1095   if (ret != GNUNET_SYSERR)
1096   {
1097     n->expect_latency_response = GNUNET_YES;
1098     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1099   }
1100
1101 }
1102
1103
1104 /**
1105  * Disconnect from the given neighbour.
1106  *
1107  * @param cls unused
1108  * @param key hash of neighbour's public key (not used)
1109  * @param value the 'struct NeighbourMapEntry' of the neighbour
1110  */
1111 static int
1112 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1113 {
1114   struct NeighbourMapEntry *n = value;
1115
1116 #if DEBUG_TRANSPORT
1117   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1118               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1119 #endif
1120   if (S_CONNECTED == n->state)
1121     GNUNET_STATISTICS_update (GST_stats,
1122                               gettext_noop
1123                               ("# peers disconnected due to global disconnect"),
1124                               1, GNUNET_NO);
1125   disconnect_neighbour (n);
1126   return GNUNET_OK;
1127 }
1128
1129
1130 static void
1131 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1132 {
1133   struct NeighbourMapEntry *n = cls;
1134
1135   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1136
1137   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1138               "ATS did not suggested address to connect to peer `%s'\n",
1139               GNUNET_i2s (&n->id));
1140
1141   disconnect_neighbour (n);
1142 }
1143
1144 /**
1145  * Cleanup the neighbours subsystem.
1146  */
1147 void
1148 GST_neighbours_stop ()
1149 {
1150   // This can happen during shutdown
1151   if (neighbours == NULL)
1152   {
1153     return;
1154   }
1155
1156   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1157                                          NULL);
1158   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1159 //  GNUNET_assert (neighbours_connected == 0);
1160   neighbours = NULL;
1161   callback_cls = NULL;
1162   connect_notify_cb = NULL;
1163   disconnect_notify_cb = NULL;
1164   address_change_cb = NULL;
1165 }
1166
1167 struct ContinutionContext
1168 {
1169   struct GNUNET_HELLO_Address *address;
1170
1171   struct Session *session;
1172 };
1173
1174 static void
1175 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1176                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1177 {
1178   struct QuotaSetMessage q_msg;
1179
1180 #if DEBUG_TRANSPORT
1181   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1183               ntohl (quota.value__), GNUNET_i2s (target));
1184 #endif
1185   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1186   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1187   q_msg.quota = quota;
1188   q_msg.peer = (*target);
1189   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1190 }
1191
1192 /**
1193  * We tried to send a SESSION_CONNECT message to another peer.  If this
1194  * succeeded, we change the state.  If it failed, we should tell
1195  * ATS to not use this address anymore (until it is re-validated).
1196  *
1197  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1198  * @param target peer to send the message to
1199  * @param success GNUNET_OK on success
1200  */
1201 static void
1202 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1203                            int success)
1204 {
1205   struct ContinutionContext *cc = cls;
1206   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1207
1208   if (GNUNET_YES != success)
1209   {
1210     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1211     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1212   }
1213   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1214   {
1215     GNUNET_HELLO_address_free (cc->address);
1216     GNUNET_free (cc);
1217     return;
1218   }
1219
1220   if ((GNUNET_YES == success) &&
1221       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1222   {
1223     change_state (n, S_CONNECT_SENT);
1224     GNUNET_HELLO_address_free (cc->address);
1225     GNUNET_free (cc);
1226     return;
1227   }
1228
1229   if ((GNUNET_NO == success) &&
1230       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1231   {
1232 #if DEBUG_TRANSPORT
1233     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1234                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1235                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1236 #endif
1237     change_state (n, S_NOT_CONNECTED);
1238     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1239       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1240     n->ats_suggest =
1241         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1242                                       n);
1243     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1244   }
1245   GNUNET_HELLO_address_free (cc->address);
1246   GNUNET_free (cc);
1247 }
1248
1249
1250 static void
1251 ats_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1252 {
1253   struct NeighbourMapEntry *n = cls;
1254   n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1255
1256   n->ats_suggest =
1257     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1258                                   n);
1259 }
1260
1261
1262 /**
1263  * We tried to switch addresses with an peer already connected. If it failed,
1264  * we should tell ATS to not use this address anymore (until it is re-validated).
1265  *
1266  * @param cls the 'struct NeighbourMapEntry'
1267  * @param target peer to send the message to
1268  * @param success GNUNET_OK on success
1269  */
1270 static void
1271 send_switch_address_continuation (void *cls,
1272                                   const struct GNUNET_PeerIdentity *target,
1273                                   int success)
1274 {
1275   struct ContinutionContext *cc = cls;
1276   struct NeighbourMapEntry *n;
1277
1278   if (neighbours == NULL)
1279   {
1280     GNUNET_HELLO_address_free (cc->address);
1281     GNUNET_free (cc);
1282     return;                     /* neighbour is going away */
1283   }
1284
1285   n = lookup_neighbour (&cc->address->peer);
1286   if ((n == NULL) || (is_disconnecting (n)))
1287   {
1288     GNUNET_HELLO_address_free (cc->address);
1289     GNUNET_free (cc);
1290     return;                     /* neighbour is going away */
1291   }
1292
1293   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1294   if (GNUNET_YES != success)
1295   {
1296
1297     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298                 "Failed to switch connected peer `%s' in state %s to address '%s' session %X, asking ATS for new address \n",
1299                 GNUNET_i2s (&n->id), print_state(n->state), GST_plugins_a2s (cc->address), cc->session);
1300
1301     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1302     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1303
1304     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1305     {
1306       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1307       n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1308     }
1309
1310     if (n->state == S_FAST_RECONNECT)
1311     {
1312       if (GNUNET_SCHEDULER_NO_TASK == n->ats_request)
1313       {
1314         /* Throtteled ATS request for fast reconnect */
1315         struct GNUNET_TIME_Relative delay = GNUNET_TIME_relative_multiply(FAST_RECONNECT_RATE, n->fast_reconnect_attempts);
1316         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1317                     "Fast reconnect attempt %u failed, delay ATS request for %llu ms\n", n->fast_reconnect_attempts, delay.rel_value);
1318         n->ats_request =
1319             GNUNET_SCHEDULER_add_delayed (delay,
1320                                           ats_request,
1321                                           n);
1322       }
1323     }
1324     else
1325     {
1326       /* Immediate ATS request for connected peers */
1327       n->ats_suggest =
1328         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1329                                       n);
1330       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1331     }
1332     GNUNET_HELLO_address_free (cc->address);
1333     GNUNET_free (cc);
1334     return;
1335   }
1336   /* Tell ATS that switching addresses was successful */
1337   switch (n->state)
1338   {
1339   case S_CONNECTED:
1340     if (n->address_state == FRESH)
1341     {
1342       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1343       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1344       if (cc->session != n->session)
1345         GNUNET_break (0);
1346       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1347       n->address_state = USED;
1348     }
1349     break;
1350   case S_FAST_RECONNECT:
1351 #if DEBUG_TRANSPORT
1352     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353                 "Successful fast reconnect to peer `%s'\n",
1354                 GNUNET_i2s (&n->id));
1355 #endif
1356     change_state (n, S_CONNECTED);
1357     neighbours_connected++;
1358     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1359                               GNUNET_NO);
1360
1361     if (n->address_state == FRESH)
1362     {
1363       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1364       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1365       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1366       n->address_state = USED;
1367     }
1368
1369     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1370       n->keepalive_task =
1371           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1372
1373     /* Updating quotas */
1374     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1375     send_outbound_quota (target, n->bandwidth_out);
1376
1377   default:
1378     break;
1379   }
1380   GNUNET_HELLO_address_free (cc->address);
1381   GNUNET_free (cc);
1382 }
1383
1384
1385 /**
1386  * We tried to send a SESSION_CONNECT message to another peer.  If this
1387  * succeeded, we change the state.  If it failed, we should tell
1388  * ATS to not use this address anymore (until it is re-validated).
1389  *
1390  * @param cls the 'struct NeighbourMapEntry'
1391  * @param target peer to send the message to
1392  * @param success GNUNET_OK on success
1393  */
1394 static void
1395 send_connect_ack_continuation (void *cls,
1396                                const struct GNUNET_PeerIdentity *target,
1397                                int success)
1398 {
1399   struct ContinutionContext *cc = cls;
1400   struct NeighbourMapEntry *n;
1401
1402   if (neighbours == NULL)
1403   {
1404     GNUNET_HELLO_address_free (cc->address);
1405     GNUNET_free (cc);
1406     return;                     /* neighbour is going away */
1407   }
1408
1409   n = lookup_neighbour (&cc->address->peer);
1410   if ((n == NULL) || (is_disconnecting (n)))
1411   {
1412     GNUNET_HELLO_address_free (cc->address);
1413     GNUNET_free (cc);
1414     return;                     /* neighbour is going away */
1415   }
1416
1417   if (GNUNET_YES == success)
1418   {
1419     GNUNET_HELLO_address_free (cc->address);
1420     GNUNET_free (cc);
1421     return;                     /* sending successful */
1422   }
1423
1424   /* sending failed, ask for next address  */
1425 #if DEBUG_TRANSPORT
1426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1427               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1428               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1429 #endif
1430   change_state (n, S_NOT_CONNECTED);
1431   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1432   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1433
1434   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1435     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1436   n->ats_suggest =
1437       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1438                                     n);
1439   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1440   GNUNET_HELLO_address_free (cc->address);
1441   GNUNET_free (cc);
1442 }
1443
1444
1445 /**
1446  * For an existing neighbour record, set the active connection to
1447  * use the given address.
1448  *
1449  * @param peer identity of the peer to switch the address for
1450  * @param address address of the other peer, NULL if other peer
1451  *                       connected to us
1452  * @param session session to use (or NULL)
1453  * @param ats performance data
1454  * @param ats_count number of entries in ats
1455  * @param bandwidth_in inbound quota to be used when connection is up
1456  * @param bandwidth_out outbound quota to be used when connection is up
1457  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1458  *         connection is not up (yet)
1459  */
1460 int
1461 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
1462                                        const struct GNUNET_HELLO_Address
1463                                        *address,
1464                                        struct Session *session,
1465                                        const struct GNUNET_ATS_Information *ats,
1466                                        uint32_t ats_count,
1467                                        struct GNUNET_BANDWIDTH_Value32NBO
1468                                        bandwidth_in,
1469                                        struct GNUNET_BANDWIDTH_Value32NBO
1470                                        bandwidth_out)
1471 {
1472   struct NeighbourMapEntry *n;
1473   struct SessionConnectMessage connect_msg;
1474   struct ContinutionContext *cc;
1475   size_t msg_len;
1476   size_t ret;
1477
1478   if (neighbours == NULL)
1479   {
1480     /* This can happen during shutdown */
1481     return GNUNET_NO;
1482   }
1483   n = lookup_neighbour (peer);
1484   if (NULL == n)
1485     return GNUNET_NO;
1486   if (n->state == S_DISCONNECT)
1487   {
1488     /* We are disconnecting, nothing to do here */
1489     return GNUNET_NO;
1490   }
1491   GNUNET_assert (address->transport_name != NULL);
1492   if ((session == NULL) && (0 == address->address_length))
1493   {
1494     GNUNET_break_op (0);
1495     /* FIXME: is this actually possible? When does this happen? */
1496     if (strlen (address->transport_name) > 0)
1497       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1498     GNUNET_ATS_suggest_address (GST_ats, peer);
1499     return GNUNET_NO;
1500   }
1501
1502   /* checks successful and neighbour != NULL */
1503   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1504               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1505               (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1506               session,
1507               GNUNET_i2s (peer),
1508               print_state (n->state));
1509
1510   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1511   {
1512     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1513     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1514   }
1515
1516
1517   if (n->state == S_FAST_RECONNECT)
1518   {
1519     /* Throtteled fast reconnect */
1520
1521     if (NULL == n->fast_reconnect_address)
1522     {
1523       n->fast_reconnect_address = GNUNET_HELLO_address_copy (address);
1524
1525     }
1526     else if (0 == GNUNET_HELLO_address_cmp(address, n->fast_reconnect_address))
1527     {
1528       n->fast_reconnect_attempts ++;
1529
1530       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1531                 "FAST RECONNECT to peer `%s' and address '%s' with identical address attempt %u\n",
1532                 GNUNET_i2s (&n->id), GST_plugins_a2s (address), n->fast_reconnect_attempts);
1533     }
1534   }
1535   else
1536   {
1537     n->fast_reconnect_attempts = 0;
1538   }
1539
1540   /* do not switch addresses just update quotas */
1541   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1542       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1543       (n->session == session))
1544   {
1545     n->bandwidth_in = bandwidth_in;
1546     n->bandwidth_out = bandwidth_out;
1547     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1548     send_outbound_quota (peer, n->bandwidth_out);
1549     return GNUNET_NO;
1550   }
1551   if (n->state == S_CONNECTED)
1552   {
1553     /* mark old address as no longer used */
1554     GNUNET_assert (NULL != n->address);
1555     if (n->address_state == USED)
1556     {
1557       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1558       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1559       n->address_state = UNUSED;
1560     }
1561   }
1562
1563   /* set new address */
1564   if (NULL != n->address)
1565     GNUNET_HELLO_address_free (n->address);
1566   n->address = GNUNET_HELLO_address_copy (address);
1567   n->address_state = FRESH;
1568   n->bandwidth_in = bandwidth_in;
1569   n->bandwidth_out = bandwidth_out;
1570   GNUNET_SCHEDULER_cancel (n->timeout_task);
1571   n->timeout_task =
1572       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1573                                     &neighbour_timeout_task, n);
1574
1575   if (NULL != address_change_cb && n->state == S_CONNECTED)
1576     address_change_cb (callback_cls, &n->id, n->address); 
1577
1578   /* Obtain an session for this address from plugin */
1579   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1580   papi = GST_plugins_find (address->transport_name);
1581
1582   if (papi == NULL)
1583   {
1584     /* we don't have the plugin for this address */
1585     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1586
1587     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1588       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1589     n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1590                                       ats_suggest_cancel,
1591                                       n);
1592     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1593     GNUNET_HELLO_address_free (n->address);
1594     n->address = NULL;
1595     n->session = NULL;
1596     return GNUNET_NO;
1597   }
1598
1599   if (session == NULL)
1600   {
1601     n->session = papi->get_session (papi->cls, address);
1602     /* Session could not be initiated */
1603     if (n->session == NULL)
1604     {
1605       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1606                   "Failed to obtain new session %p for peer `%s' and  address '%s'\n",
1607                   n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1608
1609       GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1610
1611       if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1612         GNUNET_SCHEDULER_cancel (n->ats_suggest);
1613       n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1614                                         ats_suggest_cancel,
1615                                         n);
1616       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1617       GNUNET_HELLO_address_free (n->address);
1618       n->address = NULL;
1619       n->session = NULL;
1620       return GNUNET_NO;
1621     }
1622
1623     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1624                 "Obtained new session %p for peer `%s' and  address '%s'\n",
1625                  n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1626     /* Telling ATS about new session */
1627     GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0);
1628   }
1629   else
1630   {
1631     n->session = session;
1632     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1633                 "Using existing session %p for peer `%s' and  address '%s'\n",
1634                 n->session,
1635                 GNUNET_i2s (&n->id),
1636                 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>");
1637   }
1638
1639   switch (n->state)
1640   {
1641   case S_NOT_CONNECTED:
1642   case S_CONNECT_SENT:
1643     msg_len = sizeof (struct SessionConnectMessage);
1644     connect_msg.header.size = htons (msg_len);
1645     connect_msg.header.type =
1646         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1647     connect_msg.reserved = htonl (0);
1648     connect_msg.timestamp =
1649         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1650
1651     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1652     cc->session = n->session;
1653     cc->address = GNUNET_HELLO_address_copy (address);
1654
1655     ret = send_with_session (n,
1656       (const char *) &connect_msg, msg_len,
1657       UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1658       &send_connect_continuation, cc);
1659
1660     return GNUNET_NO;
1661   case S_CONNECT_RECV:
1662     /* We received a CONNECT message and asked ATS for an address */
1663     msg_len = sizeof (struct SessionConnectMessage);
1664     connect_msg.header.size = htons (msg_len);
1665     connect_msg.header.type =
1666         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1667     connect_msg.reserved = htonl (0);
1668     connect_msg.timestamp =
1669         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1670     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1671     cc->session = n->session;
1672     cc->address = GNUNET_HELLO_address_copy (address);
1673
1674     ret = send_with_session(n,
1675                             (const void *) &connect_msg, msg_len,
1676                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1677                             &send_connect_ack_continuation,
1678                             cc);
1679     return GNUNET_NO;
1680   case S_CONNECTED:
1681   case S_FAST_RECONNECT:
1682     /* connected peer is switching addresses or tries fast reconnect */
1683     msg_len = sizeof (struct SessionConnectMessage);
1684     connect_msg.header.size = htons (msg_len);
1685     connect_msg.header.type =
1686         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1687     connect_msg.reserved = htonl (0);
1688     connect_msg.timestamp =
1689         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1690     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1691     cc->session = n->session;
1692     cc->address = GNUNET_HELLO_address_copy (address);
1693     ret = send_with_session(n,
1694                             (const void *) &connect_msg, msg_len,
1695                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1696                             &send_switch_address_continuation, cc);
1697     if (ret == GNUNET_SYSERR)
1698     {
1699       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1700                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1701                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1702     }
1703     return GNUNET_NO;
1704   default:
1705     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1706                 "Invalid connection state to switch addresses %u \n", n->state);
1707     GNUNET_break_op (0);
1708     return GNUNET_NO;
1709   }
1710 }
1711
1712
1713 /**
1714  * Obtain current latency information for the given neighbour.
1715  *
1716  * @param peer
1717  * @return observed latency of the address, FOREVER if the address was
1718  *         never successfully validated
1719  */
1720 struct GNUNET_TIME_Relative
1721 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1722 {
1723   struct NeighbourMapEntry *n;
1724
1725   n = lookup_neighbour (peer);
1726   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1727     return GNUNET_TIME_UNIT_FOREVER_REL;
1728
1729   return n->latency;
1730 }
1731
1732 /**
1733  * Obtain current address information for the given neighbour.
1734  *
1735  * @param peer
1736  * @return address currently used
1737  */
1738 struct GNUNET_HELLO_Address *
1739 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1740 {
1741   struct NeighbourMapEntry *n;
1742
1743   n = lookup_neighbour (peer);
1744   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1745     return NULL;
1746
1747   return n->address;
1748 }
1749
1750
1751
1752 /**
1753  * Create an entry in the neighbour map for the given peer
1754  *
1755  * @param peer peer to create an entry for
1756  * @return new neighbour map entry
1757  */
1758 static struct NeighbourMapEntry *
1759 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1760 {
1761   struct NeighbourMapEntry *n;
1762
1763 #if DEBUG_TRANSPORT
1764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1766 #endif
1767   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1768   n->id = *peer;
1769   n->state = S_NOT_CONNECTED;
1770   n->latency = GNUNET_TIME_relative_get_forever ();
1771   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1772                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1773                                  MAX_BANDWIDTH_CARRY_S);
1774   n->timeout_task =
1775       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1776                                     &neighbour_timeout_task, n);
1777   GNUNET_assert (GNUNET_OK ==
1778                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1779                                                     &n->id.hashPubKey, n,
1780                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1781   return n;
1782 }
1783
1784
1785 /**
1786  * Try to create a connection to the given target (eventually).
1787  *
1788  * @param target peer to try to connect to
1789  */
1790 void
1791 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1792 {
1793   struct NeighbourMapEntry *n;
1794
1795   // This can happen during shutdown
1796   if (neighbours == NULL)
1797   {
1798     return;
1799   }
1800 #if DEBUG_TRANSPORT
1801   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1802               GNUNET_i2s (target));
1803 #endif
1804   if (0 ==
1805       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1806   {
1807     /* my own hello */
1808     return;
1809   }
1810   n = lookup_neighbour (target);
1811
1812   if (NULL != n)
1813   {
1814     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1815       return;                   /* already connecting or connected */
1816     if (is_disconnecting (n))
1817       change_state (n, S_NOT_CONNECTED);
1818   }
1819
1820
1821   if (n == NULL)
1822     n = setup_neighbour (target);
1823 #if DEBUG_TRANSPORT
1824   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1825               "Asking ATS for suggested address to connect to peer `%s'\n",
1826               GNUNET_i2s (&n->id));
1827 #endif
1828
1829   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1830 }
1831
1832 /**
1833  * Test if we're connected to the given peer.
1834  *
1835  * @param target peer to test
1836  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1837  */
1838 int
1839 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1840 {
1841   struct NeighbourMapEntry *n;
1842
1843   // This can happen during shutdown
1844   if (neighbours == NULL)
1845   {
1846     return GNUNET_NO;
1847   }
1848
1849   n = lookup_neighbour (target);
1850
1851   if ((NULL == n) || (S_CONNECTED != n->state))
1852     return GNUNET_NO;           /* not connected */
1853   return GNUNET_YES;
1854 }
1855
1856 /**
1857  * A session was terminated. Take note.
1858  *
1859  * @param peer identity of the peer where the session died
1860  * @param session session that is gone
1861  */
1862 void
1863 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1864                                    struct Session *session)
1865 {
1866   struct NeighbourMapEntry *n;
1867
1868   if (neighbours == NULL)
1869   {
1870     /* This can happen during shutdown */
1871     return;
1872   }
1873
1874 #if DEBUG_TRANSPORT
1875   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1876               session, GNUNET_i2s (peer));
1877 #endif
1878
1879   n = lookup_neighbour (peer);
1880   if (NULL == n)
1881     return;
1882   if (session != n->session)
1883     return;                     /* doesn't affect us */
1884   if (n->state == S_CONNECTED)
1885   {
1886     if (n->address_state == USED)
1887     {
1888       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1889       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1890       n->address_state = UNUSED;
1891
1892     }
1893   }
1894
1895   if (NULL != n->address)
1896   {
1897     GNUNET_HELLO_address_free (n->address);
1898     n->address = NULL;
1899   }
1900   n->session = NULL;
1901
1902   /* not connected anymore anyway, shouldn't matter */
1903   if (S_CONNECTED != n->state)
1904     return;
1905
1906   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1907   {
1908     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1909     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1910     n->expect_latency_response = GNUNET_NO;
1911   }
1912
1913   /* connected, try fast reconnect */
1914   /* statistics "transport" : "# peers connected" -= 1
1915    * neighbours_connected -= 1
1916    * BUT: no disconnect_cb to notify clients about disconnect
1917    */
1918
1919   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1920               GNUNET_i2s (peer));
1921
1922   GNUNET_assert (neighbours_connected > 0);
1923   change_state (n, S_FAST_RECONNECT);
1924   neighbours_connected--;
1925   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
1926                             GNUNET_NO);
1927
1928
1929   /* We are connected, so ask ATS to switch addresses */
1930   GNUNET_SCHEDULER_cancel (n->timeout_task);
1931   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1932                                     &neighbour_timeout_task, n);
1933   /* try QUICKLY to re-establish a connection, reduce timeout! */
1934   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1935     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1936   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1937                                     &ats_suggest_cancel,
1938                                     n);
1939   GNUNET_ATS_suggest_address (GST_ats, peer);
1940 }
1941
1942
1943 /**
1944  * Transmit a message to the given target using the active connection.
1945  *
1946  * @param target destination
1947  * @param msg message to send
1948  * @param msg_size number of bytes in msg
1949  * @param timeout when to fail with timeout
1950  * @param cont function to call when done
1951  * @param cont_cls closure for 'cont'
1952  */
1953 void
1954 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1955                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1956                      GST_NeighbourSendContinuation cont, void *cont_cls)
1957 {
1958   struct NeighbourMapEntry *n;
1959   struct MessageQueue *mq;
1960
1961   // This can happen during shutdown
1962   if (neighbours == NULL)
1963   {
1964     return;
1965   }
1966
1967   n = lookup_neighbour (target);
1968   if ((n == NULL) || (!is_connected (n)))
1969   {
1970     GNUNET_STATISTICS_update (GST_stats,
1971                               gettext_noop
1972                               ("# messages not sent (no such peer or not connected)"),
1973                               1, GNUNET_NO);
1974 #if DEBUG_TRANSPORT
1975     if (n == NULL)
1976       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1977                   "Could not send message to peer `%s': unknown neighbour",
1978                   GNUNET_i2s (target));
1979     else if (!is_connected (n))
1980       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981                   "Could not send message to peer `%s': not connected\n",
1982                   GNUNET_i2s (target));
1983 #endif
1984     if (NULL != cont)
1985       cont (cont_cls, GNUNET_SYSERR);
1986     return;
1987   }
1988
1989   if ((n->session == NULL) && (n->address == NULL))
1990   {
1991     GNUNET_STATISTICS_update (GST_stats,
1992                               gettext_noop
1993                               ("# messages not sent (no such peer or not connected)"),
1994                               1, GNUNET_NO);
1995 #if DEBUG_TRANSPORT
1996     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1997                 "Could not send message to peer `%s': no address available\n",
1998                 GNUNET_i2s (target));
1999 #endif
2000
2001     if (NULL != cont)
2002       cont (cont_cls, GNUNET_SYSERR);
2003     return;
2004   }
2005
2006   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
2007   GNUNET_STATISTICS_update (GST_stats,
2008                             gettext_noop
2009                             ("# bytes in message queue for other peers"),
2010                             msg_size, GNUNET_NO);
2011   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
2012   mq->cont = cont;
2013   mq->cont_cls = cont_cls;
2014   /* FIXME: this memcpy can be up to 7% of our total runtime! */
2015   memcpy (&mq[1], msg, msg_size);
2016   mq->message_buf = (const char *) &mq[1];
2017   mq->message_buf_size = msg_size;
2018   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
2019   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
2020
2021   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
2022       (NULL == n->is_active))
2023     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
2024 }
2025
2026
2027 /**
2028  * We have received a message from the given sender.  How long should
2029  * we delay before receiving more?  (Also used to keep the peer marked
2030  * as live).
2031  *
2032  * @param sender sender of the message
2033  * @param size size of the message
2034  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
2035  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
2036  *                   GNUNET_SYSERR if the connection is not fully up yet
2037  * @return how long to wait before reading more from this sender
2038  */
2039 struct GNUNET_TIME_Relative
2040 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
2041                                         *sender, ssize_t size, int *do_forward)
2042 {
2043   struct NeighbourMapEntry *n;
2044   struct GNUNET_TIME_Relative ret;
2045
2046   // This can happen during shutdown
2047   if (neighbours == NULL)
2048   {
2049     return GNUNET_TIME_UNIT_FOREVER_REL;
2050   }
2051
2052   n = lookup_neighbour (sender);
2053   if (n == NULL)
2054   {
2055     GST_neighbours_try_connect (sender);
2056     n = lookup_neighbour (sender);
2057     if (NULL == n)
2058     {
2059       GNUNET_STATISTICS_update (GST_stats,
2060                                 gettext_noop
2061                                 ("# messages discarded due to lack of neighbour record"),
2062                                 1, GNUNET_NO);
2063       *do_forward = GNUNET_NO;
2064       return GNUNET_TIME_UNIT_ZERO;
2065     }
2066   }
2067   if (!is_connected (n))
2068   {
2069     *do_forward = GNUNET_SYSERR;
2070     return GNUNET_TIME_UNIT_ZERO;
2071   }
2072   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
2073   {
2074     n->quota_violation_count++;
2075 #if DEBUG_TRANSPORT
2076     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
2078                 n->in_tracker.available_bytes_per_s__,
2079                 n->quota_violation_count);
2080 #endif
2081     /* Discount 32k per violation */
2082     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
2083   }
2084   else
2085   {
2086     if (n->quota_violation_count > 0)
2087     {
2088       /* try to add 32k back */
2089       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
2090       n->quota_violation_count--;
2091     }
2092   }
2093   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2094   {
2095     GNUNET_STATISTICS_update (GST_stats,
2096                               gettext_noop
2097                               ("# bandwidth quota violations by other peers"),
2098                               1, GNUNET_NO);
2099     *do_forward = GNUNET_NO;
2100     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
2101   }
2102   *do_forward = GNUNET_YES;
2103   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
2104   if (ret.rel_value > 0)
2105   {
2106 #if DEBUG_TRANSPORT
2107     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2108                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
2109                 (unsigned long long) n->in_tracker.
2110                 consumption_since_last_update__,
2111                 (unsigned int) n->in_tracker.available_bytes_per_s__,
2112                 (unsigned long long) ret.rel_value);
2113 #endif
2114     GNUNET_STATISTICS_update (GST_stats,
2115                               gettext_noop ("# ms throttling suggested"),
2116                               (int64_t) ret.rel_value, GNUNET_NO);
2117   }
2118   return ret;
2119 }
2120
2121
2122 /**
2123  * Keep the connection to the given neighbour alive longer,
2124  * we received a KEEPALIVE (or equivalent).
2125  *
2126  * @param neighbour neighbour to keep alive
2127  */
2128 void
2129 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
2130 {
2131   struct NeighbourMapEntry *n;
2132
2133   // This can happen during shutdown
2134   if (neighbours == NULL)
2135   {
2136     return;
2137   }
2138
2139   n = lookup_neighbour (neighbour);
2140   if (NULL == n)
2141   {
2142     GNUNET_STATISTICS_update (GST_stats,
2143                               gettext_noop
2144                               ("# KEEPALIVE messages discarded (not connected)"),
2145                               1, GNUNET_NO);
2146     return;
2147   }
2148   GNUNET_SCHEDULER_cancel (n->timeout_task);
2149   n->timeout_task =
2150       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2151                                     &neighbour_timeout_task, n);
2152
2153   /* send reply to measure latency */
2154   if (S_CONNECTED != n->state)
2155     return;
2156
2157   struct GNUNET_MessageHeader m;
2158
2159   m.size = htons (sizeof (struct GNUNET_MessageHeader));
2160   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
2161
2162   send_with_session(n,
2163       (const void *) &m, sizeof (m),
2164       UINT32_MAX,
2165       GNUNET_TIME_UNIT_FOREVER_REL,
2166       NULL, NULL);
2167 }
2168
2169 /**
2170  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
2171  * to this peer
2172  *
2173  * @param neighbour neighbour to keep alive
2174  * @param ats performance data
2175  * @param ats_count number of entries in ats
2176  */
2177 void
2178 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
2179                                    const struct GNUNET_ATS_Information *ats,
2180                                    uint32_t ats_count)
2181 {
2182   struct NeighbourMapEntry *n;
2183   struct GNUNET_ATS_Information *ats_new;
2184   uint32_t latency;
2185
2186   if (neighbours == NULL)
2187   {
2188     // This can happen during shutdown
2189     return;
2190   }
2191
2192   n = lookup_neighbour (neighbour);
2193   if ((NULL == n) || (n->state != S_CONNECTED))
2194   {
2195     GNUNET_STATISTICS_update (GST_stats,
2196                               gettext_noop
2197                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2198                               1, GNUNET_NO);
2199     return;
2200   }
2201   if (n->expect_latency_response != GNUNET_YES)
2202   {
2203     GNUNET_STATISTICS_update (GST_stats,
2204                               gettext_noop
2205                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2206                               1, GNUNET_NO);
2207     return;
2208   }
2209   n->expect_latency_response = GNUNET_NO;
2210
2211   GNUNET_assert (n->keep_alive_sent.abs_value !=
2212                  GNUNET_TIME_absolute_get_zero ().abs_value);
2213   n->latency =
2214       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2215                                            GNUNET_TIME_absolute_get ());
2216 #if DEBUG_TRANSPORT
2217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2218               GNUNET_i2s (&n->id), n->latency.rel_value);
2219 #endif
2220
2221
2222   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2223   {
2224     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2225   }
2226   else
2227   {
2228     ats_new =
2229         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2230                        (ats_count + 1));
2231     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2232
2233     /* add latency */
2234     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2235     if (n->latency.rel_value > UINT32_MAX)
2236       latency = UINT32_MAX;
2237     else
2238       latency = n->latency.rel_value;
2239     ats_new[ats_count].value = htonl (latency);
2240
2241     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2242                                ats_count + 1);
2243     GNUNET_free (ats_new);
2244   }
2245 }
2246
2247
2248 /**
2249  * Change the incoming quota for the given peer.
2250  *
2251  * @param neighbour identity of peer to change qutoa for
2252  * @param quota new quota
2253  */
2254 void
2255 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2256                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2257 {
2258   struct NeighbourMapEntry *n;
2259
2260   // This can happen during shutdown
2261   if (neighbours == NULL)
2262   {
2263     return;
2264   }
2265
2266   n = lookup_neighbour (neighbour);
2267   if (n == NULL)
2268   {
2269     GNUNET_STATISTICS_update (GST_stats,
2270                               gettext_noop
2271                               ("# SET QUOTA messages ignored (no such peer)"),
2272                               1, GNUNET_NO);
2273     return;
2274   }
2275 #if DEBUG_TRANSPORT
2276   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2277               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2278               ntohl (quota.value__), GNUNET_i2s (&n->id));
2279 #endif
2280   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2281   if (0 != ntohl (quota.value__))
2282     return;
2283 #if DEBUG_TRANSPORT
2284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2285               GNUNET_i2s (&n->id), "SET_QUOTA");
2286 #endif
2287   if (is_connected (n))
2288     GNUNET_STATISTICS_update (GST_stats,
2289                               gettext_noop ("# disconnects due to quota of 0"),
2290                               1, GNUNET_NO);
2291   disconnect_neighbour (n);
2292 }
2293
2294
2295 /**
2296  * Closure for the neighbours_iterate function.
2297  */
2298 struct IteratorContext
2299 {
2300   /**
2301    * Function to call on each connected neighbour.
2302    */
2303   GST_NeighbourIterator cb;
2304
2305   /**
2306    * Closure for 'cb'.
2307    */
2308   void *cb_cls;
2309 };
2310
2311
2312 /**
2313  * Call the callback from the closure for each connected neighbour.
2314  *
2315  * @param cls the 'struct IteratorContext'
2316  * @param key the hash of the public key of the neighbour
2317  * @param value the 'struct NeighbourMapEntry'
2318  * @return GNUNET_OK (continue to iterate)
2319  */
2320 static int
2321 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2322 {
2323   struct IteratorContext *ic = cls;
2324   struct NeighbourMapEntry *n = value;
2325
2326   if (!is_connected (n))
2327     return GNUNET_OK;
2328
2329   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2330   return GNUNET_OK;
2331 }
2332
2333
2334 /**
2335  * Iterate over all connected neighbours.
2336  *
2337  * @param cb function to call
2338  * @param cb_cls closure for cb
2339  */
2340 void
2341 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2342 {
2343   struct IteratorContext ic;
2344
2345   // This can happen during shutdown
2346   if (neighbours == NULL)
2347   {
2348     return;
2349   }
2350
2351   ic.cb = cb;
2352   ic.cb_cls = cb_cls;
2353   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2354 }
2355
2356 /**
2357  * If we have an active connection to the given target, it must be shutdown.
2358  *
2359  * @param target peer to disconnect from
2360  */
2361 void
2362 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2363 {
2364   struct NeighbourMapEntry *n;
2365
2366   // This can happen during shutdown
2367   if (neighbours == NULL)
2368   {
2369     return;
2370   }
2371
2372   n = lookup_neighbour (target);
2373   if (NULL == n)
2374     return;                     /* not active */
2375   disconnect_neighbour (n);
2376 }
2377
2378
2379 /**
2380  * We received a disconnect message from the given peer,
2381  * validate and process.
2382  *
2383  * @param peer sender of the message
2384  * @param msg the disconnect message
2385  */
2386 void
2387 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2388                                           *peer,
2389                                           const struct GNUNET_MessageHeader
2390                                           *msg)
2391 {
2392   struct NeighbourMapEntry *n;
2393   const struct SessionDisconnectMessage *sdm;
2394   GNUNET_HashCode hc;
2395
2396 #if DEBUG_TRANSPORT
2397   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2398               "Received DISCONNECT message from peer `%s'\n",
2399               GNUNET_i2s (peer));
2400 #endif
2401
2402   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2403   {
2404     // GNUNET_break_op (0);
2405     GNUNET_STATISTICS_update (GST_stats,
2406                               gettext_noop
2407                               ("# disconnect messages ignored (old format)"), 1,
2408                               GNUNET_NO);
2409     return;
2410   }
2411   sdm = (const struct SessionDisconnectMessage *) msg;
2412   n = lookup_neighbour (peer);
2413   if (NULL == n)
2414     return;                     /* gone already */
2415   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2416       n->connect_ts.abs_value)
2417   {
2418     GNUNET_STATISTICS_update (GST_stats,
2419                               gettext_noop
2420                               ("# disconnect messages ignored (timestamp)"), 1,
2421                               GNUNET_NO);
2422     return;
2423   }
2424   GNUNET_CRYPTO_hash (&sdm->public_key,
2425                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2426                       &hc);
2427   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2428   {
2429     GNUNET_break_op (0);
2430     return;
2431   }
2432   if (ntohl (sdm->purpose.size) !=
2433       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2434       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2435       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2436   {
2437     GNUNET_break_op (0);
2438     return;
2439   }
2440   if (GNUNET_OK !=
2441       GNUNET_CRYPTO_rsa_verify
2442       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2443        &sdm->signature, &sdm->public_key))
2444   {
2445     GNUNET_break_op (0);
2446     return;
2447   }
2448   GST_neighbours_force_disconnect (peer);
2449 }
2450
2451
2452 /**
2453  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2454  * Consider switching to it.
2455  *
2456  * @param message possibly a 'struct SessionConnectMessage' (check format)
2457  * @param peer identity of the peer to switch the address for
2458  * @param address address of the other peer, NULL if other peer
2459  *                       connected to us
2460  * @param session session to use (or NULL)
2461  * @param ats performance data
2462  * @param ats_count number of entries in ats
2463  */
2464 void
2465 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2466                                    const struct GNUNET_PeerIdentity *peer,
2467                                    const struct GNUNET_HELLO_Address *address,
2468                                    struct Session *session,
2469                                    const struct GNUNET_ATS_Information *ats,
2470                                    uint32_t ats_count)
2471 {
2472   const struct SessionConnectMessage *scm;
2473   struct GNUNET_MessageHeader msg;
2474   struct NeighbourMapEntry *n;
2475   size_t msg_len;
2476   size_t ret;
2477
2478   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2479               "Received CONNECT_ACK message from peer `%s'\n",
2480               GNUNET_i2s (peer));
2481
2482   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2483   {
2484     GNUNET_break_op (0);
2485     return;
2486   }
2487   scm = (const struct SessionConnectMessage *) message;
2488   GNUNET_break_op (ntohl (scm->reserved) == 0);
2489   n = lookup_neighbour (peer);
2490   if (NULL == n)
2491   {
2492     /* we did not send 'CONNECT' -- at least not recently */
2493     GNUNET_STATISTICS_update (GST_stats,
2494                               gettext_noop
2495                               ("# unexpected CONNECT_ACK messages (no peer)"),
2496                               1, GNUNET_NO);
2497     return;
2498   }
2499
2500   /* Additional check
2501    *
2502    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2503    *
2504    * We also received an CONNECT message, switched from SENDT to RECV and
2505    * ATS already suggested us an address after a successful blacklist check
2506    */
2507
2508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2509               "Received CONNECT_ACK message from peer `%s' in state `%s'\n",
2510               GNUNET_i2s (peer),
2511               print_state(n->state));
2512
2513   if ((n->address != NULL) && (n->state == S_CONNECTED))
2514   {
2515     /* After fast reconnect: send ACK (ACK) even when we are connected */
2516     msg_len = sizeof (msg);
2517     msg.size = htons (msg_len);
2518     msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2519
2520     ret = send_with_session(n,
2521               (const char *) &msg, msg_len,
2522               UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2523               NULL, NULL);
2524
2525     if (ret == GNUNET_SYSERR)
2526       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2527                   "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2528                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2529     return;
2530   }
2531
2532   if ((n->state != S_CONNECT_SENT) &&
2533       ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2534   {
2535     GNUNET_STATISTICS_update (GST_stats,
2536                               gettext_noop
2537                               ("# unexpected CONNECT_ACK messages"), 1,
2538                               GNUNET_NO);
2539     return;
2540   }
2541   if (n->state != S_CONNECTED)
2542     change_state (n, S_CONNECTED);
2543
2544   if (NULL != session)
2545   {
2546     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2547                      "transport-ats",
2548                      "Giving ATS session %p of plugin %s for peer %s\n",
2549                      session, address->transport_name, GNUNET_i2s (peer));
2550   }
2551   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2552   GNUNET_assert (NULL != n->address);
2553
2554   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2555   {
2556     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2557     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2558     n->address_state = USED;
2559   }
2560
2561   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2562
2563   /* send ACK (ACK) */
2564   msg_len = sizeof (msg);
2565   msg.size = htons (msg_len);
2566   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2567
2568   ret = send_with_session(n,
2569             (const char *) &msg, msg_len,
2570             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2571             NULL, NULL);
2572
2573   if (ret == GNUNET_SYSERR)
2574     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2575                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2576                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2577
2578
2579   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2580     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2581
2582   neighbours_connected++;
2583   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2584                             GNUNET_NO);
2585 #if DEBUG_TRANSPORT
2586   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2587               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2588               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2589               __LINE__);
2590 #endif
2591   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2592   send_outbound_quota (peer, n->bandwidth_out);
2593
2594 }
2595
2596
2597 void
2598 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2599                            const struct GNUNET_PeerIdentity *peer,
2600                            const struct GNUNET_HELLO_Address *address,
2601                            struct Session *session,
2602                            const struct GNUNET_ATS_Information *ats,
2603                            uint32_t ats_count)
2604 {
2605   struct NeighbourMapEntry *n;
2606
2607 #if DEBUG_TRANSPORT
2608   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2609               GNUNET_i2s (peer));
2610 #endif
2611
2612   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2613   {
2614     GNUNET_break_op (0);
2615     return;
2616   }
2617   n = lookup_neighbour (peer);
2618   if (NULL == n)
2619   {
2620     GNUNET_break (0);
2621     return;
2622   }
2623   if (S_CONNECTED == n->state)
2624     return;
2625   if (!is_connecting (n))
2626   {
2627     GNUNET_STATISTICS_update (GST_stats,
2628                               gettext_noop ("# unexpected ACK messages"), 1,
2629                               GNUNET_NO);
2630     return;
2631   }
2632   change_state (n, S_CONNECTED);
2633   if (NULL != session)
2634     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2635                      "transport-ats",
2636                      "Giving ATS session %p of plugin %s for peer %s\n",
2637                      session, address->transport_name, GNUNET_i2s (peer));
2638   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2639   GNUNET_assert (n->address != NULL);
2640
2641   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2642   {
2643     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2644     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2645     n->address_state = USED;
2646   }
2647
2648
2649   neighbours_connected++;
2650   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2651                             GNUNET_NO);
2652
2653   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2654   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2655     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2656 #if DEBUG_TRANSPORT
2657   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2658               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2659               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2660               __LINE__);
2661 #endif
2662   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2663   send_outbound_quota (peer, n->bandwidth_out);
2664 }
2665
2666 struct BlackListCheckContext
2667 {
2668   struct GNUNET_ATS_Information *ats;
2669
2670   uint32_t ats_count;
2671
2672   struct Session *session;
2673
2674   struct GNUNET_HELLO_Address *address;
2675
2676   struct GNUNET_TIME_Absolute ts;
2677 };
2678
2679
2680 static void
2681 handle_connect_blacklist_cont (void *cls,
2682                                const struct GNUNET_PeerIdentity *peer,
2683                                int result)
2684 {
2685   struct NeighbourMapEntry *n;
2686   struct BlackListCheckContext *bcc = cls;
2687
2688 #if DEBUG_TRANSPORT
2689   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2690               "Blacklist check due to CONNECT message: `%s'\n",
2691               GNUNET_i2s (peer),
2692               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2693 #endif
2694
2695   /* not allowed */
2696   if (GNUNET_OK != result)
2697   {
2698     GNUNET_HELLO_address_free (bcc->address);
2699     GNUNET_free (bcc);
2700     return;
2701   }
2702
2703   n = lookup_neighbour (peer);
2704   if (NULL == n)
2705     n = setup_neighbour (peer);
2706
2707   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2708   {
2709     if (NULL != bcc->session)
2710       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2711                        "transport-ats",
2712                        "Giving ATS session %p of address `%s' for peer %s\n",
2713                        bcc->session, GST_plugins_a2s (bcc->address),
2714                        GNUNET_i2s (peer));
2715     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2716
2717     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2718                                bcc->ats_count);
2719     n->connect_ts = bcc->ts;
2720   }
2721
2722   GNUNET_HELLO_address_free (bcc->address);
2723   GNUNET_free (bcc);
2724
2725   if (n->state != S_CONNECT_RECV)
2726     change_state (n, S_CONNECT_RECV);
2727
2728
2729   /* Ask ATS for an address to connect via that address */
2730   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2731     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2732   n->ats_suggest =
2733       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2734                                     n);
2735   GNUNET_ATS_suggest_address (GST_ats, peer);
2736 }
2737
2738 /**
2739  * We received a 'SESSION_CONNECT' message from the other peer.
2740  * Consider switching to it.
2741  *
2742  * @param message possibly a 'struct SessionConnectMessage' (check format)
2743  * @param peer identity of the peer to switch the address for
2744  * @param address address of the other peer, NULL if other peer
2745  *                       connected to us
2746  * @param session session to use (or NULL)
2747  * @param ats performance data
2748  * @param ats_count number of entries in ats (excluding 0-termination)
2749  */
2750 void
2751 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2752                                const struct GNUNET_PeerIdentity *peer,
2753                                const struct GNUNET_HELLO_Address *address,
2754                                struct Session *session,
2755                                const struct GNUNET_ATS_Information *ats,
2756                                uint32_t ats_count)
2757 {
2758   const struct SessionConnectMessage *scm;
2759   struct BlackListCheckContext *bcc = NULL;
2760   struct NeighbourMapEntry *n;
2761
2762 #if DEBUG_TRANSPORT
2763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2764               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2765 #endif
2766
2767   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2768   {
2769     GNUNET_break_op (0);
2770     return;
2771   }
2772
2773   scm = (const struct SessionConnectMessage *) message;
2774   GNUNET_break_op (ntohl (scm->reserved) == 0);
2775
2776   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2777
2778   n = lookup_neighbour (peer);
2779   if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state)))
2780   {
2781     /* connected peer switches addresses or is trying to do a fast reconnect*/
2782     return;
2783   }
2784
2785
2786   /* we are not connected to this peer */
2787   /* do blacklist check */
2788   bcc =
2789       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2790                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2791   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2792   bcc->ats_count = ats_count + 1;
2793   bcc->address = GNUNET_HELLO_address_copy (address);
2794   bcc->session = session;
2795   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2796   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2797   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2798   bcc->ats[ats_count].value =
2799       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2800   GST_blacklist_test_allowed (peer, address->transport_name,
2801                               &handle_connect_blacklist_cont, bcc);
2802 }
2803
2804
2805 /* end of file gnunet-service-transport_neighbours.c */