- stats
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
/**
 * Size of the neighbour hash map.
 */
#define NEIGHBOUR_TABLE_SIZE 256

/**
 * How often must a peer violate bandwidth quotas before we start
 * to simply drop its messages?
 */
#define QUOTA_VIOLATION_DROP_THRESHOLD 10

/**
 * How often do we send KEEPALIVE messages to each of our neighbours and
 * measure the latency with this neighbour?
 * (The idle timeout is 5 minutes or 300 seconds, so with a 30s interval we
 * send 10 keepalives per timeout period; 10 messages would need to be
 * lost in a row for a disconnect.)
 */
#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * Delay after which the 'ats_suggest_cancel' task is run once we have
 * asked ATS to suggest an address for a peer (5 seconds).
 */
#define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)

/**
 * 100 ms interval associated with fast reconnects.
 * NOTE(review): not referenced in the visible part of this file;
 * confirm the exact usage against the callers.
 */
#define FAST_RECONNECT_RATE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100)

/**
 * 1 second timeout associated with fast reconnects.
 * NOTE(review): not referenced in the visible part of this file;
 * confirm the exact usage against the callers.
 */
#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)

/**
 * How long a neighbour may stay in a connection-setup state
 * (S_CONNECT_SENT, S_CONNECT_RECV, S_FAST_RECONNECT) before the
 * 'reset_task' scheduled by change() fires (15 seconds).
 */
#define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
72 GNUNET_NETWORK_STRUCT_BEGIN
73
/**
 * Message a peer sends to another to indicate its
 * preference for communicating via a particular
 * session (and the desire to establish a real
 * connection).
 */
struct SessionConnectMessage
{
  /**
   * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
   */
  struct GNUNET_MessageHeader header;

  /**
   * Always zero.
   */
  uint32_t reserved GNUNET_PACKED;

  /**
   * Absolute time at the sender.  Only the most recent connect
   * message implies which session is preferred by the sender.
   */
  struct GNUNET_TIME_AbsoluteNBO timestamp;

};
99
100
/**
 * Signed message a peer sends to another to announce that it is
 * disconnecting.
 */
struct SessionDisconnectMessage
{
  /**
   * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
   */
  struct GNUNET_MessageHeader header;

  /**
   * Always zero.
   */
  uint32_t reserved GNUNET_PACKED;

  /**
   * Purpose of the signature.  send_disconnect() sizes the signed
   * region to cover this purpose header, the timestamp and the
   * public key.
   * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
   * NOTE(review): send_disconnect() currently stores
   * GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT here instead;
   * confirm which constant the verifying side expects.
   */
  struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;

  /**
   * Absolute time at the sender when the disconnect was created.
   * It is compared against the timestamp of the corresponding
   * 'CONNECT' message (see 'signature').
   */
  struct GNUNET_TIME_AbsoluteNBO timestamp;

  /**
   * Public key of the sender.
   */
  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;

  /**
   * Signature of the peer that sends us the disconnect.  Only
   * valid if the timestamp is AFTER the timestamp from the
   * corresponding 'CONNECT' message.
   */
  struct GNUNET_CRYPTO_RsaSignature signature;

};
138 GNUNET_NETWORK_STRUCT_END
139
/**
 * For each neighbour we keep a list of messages
 * that we still want to transmit to the neighbour.
 */
struct MessageQueue
{

  /**
   * This is a doubly linked list.
   */
  struct MessageQueue *next;

  /**
   * This is a doubly linked list.
   */
  struct MessageQueue *prev;

  /**
   * Once this message is actively being transmitted, which
   * neighbour is it associated with?  Reset to NULL by
   * disconnect_neighbour() when the neighbour goes away.
   */
  struct NeighbourMapEntry *n;

  /**
   * Function to call once we're done (may be NULL).
   */
  GST_NeighbourSendContinuation cont;

  /**
   * Closure for 'cont'
   */
  void *cont_cls;

  /**
   * The message(s) we want to transmit, GNUNET_MessageHeader(s)
   * stuck together in memory.  Allocated at the end of this struct.
   */
  const char *message_buf;

  /**
   * Size of the message buf in bytes.
   */
  size_t message_buf_size;

  /**
   * At what time should we fail?  Messages whose deadline has passed
   * are failed with GNUNET_SYSERR by try_transmission_to_peer().
   */
  struct GNUNET_TIME_Absolute timeout;

};
190
191
/**
 * Connection state of a neighbour.
 *
 * The declaration order matters: is_connecting() treats every value
 * strictly between S_NOT_CONNECTED and S_CONNECTED as "connecting".
 */
enum State
{
  /**
   * fresh peer or completely disconnected
   */
  S_NOT_CONNECTED,

  /**
   * sent CONNECT message to other peer, waiting for CONNECT_ACK
   */
  S_CONNECT_SENT,

  /**
   * received CONNECT message from other peer, sending CONNECT_ACK
   */
  S_CONNECT_RECV,

  /**
   * received ACK or payload
   */
  S_CONNECTED,

  /**
   * connection ended, fast reconnect
   */
  S_FAST_RECONNECT,

  /**
   * Disconnect in progress
   */
  S_DISCONNECT
};
224
/**
 * Usage state of a neighbour's current address, stored in the
 * 'address_state' field of 'struct NeighbourMapEntry'.
 */
enum Address_State
{
  /* address is marked as in use with validation and ATS;
   * disconnect_neighbour() clears that mark and moves to UNUSED */
  USED,
  /* address is not (or no longer) marked as in use */
  UNUSED,
  /* NOTE(review): presumably a newly assigned address that has not been
   * used yet; not set in the visible part of this file -- confirm */
  FRESH,
};
231
232
/**
 * Entry in neighbours: all per-peer state kept by this subsystem.
 */
struct NeighbourMapEntry
{

  /**
   * Head of list of messages we would like to send to this peer;
   * must contain at most one message per client.
   */
  struct MessageQueue *messages_head;

  /**
   * Tail of list of messages we would like to send to this peer; must
   * contain at most one message per client.
   */
  struct MessageQueue *messages_tail;

  /**
   * Are we currently trying to send a message? If so, which one?
   */
  struct MessageQueue *is_active;

  /**
   * Active session for communicating with the peer.
   */
  struct Session *session;

  /**
   * Address we currently use.
   */
  struct GNUNET_HELLO_Address *address;

  /**
   * Address kept for fast reconnect attempts; freed together with
   * the entry in disconnect_neighbour().
   * NOTE(review): exact semantics are defined outside the visible
   * part of this file -- confirm.
   */
  struct GNUNET_HELLO_Address *fast_reconnect_address;


  /**
   * Identity of this neighbour.
   */
  struct GNUNET_PeerIdentity id;

  /**
   * ID of task scheduled to run when this peer is about to
   * time out (will free resources associated with the peer).
   */
  GNUNET_SCHEDULER_TaskIdentifier timeout_task;

  /**
   * ID of task scheduled to send keepalives.
   */
  GNUNET_SCHEDULER_TaskIdentifier keepalive_task;

  /**
   * ID of task scheduled to run when we should try transmitting
   * the head of the message queue.
   */
  GNUNET_SCHEDULER_TaskIdentifier transmission_task;

  /**
   * Tracker for inbound bandwidth.
   */
  struct GNUNET_BANDWIDTH_Tracker in_tracker;

  /**
   * Inbound bandwidth from ATS, activated when connection is up
   */
  struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;

  /**
   * Outbound bandwidth from ATS, activated when connection is up
   * (the previous comment said "inbound"; going by the field name
   * this is the outbound counterpart -- TODO confirm).
   */
  struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;

  /**
   * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
   */
  struct GNUNET_TIME_Absolute connect_ts;

  /**
   * When did we send the last keep-alive message?
   */
  struct GNUNET_TIME_Absolute keep_alive_sent;

  /**
   * Latest calculated latency value
   */
  struct GNUNET_TIME_Relative latency;

  /**
   * Timeout for ATS
   * We asked ATS for a new address for this peer
   */
  GNUNET_SCHEDULER_TaskIdentifier ats_suggest;

  /**
   * Delay for ATS request
   */
  GNUNET_SCHEDULER_TaskIdentifier ats_request;

  /**
   * Task that resets the peer state after a pending connection
   * setup did not succeed in time (see reset_task()).
   */
  GNUNET_SCHEDULER_TaskIdentifier state_reset;


  /**
   * How often has the other peer (recently) violated the inbound
   * traffic limit?  Incremented by 10 per violation, decremented by 1
   * per non-violation (for each time interval).
   */
  unsigned int quota_violation_count;


  /**
   * The current state of the peer
   * Element of enum State
   */
  int state;

  /**
   * Did we send a KEEP_ALIVE message and are we expecting a response?
   */
  int expect_latency_response;

  /**
   * Was the address used successfully?
   * Element of enum Address_State
   */
  int address_state;

  /**
   * Fast reconnect attempts for identical address
   */
  unsigned int fast_reconnect_attempts;
};
371
372
/**
 * All known neighbours and their HELLOs.
 */
static struct GNUNET_CONTAINER_MultiHashMap *neighbours;

/**
 * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb
 */
static void *callback_cls;

/**
 * Function to call when we connected to a neighbour.
 */
static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;

/**
 * Function to call when we disconnected from a neighbour.
 */
static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;

/**
 * Function to call when we changed an active address of a neighbour.
 */
static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb;

/**
 * counter for connected neighbours
 */
static int neighbours_connected;

/**
 * Number of bytes currently accounted for in the message queues;
 * reported as statistic "# bytes in message queue for other peers".
 */
static unsigned int bytes_in_send_queue;

/**
 * Counter of received bytes; reset to zero by GST_neighbours_start().
 * NOTE(review): updated outside the visible part of this file.
 */
static unsigned int bytes_received;
405
406 /**
407  * Lookup a neighbour entry in the neighbours hash map.
408  *
409  * @param pid identity of the peer to look up
410  * @return the entry, NULL if there is no existing record
411  */
412 static struct NeighbourMapEntry *
413 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
414 {
415   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
416 }
417
418 /**
419  * Disconnect from the given neighbour, clean up the record.
420  *
421  * @param n neighbour to disconnect from
422  */
423 static void
424 disconnect_neighbour (struct NeighbourMapEntry *n);
425
/**
 * Convenience wrapper around change() that passes the source line of the
 * call site via __LINE__.  Additional arguments are accepted and ignored.
 */
#define change_state(n, state, ...) change (n, state, __LINE__)
427
428 static int
429 is_connecting (struct NeighbourMapEntry *n)
430 {
431   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
432     return GNUNET_YES;
433   return GNUNET_NO;
434 }
435
436 static int
437 is_connected (struct NeighbourMapEntry *n)
438 {
439   if (n->state == S_CONNECTED)
440     return GNUNET_YES;
441   return GNUNET_NO;
442 }
443
444 static int
445 is_disconnecting (struct NeighbourMapEntry *n)
446 {
447   if (n->state == S_DISCONNECT)
448     return GNUNET_YES;
449   return GNUNET_NO;
450 }
451
452 static const char *
453 print_state (int state)
454 {
455   switch (state)
456   {
457   case S_CONNECTED:
458     return "S_CONNECTED";
459     break;
460   case S_CONNECT_RECV:
461     return "S_CONNECT_RECV";
462     break;
463   case S_CONNECT_SENT:
464     return "S_CONNECT_SENT";
465     break;
466   case S_DISCONNECT:
467     return "S_DISCONNECT";
468     break;
469   case S_NOT_CONNECTED:
470     return "S_NOT_CONNECTED";
471     break;
472   case S_FAST_RECONNECT:
473     return "S_FAST_RECONNECT";
474     break;
475   default:
476     GNUNET_break (0);
477     break;
478   }
479   return NULL;
480 }
481
482 static int
483 change (struct NeighbourMapEntry *n, int state, int line);
484
485 static void
486 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
487
488
489 static void
490 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
491 {
492   struct NeighbourMapEntry *n = cls;
493
494   if (n == NULL)
495     return;
496
497   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
498   if (n->state == S_CONNECTED)
499     return;
500   GNUNET_STATISTICS_update (GST_stats,
501                             gettext_noop
502                             ("# failed connection attempts due to timeout"), 1,
503                             GNUNET_NO);
504   /* resetting state */
505
506   if (n->state == S_FAST_RECONNECT)
507   {
508     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
509                 "Fast reconnect time out, disconnecting peer `%s'\n",
510                 GNUNET_i2s (&n->id));
511     disconnect_neighbour(n);
512     return;
513   }
514
515   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
516               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
517               GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__);
518
519   n->state = S_NOT_CONNECTED;
520
521   /* destroying address */
522   if (n->address != NULL)
523   {
524     GNUNET_assert (strlen (n->address->transport_name) > 0);
525     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
526   }
527
528   /* request new address */
529   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
530     GNUNET_SCHEDULER_cancel (n->ats_suggest);
531   n->ats_suggest =
532       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
533                                     n);
534   GNUNET_ATS_suggest_address (GST_ats, &n->id);
535 }
536
537 static int
538 change (struct NeighbourMapEntry *n, int state, int line)
539 {
540   int previous_state;
541   /* allowed transitions */
542   int allowed = GNUNET_NO;
543
544   previous_state = n->state;
545
546   switch (n->state)
547   {
548   case S_NOT_CONNECTED:
549     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
550         (state == S_DISCONNECT))
551       allowed = GNUNET_YES;
552     break;
553   case S_CONNECT_RECV:
554     allowed = GNUNET_YES;
555     break;
556   case S_CONNECT_SENT:
557     allowed = GNUNET_YES;
558     break;
559   case S_CONNECTED:
560     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
561       allowed = GNUNET_YES;
562     break;
563   case S_DISCONNECT:
564     break;
565   case S_FAST_RECONNECT:
566     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
567       allowed = GNUNET_YES;
568     break;
569   default:
570     GNUNET_break (0);
571     break;
572   }
573   if (allowed == GNUNET_NO)
574   {
575     char *old = GNUNET_strdup (print_state (n->state));
576     char *new = GNUNET_strdup (print_state (state));
577
578     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
579                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
580                 new, line);
581     GNUNET_break (0);
582     GNUNET_free (old);
583     GNUNET_free (new);
584     return GNUNET_SYSERR;
585   }
586   {
587     char *old = GNUNET_strdup (print_state (n->state));
588     char *new = GNUNET_strdup (print_state (state));
589
590     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
591                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
592                 GNUNET_i2s (&n->id), n, old, new, line);
593     GNUNET_free (old);
594     GNUNET_free (new);
595   }
596   n->state = state;
597
598   switch (n->state)
599   {
600   case S_FAST_RECONNECT:
601   case S_CONNECT_RECV:
602   case S_CONNECT_SENT:
603     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
604       GNUNET_SCHEDULER_cancel (n->state_reset);
605     n->state_reset =
606         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
607     break;
608   case S_CONNECTED:
609   case S_NOT_CONNECTED:
610   case S_DISCONNECT:
611     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
612     {
613 #if DEBUG_TRANSPORT
614       char *old = GNUNET_strdup (print_state (n->state));
615       char *new = GNUNET_strdup (print_state (state));
616
617       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
618                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
619                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
620       GNUNET_free (old);
621       GNUNET_free (new);
622 #endif
623       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
624       GNUNET_SCHEDULER_cancel (n->state_reset);
625       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
626     }
627     break;
628
629   default:
630     GNUNET_assert (0);
631   }
632
633   if (NULL != address_change_cb)
634   {
635     if (n->state == S_CONNECTED)
636       address_change_cb (callback_cls, &n->id, n->address);
637     else if (previous_state == S_CONNECTED)
638       address_change_cb (callback_cls, &n->id, NULL);
639   }
640
641   return GNUNET_OK;
642 }
643
644 static ssize_t
645 send_with_session (struct NeighbourMapEntry *n,
646                    const char *msgbuf, size_t msgbuf_size,
647                    uint32_t priority,
648                    struct GNUNET_TIME_Relative timeout,
649                    GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
650 {
651   struct GNUNET_TRANSPORT_PluginFunctions *papi;
652   size_t ret = GNUNET_SYSERR;
653
654   GNUNET_assert (n != NULL);
655   GNUNET_assert (n->session != NULL);
656
657   papi = GST_plugins_find (n->address->transport_name);
658   if (papi == NULL)
659   {
660     if (cont != NULL)
661       cont (cont_cls, &n->id, GNUNET_SYSERR);
662     return GNUNET_SYSERR;
663   }
664
665   ret = papi->send (papi->cls,
666                    n->session,
667                    msgbuf, msgbuf_size,
668                    0,
669                    timeout,
670                    cont, cont_cls);
671
672   if ((ret == -1) && (cont != NULL))
673       cont (cont_cls, &n->id, GNUNET_SYSERR);
674   return ret;
675 }
676
677 /**
678  * Task invoked to start a transmission to another peer.
679  *
680  * @param cls the 'struct NeighbourMapEntry'
681  * @param tc scheduler context
682  */
683 static void
684 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
685
686
687 /**
688  * We're done with our transmission attempt, continue processing.
689  *
690  * @param cls the 'struct MessageQueue' of the message
691  * @param receiver intended receiver
692  * @param success whether it worked or not
693  */
694 static void
695 transmit_send_continuation (void *cls,
696                             const struct GNUNET_PeerIdentity *receiver,
697                             int success)
698 {
699   struct MessageQueue *mq = cls;
700   struct NeighbourMapEntry *n;
701   struct NeighbourMapEntry *tmp;
702
703   tmp = lookup_neighbour (receiver);
704   n = mq->n;
705   if ((NULL != n) && (tmp != NULL) && (tmp == n))
706   {
707     GNUNET_assert (n->is_active == mq);
708     n->is_active = NULL;
709     if (success == GNUNET_YES)
710     {
711       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
712       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
713     }
714   }
715
716   GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size);
717   bytes_in_send_queue -= mq->message_buf_size;
718   GNUNET_STATISTICS_set (GST_stats,
719                         gettext_noop
720                         ("# bytes in message queue for other peers"),
721                         bytes_in_send_queue, GNUNET_NO);
722
723   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
724               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
725               (success == GNUNET_OK) ? "successful" : "FAILED");
726   if (NULL != mq->cont)
727     mq->cont (mq->cont_cls, success);
728   GNUNET_free (mq);
729 }
730
731
732 /**
733  * Check the ready list for the given neighbour and if a plugin is
734  * ready for transmission (and if we have a message), do so!
735  *
736  * @param n target peer for which to transmit
737  */
738 static void
739 try_transmission_to_peer (struct NeighbourMapEntry *n)
740 {
741   struct MessageQueue *mq;
742   struct GNUNET_TIME_Relative timeout;
743   ssize_t ret;
744
745   if (n->is_active != NULL)
746   {
747     GNUNET_break (0);
748     return;                     /* transmission already pending */
749   }
750   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
751   {
752     GNUNET_break (0);
753     return;                     /* currently waiting for bandwidth */
754   }
755   while (NULL != (mq = n->messages_head))
756   {
757     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
758     if (timeout.rel_value > 0)
759       break;
760     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
761     n->is_active = mq;
762     mq->n = n;
763     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
764   }
765   if (NULL == mq)
766     return;                     /* no more messages */
767
768   if (n->address == NULL)
769   {
770     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
771                 GNUNET_i2s (&n->id));
772     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
773     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
774     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
775     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
776     return;
777   }
778
779   if (GST_plugins_find (n->address->transport_name) == NULL)
780   {
781     GNUNET_break (0);
782     return;
783   }
784   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
785   n->is_active = mq;
786   mq->n = n;
787
788   if ((n->address->address_length == 0) && (n->session == NULL))
789   {
790     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
791                 GNUNET_i2s (&n->id));
792     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
793     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
794     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
795     return;
796   }
797
798   ret = send_with_session(n,
799               mq->message_buf, mq->message_buf_size,
800               0, timeout,
801               &transmit_send_continuation, mq);
802
803   if (ret == -1)
804   {
805     /* failure, but 'send' would not call continuation in this case,
806      * so we need to do it here! */
807     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
808   }
809
810 }
811
812
813 /**
814  * Task invoked to start a transmission to another peer.
815  *
816  * @param cls the 'struct NeighbourMapEntry'
817  * @param tc scheduler context
818  */
819 static void
820 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
821 {
822   struct NeighbourMapEntry *n = cls;
823
824   GNUNET_assert (NULL != lookup_neighbour (&n->id));
825   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
826   try_transmission_to_peer (n);
827 }
828
829
830 /**
831  * Initialize the neighbours subsystem.
832  *
833  * @param cls closure for callbacks
834  * @param connect_cb function to call if we connect to a peer
835  * @param disconnect_cb function to call if we disconnect from a peer
836  * @param peer_address_cb function to call if we change an active address
837  *                   of a neighbour
838  */
839 void
840 GST_neighbours_start (void *cls,
841                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
842                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb,
843                       GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb)
844 {
845   callback_cls = cls;
846   connect_notify_cb = connect_cb;
847   disconnect_notify_cb = disconnect_cb;
848   address_change_cb = peer_address_cb;
849   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
850   neighbours_connected = 0;
851   bytes_in_send_queue = 0;
852   bytes_received = 0;
853 }
854
855
856 static void
857 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
858                       int result)
859 {
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "Sending DISCONNECT message to peer `%4s': %i\n",
862               GNUNET_i2s (target), result);
863 }
864
865
866 static int
867 send_disconnect (struct NeighbourMapEntry * n)
868 {
869   size_t ret;
870   struct SessionDisconnectMessage disconnect_msg;
871
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873               "Sending DISCONNECT message to peer `%4s'\n",
874               GNUNET_i2s (&n->id));
875   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
876   disconnect_msg.header.type =
877       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
878   disconnect_msg.reserved = htonl (0);
879   disconnect_msg.purpose.size =
880       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
881              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
882              sizeof (struct GNUNET_TIME_AbsoluteNBO));
883   disconnect_msg.purpose.purpose =
884       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
885   disconnect_msg.timestamp =
886       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
887   disconnect_msg.public_key = GST_my_public_key;
888   GNUNET_assert (GNUNET_OK ==
889                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
890                                          &disconnect_msg.purpose,
891                                          &disconnect_msg.signature));
892
893   ret = send_with_session (n,
894             (const char *) &disconnect_msg, sizeof (disconnect_msg),
895             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
896             &send_disconnect_cont, NULL);
897
898   if (ret == GNUNET_SYSERR)
899     return GNUNET_SYSERR;
900
901   GNUNET_STATISTICS_update (GST_stats,
902                             gettext_noop
903                             ("# peers disconnected due to external request"), 1,
904                             GNUNET_NO);
905   return GNUNET_OK;
906 }
907
908
909 /**
910  * Disconnect from the given neighbour, clean up the record.
911  *
912  * @param n neighbour to disconnect from
913  */
914 static void
915 disconnect_neighbour (struct NeighbourMapEntry *n)
916 {
917   struct MessageQueue *mq;
918   int previous_state;
919
920   previous_state = n->state;
921
922   if (is_disconnecting (n))
923     return;
924
925   /* send DISCONNECT MESSAGE */
926   if (previous_state == S_CONNECTED)
927   {
928     if (GNUNET_OK == send_disconnect (n))
929       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
930                   GNUNET_i2s (&n->id));
931     else
932       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933                   "Could not send DISCONNECT_MSG to `%s'\n",
934                   GNUNET_i2s (&n->id));
935   }
936
937   change_state (n, S_DISCONNECT);
938
939   if (previous_state == S_CONNECTED)
940   {
941     GNUNET_assert (NULL != n->address);
942     if (n->address_state == USED)
943     {
944       GST_validation_set_address_use (n->address, n->session, GNUNET_NO, __LINE__);
945       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
946       n->address_state = UNUSED;
947     }
948   }
949
950   if (n->address != NULL)
951   {
952     struct GNUNET_TRANSPORT_PluginFunctions *papi;
953
954     papi = GST_plugins_find (n->address->transport_name);
955     if (papi != NULL)
956       papi->disconnect (papi->cls, &n->id);
957   }
958   while (NULL != (mq = n->messages_head))
959   {
960     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
961     if (NULL != mq->cont)
962       mq->cont (mq->cont_cls, GNUNET_SYSERR);
963     GNUNET_free (mq);
964   }
965   if (NULL != n->is_active)
966   {
967     n->is_active->n = NULL;
968     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969                 "Failing transmission of active message due to disconnect\n");
970     if (NULL != n->is_active->cont)
971       n->is_active->cont (n->is_active->cont_cls, GNUNET_SYSERR);
972     GNUNET_free (n->is_active);
973     n->is_active = NULL;
974   }
975
976   switch (previous_state)
977   {
978   case S_CONNECTED:
979     GNUNET_assert (neighbours_connected > 0);
980     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
981     GNUNET_SCHEDULER_cancel (n->keepalive_task);
982     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
983     n->expect_latency_response = GNUNET_NO;
984     neighbours_connected--;
985     GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
986                               GNUNET_NO);
987     disconnect_notify_cb (callback_cls, &n->id);
988     break;
989   case S_FAST_RECONNECT:
990     GNUNET_STATISTICS_update (GST_stats,
991                               gettext_noop ("# fast reconnects failed"), 1,
992                               GNUNET_NO);
993     disconnect_notify_cb (callback_cls, &n->id);
994     break;
995   default:
996     break;
997   }
998
999   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
1000
1001   GNUNET_assert (GNUNET_YES ==
1002                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
1003                                                        &n->id.hashPubKey, n));
1004   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
1005   {
1006     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1007     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1008   }
1009   if (n->ats_request != GNUNET_SCHEDULER_NO_TASK)
1010   {
1011     GNUNET_SCHEDULER_cancel (n->ats_request);
1012     n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1013   }
1014
1015   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
1016   {
1017     GNUNET_SCHEDULER_cancel (n->timeout_task);
1018     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1019   }
1020   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
1021   {
1022     GNUNET_SCHEDULER_cancel (n->transmission_task);
1023     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
1024   }
1025   if (NULL != n->fast_reconnect_address)
1026   {
1027     GNUNET_HELLO_address_free (n->fast_reconnect_address);
1028     n->fast_reconnect_address = NULL;
1029   }
1030   if (NULL != n->address)
1031   {
1032     GNUNET_HELLO_address_free (n->address);
1033     n->address = NULL;
1034   }
1035   n->session = NULL;
1036   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
1037               GNUNET_i2s (&n->id), n);
1038   GNUNET_free (n);
1039 }
1040
1041
1042 /**
1043  * Peer has been idle for too long. Disconnect.
1044  *
1045  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1046  * @param tc scheduler context
1047  */
1048 static void
1049 neighbour_timeout_task (void *cls,
1050                         const struct GNUNET_SCHEDULER_TaskContext *tc)
1051 {
1052   struct NeighbourMapEntry *n = cls;
1053
1054   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer`%4s' disconnected due to timeout\n",
1056               GNUNET_i2s (&n->id));
1057
1058   GNUNET_STATISTICS_update (GST_stats,
1059                             gettext_noop
1060                             ("# peers disconnected due to timeout"), 1,
1061                             GNUNET_NO);
1062   disconnect_neighbour (n);
1063 }
1064
1065
1066 /**
1067  * Send another keepalive message.
1068  *
1069  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1070  * @param tc scheduler context
1071  */
1072 static void
1073 neighbour_keepalive_task (void *cls,
1074                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1075 {
1076   struct NeighbourMapEntry *n = cls;
1077   struct GNUNET_MessageHeader m;
1078   int ret;
1079
1080   GNUNET_assert (S_CONNECTED == n->state);
1081   n->keepalive_task =
1082       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1083                                     &neighbour_keepalive_task, n);
1084
1085   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1086                             GNUNET_NO);
1087   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1088   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1089
1090   ret = send_with_session (n,
1091             (const void *) &m, sizeof (m),
1092             UINT32_MAX /* priority */ ,
1093             GNUNET_TIME_UNIT_FOREVER_REL,
1094             NULL, NULL);
1095
1096   n->expect_latency_response = GNUNET_NO;
1097   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1098   if (ret != GNUNET_SYSERR)
1099   {
1100     n->expect_latency_response = GNUNET_YES;
1101     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1102   }
1103
1104 }
1105
1106
1107 /**
1108  * Disconnect from the given neighbour.
1109  *
1110  * @param cls unused
1111  * @param key hash of neighbour's public key (not used)
1112  * @param value the 'struct NeighbourMapEntry' of the neighbour
1113  */
1114 static int
1115 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1116 {
1117   struct NeighbourMapEntry *n = value;
1118
1119   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1120               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1121   if (S_CONNECTED == n->state)
1122     GNUNET_STATISTICS_update (GST_stats,
1123                               gettext_noop
1124                               ("# peers disconnected due to global disconnect"),
1125                               1, GNUNET_NO);
1126   disconnect_neighbour (n);
1127   return GNUNET_OK;
1128 }
1129
1130
1131 static void
1132 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1133 {
1134   struct NeighbourMapEntry *n = cls;
1135
1136   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1137
1138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1139               "ATS did not suggested address to connect to peer `%s'\n",
1140               GNUNET_i2s (&n->id));
1141
1142   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# ATS address suggestions failed"), 1,
1143                             GNUNET_NO);
1144
1145
1146   disconnect_neighbour (n);
1147 }
1148
1149 /**
1150  * Cleanup the neighbours subsystem.
1151  */
1152 void
1153 GST_neighbours_stop ()
1154 {
1155   // This can happen during shutdown
1156   if (neighbours == NULL)
1157   {
1158     return;
1159   }
1160
1161   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1162                                          NULL);
1163   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1164 //  GNUNET_assert (neighbours_connected == 0);
1165   neighbours = NULL;
1166   callback_cls = NULL;
1167   connect_notify_cb = NULL;
1168   disconnect_notify_cb = NULL;
1169   address_change_cb = NULL;
1170 }
1171
/**
 * Closure passed to the send continuations of this file
 * (send_connect_continuation, send_switch_address_continuation and
 * send_connect_ack_continuation).  The continuation releases both the
 * address copy and the context itself.
 */
struct ContinutionContext
{
  /**
   * Private copy of the address the message was sent to.
   */
  struct GNUNET_HELLO_Address *address;

  /**
   * Session that was used for the transmission.
   */
  struct Session *session;
};
1178
1179 static void
1180 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1181                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1182 {
1183   struct QuotaSetMessage q_msg;
1184
1185   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1187               ntohl (quota.value__), GNUNET_i2s (target));
1188   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1189   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1190   q_msg.quota = quota;
1191   q_msg.peer = (*target);
1192   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1193 }
1194
1195 /**
1196  * We tried to send a SESSION_CONNECT message to another peer.  If this
1197  * succeeded, we change the state.  If it failed, we should tell
1198  * ATS to not use this address anymore (until it is re-validated).
1199  *
1200  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1201  * @param target peer to send the message to
1202  * @param success GNUNET_OK on success
1203  */
1204 static void
1205 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1206                            int success)
1207 {
1208   struct ContinutionContext *cc = cls;
1209   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1210
1211   if (GNUNET_YES != success)
1212   {
1213     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1214     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1215   }
1216   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1217   {
1218     GNUNET_HELLO_address_free (cc->address);
1219     GNUNET_free (cc);
1220     return;
1221   }
1222
1223   if ((GNUNET_YES == success) &&
1224       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1225   {
1226     change_state (n, S_CONNECT_SENT);
1227     GNUNET_HELLO_address_free (cc->address);
1228     GNUNET_free (cc);
1229     return;
1230   }
1231
1232   if ((GNUNET_NO == success) &&
1233       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1234   {
1235     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1236                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1237                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1238     change_state (n, S_NOT_CONNECTED);
1239     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1240       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1241     n->ats_suggest =
1242         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1243                                       n);
1244     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1245   }
1246   GNUNET_HELLO_address_free (cc->address);
1247   GNUNET_free (cc);
1248 }
1249
1250
1251 static void
1252 ats_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1253 {
1254   struct NeighbourMapEntry *n = cls;
1255   n->ats_request = GNUNET_SCHEDULER_NO_TASK;
1256
1257   n->ats_suggest =
1258     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1259                                   n);
1260 }
1261
1262
1263 /**
1264  * We tried to switch addresses with an peer already connected. If it failed,
1265  * we should tell ATS to not use this address anymore (until it is re-validated).
1266  *
1267  * @param cls the 'struct NeighbourMapEntry'
1268  * @param target peer to send the message to
1269  * @param success GNUNET_OK on success
1270  */
1271 static void
1272 send_switch_address_continuation (void *cls,
1273                                   const struct GNUNET_PeerIdentity *target,
1274                                   int success)
1275 {
1276   struct ContinutionContext *cc = cls;
1277   struct NeighbourMapEntry *n;
1278
1279   if (neighbours == NULL)
1280   {
1281     GNUNET_HELLO_address_free (cc->address);
1282     GNUNET_free (cc);
1283     return;                     /* neighbour is going away */
1284   }
1285
1286   n = lookup_neighbour (&cc->address->peer);
1287   if ((n == NULL) || (is_disconnecting (n)))
1288   {
1289     GNUNET_HELLO_address_free (cc->address);
1290     GNUNET_free (cc);
1291     return;                     /* neighbour is going away */
1292   }
1293
1294   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1295   if (GNUNET_YES != success)
1296   {
1297
1298     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1299                 "Failed to switch connected peer `%s' in state %s to address '%s' session %X, asking ATS for new address \n",
1300                 GNUNET_i2s (&n->id), print_state(n->state), GST_plugins_a2s (cc->address), cc->session);
1301
1302     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1303     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1304
1305     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1306     {
1307       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1308       n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1309     }
1310
1311     if (n->state == S_FAST_RECONNECT)
1312     {
1313       if (GNUNET_SCHEDULER_NO_TASK == n->ats_request)
1314       {
1315         /* Throtteled ATS request for fast reconnect */
1316         struct GNUNET_TIME_Relative delay = GNUNET_TIME_relative_multiply(FAST_RECONNECT_RATE, n->fast_reconnect_attempts);
1317         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318                     "Fast reconnect attempt %u failed, delay ATS request for %llu ms\n", n->fast_reconnect_attempts, delay.rel_value);
1319         n->ats_request =
1320             GNUNET_SCHEDULER_add_delayed (delay,
1321                                           ats_request,
1322                                           n);
1323       }
1324     }
1325     else
1326     {
1327       /* Immediate ATS request for connected peers */
1328       n->ats_suggest =
1329         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1330                                       n);
1331       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1332     }
1333     GNUNET_HELLO_address_free (cc->address);
1334     GNUNET_free (cc);
1335     return;
1336   }
1337   /* Tell ATS that switching addresses was successful */
1338   switch (n->state)
1339   {
1340   case S_CONNECTED:
1341     if (n->address_state == FRESH)
1342     {
1343       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1344       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1345       GNUNET_break (cc->session == n->session);
1346       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1347       n->address_state = USED;
1348     }
1349     break;
1350   case S_FAST_RECONNECT:
1351     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1352                 "Successful fast reconnect to peer `%s'\n",
1353                 GNUNET_i2s (&n->id));
1354     change_state (n, S_CONNECTED);
1355     neighbours_connected++;
1356     GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1357                               GNUNET_NO);
1358
1359     if (n->address_state == FRESH)
1360     {
1361       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES,  __LINE__);
1362       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1363       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1364       n->address_state = USED;
1365     }
1366
1367     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1368       n->keepalive_task =
1369           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1370
1371     /* Updating quotas */
1372     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1373     send_outbound_quota (target, n->bandwidth_out);
1374
1375   default:
1376     break;
1377   }
1378   GNUNET_HELLO_address_free (cc->address);
1379   GNUNET_free (cc);
1380 }
1381
1382
1383 /**
1384  * We tried to send a SESSION_CONNECT message to another peer.  If this
1385  * succeeded, we change the state.  If it failed, we should tell
1386  * ATS to not use this address anymore (until it is re-validated).
1387  *
1388  * @param cls the 'struct NeighbourMapEntry'
1389  * @param target peer to send the message to
1390  * @param success GNUNET_OK on success
1391  */
1392 static void
1393 send_connect_ack_continuation (void *cls,
1394                                const struct GNUNET_PeerIdentity *target,
1395                                int success)
1396 {
1397   struct ContinutionContext *cc = cls;
1398   struct NeighbourMapEntry *n;
1399
1400   if (neighbours == NULL)
1401   {
1402     GNUNET_HELLO_address_free (cc->address);
1403     GNUNET_free (cc);
1404     return;                     /* neighbour is going away */
1405   }
1406
1407   n = lookup_neighbour (&cc->address->peer);
1408   if ((n == NULL) || (is_disconnecting (n)))
1409   {
1410     GNUNET_HELLO_address_free (cc->address);
1411     GNUNET_free (cc);
1412     return;                     /* neighbour is going away */
1413   }
1414
1415   if (GNUNET_YES == success)
1416   {
1417     GNUNET_HELLO_address_free (cc->address);
1418     GNUNET_free (cc);
1419     return;                     /* sending successful */
1420   }
1421
1422   /* sending failed, ask for next address  */
1423   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1424               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1425               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1426   if (n->state != S_NOT_CONNECTED)
1427     change_state (n, S_NOT_CONNECTED);
1428   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1429   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1430
1431   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1432     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1433   n->ats_suggest =
1434       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1435                                     n);
1436   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1437   GNUNET_HELLO_address_free (cc->address);
1438   GNUNET_free (cc);
1439 }
1440
1441
1442 /**
1443  * For an existing neighbour record, set the active connection to
1444  * use the given address.
1445  *
1446  * @param peer identity of the peer to switch the address for
1447  * @param address address of the other peer, NULL if other peer
1448  *                       connected to us
1449  * @param session session to use (or NULL)
1450  * @param ats performance data
1451  * @param ats_count number of entries in ats
1452  * @param bandwidth_in inbound quota to be used when connection is up
1453  * @param bandwidth_out outbound quota to be used when connection is up
1454  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1455  *         connection is not up (yet)
1456  */
1457 int
1458 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
1459                                        const struct GNUNET_HELLO_Address
1460                                        *address,
1461                                        struct Session *session,
1462                                        const struct GNUNET_ATS_Information *ats,
1463                                        uint32_t ats_count,
1464                                        struct GNUNET_BANDWIDTH_Value32NBO
1465                                        bandwidth_in,
1466                                        struct GNUNET_BANDWIDTH_Value32NBO
1467                                        bandwidth_out)
1468 {
1469   struct NeighbourMapEntry *n;
1470   struct SessionConnectMessage connect_msg;
1471   struct ContinutionContext *cc;
1472   size_t msg_len;
1473   size_t ret;
1474
1475   if (neighbours == NULL)
1476   {
1477     /* This can happen during shutdown */
1478     return GNUNET_NO;
1479   }
1480   n = lookup_neighbour (peer);
1481   if (NULL == n)
1482     return GNUNET_NO;
1483   if (n->state == S_DISCONNECT)
1484   {
1485     /* We are disconnecting, nothing to do here */
1486     return GNUNET_NO;
1487   }
1488   GNUNET_assert (address->transport_name != NULL);
1489   if ((session == NULL) && (0 == address->address_length))
1490   {
1491     GNUNET_break_op (0);
1492     /* FIXME: is this actually possible? When does this happen? */
1493     if (strlen (address->transport_name) > 0)
1494       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1495     GNUNET_ATS_suggest_address (GST_ats, peer);
1496     return GNUNET_NO;
1497   }
1498
1499   /* checks successful and neighbour != NULL */
1500   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1501               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1502               (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>",
1503               session,
1504               GNUNET_i2s (peer),
1505               print_state (n->state));
1506
1507   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1508   {
1509     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1510     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1511   }
1512
1513
1514   if (n->state == S_FAST_RECONNECT)
1515   {
1516     /* Throtteled fast reconnect */
1517
1518     if (NULL == n->fast_reconnect_address)
1519     {
1520       n->fast_reconnect_address = GNUNET_HELLO_address_copy (address);
1521
1522     }
1523     else if (0 == GNUNET_HELLO_address_cmp(address, n->fast_reconnect_address))
1524     {
1525       n->fast_reconnect_attempts ++;
1526
1527       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1528                 "FAST RECONNECT to peer `%s' and address '%s' with identical address attempt %u\n",
1529                 GNUNET_i2s (&n->id), GST_plugins_a2s (address), n->fast_reconnect_attempts);
1530     }
1531   }
1532   else
1533   {
1534     n->fast_reconnect_attempts = 0;
1535   }
1536
1537   /* do not switch addresses just update quotas */
1538   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1539       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1540       (n->session == session))
1541   {
1542     n->bandwidth_in = bandwidth_in;
1543     n->bandwidth_out = bandwidth_out;
1544     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1545     send_outbound_quota (peer, n->bandwidth_out);
1546     return GNUNET_NO;
1547   }
1548   if (n->state == S_CONNECTED)
1549   {
1550     /* mark old address as no longer used */
1551     GNUNET_assert (NULL != n->address);
1552     if (n->address_state == USED)
1553     {
1554       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1555       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1556       n->address_state = UNUSED;
1557     }
1558   }
1559
1560   /* set new address */
1561   if (NULL != n->address)
1562     GNUNET_HELLO_address_free (n->address);
1563   n->address = GNUNET_HELLO_address_copy (address);
1564   n->address_state = FRESH;
1565   n->bandwidth_in = bandwidth_in;
1566   n->bandwidth_out = bandwidth_out;
1567   GNUNET_SCHEDULER_cancel (n->timeout_task);
1568   n->timeout_task =
1569       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1570                                     &neighbour_timeout_task, n);
1571
1572   if (NULL != address_change_cb && n->state == S_CONNECTED)
1573     address_change_cb (callback_cls, &n->id, n->address); 
1574
1575   /* Obtain an session for this address from plugin */
1576   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1577   papi = GST_plugins_find (address->transport_name);
1578
1579   if (papi == NULL)
1580   {
1581     /* we don't have the plugin for this address */
1582     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1583
1584     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1585       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1586     n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1587                                       ats_suggest_cancel,
1588                                       n);
1589     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1590     GNUNET_HELLO_address_free (n->address);
1591     n->address = NULL;
1592     n->session = NULL;
1593     return GNUNET_NO;
1594   }
1595
1596   if (session == NULL)
1597   {
1598     n->session = papi->get_session (papi->cls, address);
1599     /* Session could not be initiated */
1600     if (n->session == NULL)
1601     {
1602       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603                   "Failed to obtain new session %p for peer `%s' and  address '%s'\n",
1604                   n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1605
1606       GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1607
1608       if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1609         GNUNET_SCHEDULER_cancel (n->ats_suggest);
1610       n->ats_suggest =  GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1611                                         ats_suggest_cancel,
1612                                         n);
1613       GNUNET_ATS_suggest_address (GST_ats, &n->id);
1614       GNUNET_HELLO_address_free (n->address);
1615       n->address = NULL;
1616       n->session = NULL;
1617       return GNUNET_NO;
1618     }
1619
1620     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1621                 "Obtained new session %p for peer `%s' and  address '%s'\n",
1622                  n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address));
1623     /* Telling ATS about new session */
1624     GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0);
1625   }
1626   else
1627   {
1628     n->session = session;
1629     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1630                 "Using existing session %p for peer `%s' and  address '%s'\n",
1631                 n->session,
1632                 GNUNET_i2s (&n->id),
1633                 (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>");
1634   }
1635
1636   switch (n->state)
1637   {
1638   case S_NOT_CONNECTED:
1639   case S_CONNECT_SENT:
1640     msg_len = sizeof (struct SessionConnectMessage);
1641     connect_msg.header.size = htons (msg_len);
1642     connect_msg.header.type =
1643         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1644     connect_msg.reserved = htonl (0);
1645     connect_msg.timestamp =
1646         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1647
1648     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1649     cc->session = n->session;
1650     cc->address = GNUNET_HELLO_address_copy (address);
1651     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1652                 "Sending CONNECT message to %s\n",
1653                 GNUNET_i2s (&n->id));
1654     if (n->state != S_CONNECT_SENT)
1655       change_state (n, S_CONNECT_SENT);
1656     ret = send_with_session (n,
1657       (const char *) &connect_msg, msg_len,
1658       UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1659       &send_connect_continuation, cc);
1660
1661     return GNUNET_NO;
1662   case S_CONNECT_RECV:
1663     /* We received a CONNECT message and asked ATS for an address */
1664     msg_len = sizeof (struct SessionConnectMessage);
1665     connect_msg.header.size = htons (msg_len);
1666     connect_msg.header.type =
1667         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1668     connect_msg.reserved = htonl (0);
1669     connect_msg.timestamp =
1670         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1671     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1672     cc->session = n->session;
1673     cc->address = GNUNET_HELLO_address_copy (address);
1674
1675     ret = send_with_session(n,
1676                             (const void *) &connect_msg, msg_len,
1677                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1678                             &send_connect_ack_continuation,
1679                             cc);
1680     return GNUNET_NO;
1681   case S_CONNECTED:
1682   case S_FAST_RECONNECT:
1683     /* connected peer is switching addresses or tries fast reconnect */
1684     msg_len = sizeof (struct SessionConnectMessage);
1685     connect_msg.header.size = htons (msg_len);
1686     connect_msg.header.type =
1687         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1688     connect_msg.reserved = htonl (0);
1689     connect_msg.timestamp =
1690         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1691     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1692     cc->session = n->session;
1693     cc->address = GNUNET_HELLO_address_copy (address);
1694     ret = send_with_session(n,
1695                             (const void *) &connect_msg, msg_len,
1696                             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1697                             &send_switch_address_continuation, cc);
1698     if (ret == GNUNET_SYSERR)
1699     {
1700       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1701                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1702                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1703     }
1704     return GNUNET_NO;
1705   default:
1706     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1707                 "Invalid connection state to switch addresses %u \n", n->state);
1708     GNUNET_break_op (0);
1709     return GNUNET_NO;
1710   }
1711 }
1712
1713
1714 /**
1715  * Obtain current latency information for the given neighbour.
1716  *
1717  * @param peer
1718  * @return observed latency of the address, FOREVER if the address was
1719  *         never successfully validated
1720  */
1721 struct GNUNET_TIME_Relative
1722 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1723 {
1724   struct NeighbourMapEntry *n;
1725
1726   n = lookup_neighbour (peer);
1727   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1728     return GNUNET_TIME_UNIT_FOREVER_REL;
1729
1730   return n->latency;
1731 }
1732
1733 /**
1734  * Obtain current address information for the given neighbour.
1735  *
1736  * @param peer
1737  * @return address currently used
1738  */
1739 struct GNUNET_HELLO_Address *
1740 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1741 {
1742   struct NeighbourMapEntry *n;
1743
1744   n = lookup_neighbour (peer);
1745   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1746     return NULL;
1747
1748   return n->address;
1749 }
1750
1751
1752
1753 /**
1754  * Create an entry in the neighbour map for the given peer
1755  *
1756  * @param peer peer to create an entry for
1757  * @return new neighbour map entry
1758  */
1759 static struct NeighbourMapEntry *
1760 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1761 {
1762   struct NeighbourMapEntry *n;
1763
1764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1766   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1767   n->id = *peer;
1768   n->state = S_NOT_CONNECTED;
1769   n->latency = GNUNET_TIME_relative_get_forever ();
1770   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1771                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1772                                  MAX_BANDWIDTH_CARRY_S);
1773   n->timeout_task =
1774       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1775                                     &neighbour_timeout_task, n);
1776   GNUNET_assert (GNUNET_OK ==
1777                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1778                                                     &n->id.hashPubKey, n,
1779                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1780   return n;
1781 }
1782
1783
1784 /**
1785  * Try to create a connection to the given target (eventually).
1786  *
1787  * @param target peer to try to connect to
1788  */
1789 void
1790 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1791 {
1792   struct NeighbourMapEntry *n;
1793
1794   // This can happen during shutdown
1795   if (neighbours == NULL)
1796   {
1797     return;
1798   }
1799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1800               GNUNET_i2s (target));
1801   if (0 ==
1802       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1803   {
1804     /* my own hello */
1805     return;
1806   }
1807   n = lookup_neighbour (target);
1808
1809   if (NULL != n)
1810   {
1811     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1812       return;                   /* already connecting or connected */
1813     if (is_disconnecting (n))
1814       change_state (n, S_NOT_CONNECTED);
1815   }
1816
1817
1818   if (n == NULL)
1819     n = setup_neighbour (target);
1820   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1821               "Asking ATS for suggested address to connect to peer `%s'\n",
1822               GNUNET_i2s (&n->id));
1823   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1824 }
1825
1826 /**
1827  * Test if we're connected to the given peer.
1828  *
1829  * @param target peer to test
1830  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1831  */
1832 int
1833 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1834 {
1835   struct NeighbourMapEntry *n;
1836
1837   // This can happen during shutdown
1838   if (neighbours == NULL)
1839   {
1840     return GNUNET_NO;
1841   }
1842
1843   n = lookup_neighbour (target);
1844
1845   if ((NULL == n) || (S_CONNECTED != n->state))
1846     return GNUNET_NO;           /* not connected */
1847   return GNUNET_YES;
1848 }
1849
1850 /**
1851  * A session was terminated. Take note.
1852  *
1853  * @param peer identity of the peer where the session died
1854  * @param session session that is gone
1855  */
1856 void
1857 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1858                                    struct Session *session)
1859 {
1860   struct NeighbourMapEntry *n;
1861
1862   if (neighbours == NULL)
1863   {
1864     /* This can happen during shutdown */
1865     return;
1866   }
1867   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p to peer `%s' ended \n",
1868               session, GNUNET_i2s (peer));
1869   n = lookup_neighbour (peer);
1870   if (NULL == n)
1871     return;
1872   if (session != n->session)
1873     return;                     /* doesn't affect us */
1874   if (n->state == S_CONNECTED)
1875   {
1876     if (n->address_state == USED)
1877     {
1878       GST_validation_set_address_use (n->address, n->session, GNUNET_NO,  __LINE__);
1879       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1880       n->address_state = UNUSED;
1881
1882     }
1883   }
1884
1885   if (NULL != n->address)
1886   {
1887     GNUNET_HELLO_address_free (n->address);
1888     n->address = NULL;
1889   }
1890   n->session = NULL;
1891
1892   /* not connected anymore anyway, shouldn't matter */
1893   if (S_CONNECTED != n->state)
1894     return;
1895
1896   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1897   {
1898     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1899     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1900     n->expect_latency_response = GNUNET_NO;
1901   }
1902
1903   /* connected, try fast reconnect */
1904   /* statistics "transport" : "# peers connected" -= 1
1905    * neighbours_connected -= 1
1906    * BUT: no disconnect_cb to notify clients about disconnect
1907    */
1908
1909   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1910               GNUNET_i2s (peer));
1911
1912   GNUNET_assert (neighbours_connected > 0);
1913   change_state (n, S_FAST_RECONNECT);
1914   neighbours_connected--;
1915   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
1916                             GNUNET_NO);
1917
1918
1919   /* We are connected, so ask ATS to switch addresses */
1920   GNUNET_SCHEDULER_cancel (n->timeout_task);
1921   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1922                                     &neighbour_timeout_task, n);
1923   /* try QUICKLY to re-establish a connection, reduce timeout! */
1924   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1925     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1926   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT,
1927                                     &ats_suggest_cancel,
1928                                     n);
1929   GNUNET_ATS_suggest_address (GST_ats, peer);
1930 }
1931
1932
1933 /**
1934  * Transmit a message to the given target using the active connection.
1935  *
1936  * @param target destination
1937  * @param msg message to send
1938  * @param msg_size number of bytes in msg
1939  * @param timeout when to fail with timeout
1940  * @param cont function to call when done
1941  * @param cont_cls closure for 'cont'
1942  */
1943 void
1944 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1945                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1946                      GST_NeighbourSendContinuation cont, void *cont_cls)
1947 {
1948   struct NeighbourMapEntry *n;
1949   struct MessageQueue *mq;
1950
1951   // This can happen during shutdown
1952   if (neighbours == NULL)
1953   {
1954     return;
1955   }
1956
1957   n = lookup_neighbour (target);
1958   if ((n == NULL) || (!is_connected (n)))
1959   {
1960     GNUNET_STATISTICS_update (GST_stats,
1961                               gettext_noop
1962                               ("# messages not sent (no such peer or not connected)"),
1963                               1, GNUNET_NO);
1964     if (n == NULL)
1965       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1966                   "Could not send message to peer `%s': unknown neighbour",
1967                   GNUNET_i2s (target));
1968     else if (!is_connected (n))
1969       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1970                   "Could not send message to peer `%s': not connected\n",
1971                   GNUNET_i2s (target));
1972     if (NULL != cont)
1973       cont (cont_cls, GNUNET_SYSERR);
1974     return;
1975   }
1976
1977   if ((n->session == NULL) && (n->address == NULL))
1978   {
1979     GNUNET_STATISTICS_update (GST_stats,
1980                               gettext_noop
1981                               ("# messages not sent (no such peer or not connected)"),
1982                               1, GNUNET_NO);
1983     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984                 "Could not send message to peer `%s': no address available\n",
1985                 GNUNET_i2s (target));
1986     if (NULL != cont)
1987       cont (cont_cls, GNUNET_SYSERR);
1988     return;
1989   }
1990
1991   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1992   bytes_in_send_queue += msg_size;
1993   GNUNET_STATISTICS_set (GST_stats,
1994                         gettext_noop
1995                         ("# bytes in message queue for other peers"),
1996                         bytes_in_send_queue, GNUNET_NO);
1997   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1998   mq->cont = cont;
1999   mq->cont_cls = cont_cls;
2000   /* FIXME: this memcpy can be up to 7% of our total runtime! */
2001   memcpy (&mq[1], msg, msg_size);
2002   mq->message_buf = (const char *) &mq[1];
2003   mq->message_buf_size = msg_size;
2004   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
2005   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
2006
2007   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
2008       (NULL == n->is_active))
2009     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
2010 }
2011
2012
2013 /**
2014  * We have received a message from the given sender.  How long should
2015  * we delay before receiving more?  (Also used to keep the peer marked
2016  * as live).
2017  *
2018  * @param sender sender of the message
2019  * @param size size of the message
2020  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
2021  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
2022  *                   GNUNET_SYSERR if the connection is not fully up yet
2023  * @return how long to wait before reading more from this sender
2024  */
2025 struct GNUNET_TIME_Relative
2026 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
2027                                         *sender, ssize_t size, int *do_forward)
2028 {
2029   struct NeighbourMapEntry *n;
2030   struct GNUNET_TIME_Relative ret;
2031
2032   // This can happen during shutdown
2033   if (neighbours == NULL)
2034   {
2035     return GNUNET_TIME_UNIT_FOREVER_REL;
2036   }
2037
2038   n = lookup_neighbour (sender);
2039   if (n == NULL)
2040   {
2041     GST_neighbours_try_connect (sender);
2042     n = lookup_neighbour (sender);
2043     if (NULL == n)
2044     {
2045       GNUNET_STATISTICS_update (GST_stats,
2046                                 gettext_noop
2047                                 ("# messages discarded due to lack of neighbour record"),
2048                                 1, GNUNET_NO);
2049       *do_forward = GNUNET_NO;
2050       return GNUNET_TIME_UNIT_ZERO;
2051     }
2052   }
2053   if (!is_connected (n))
2054   {
2055     *do_forward = GNUNET_SYSERR;
2056     return GNUNET_TIME_UNIT_ZERO;
2057   }
2058   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
2059   {
2060     n->quota_violation_count++;
2061     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
2063                 n->in_tracker.available_bytes_per_s__,
2064                 n->quota_violation_count);
2065     /* Discount 32k per violation */
2066     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
2067   }
2068   else
2069   {
2070     if (n->quota_violation_count > 0)
2071     {
2072       /* try to add 32k back */
2073       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
2074       n->quota_violation_count--;
2075     }
2076   }
2077   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
2078   {
2079     GNUNET_STATISTICS_update (GST_stats,
2080                               gettext_noop
2081                               ("# bandwidth quota violations by other peers"),
2082                               1, GNUNET_NO);
2083     *do_forward = GNUNET_NO;
2084     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
2085   }
2086   *do_forward = GNUNET_YES;
2087   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
2088   if (ret.rel_value > 0)
2089   {
2090     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2091                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
2092                 (unsigned long long) n->in_tracker.
2093                 consumption_since_last_update__,
2094                 (unsigned int) n->in_tracker.available_bytes_per_s__,
2095                 (unsigned long long) ret.rel_value);
2096     GNUNET_STATISTICS_update (GST_stats,
2097                               gettext_noop ("# ms throttling suggested"),
2098                               (int64_t) ret.rel_value, GNUNET_NO);
2099   }
2100   return ret;
2101 }
2102
2103
2104 /**
2105  * Keep the connection to the given neighbour alive longer,
2106  * we received a KEEPALIVE (or equivalent).
2107  *
2108  * @param neighbour neighbour to keep alive
2109  */
2110 void
2111 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
2112 {
2113   struct NeighbourMapEntry *n;
2114
2115   // This can happen during shutdown
2116   if (neighbours == NULL)
2117   {
2118     return;
2119   }
2120
2121   n = lookup_neighbour (neighbour);
2122   if (NULL == n)
2123   {
2124     GNUNET_STATISTICS_update (GST_stats,
2125                               gettext_noop
2126                               ("# KEEPALIVE messages discarded (not connected)"),
2127                               1, GNUNET_NO);
2128     return;
2129   }
2130   GNUNET_SCHEDULER_cancel (n->timeout_task);
2131   n->timeout_task =
2132       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2133                                     &neighbour_timeout_task, n);
2134
2135   /* send reply to measure latency */
2136   if (S_CONNECTED != n->state)
2137     return;
2138
2139   struct GNUNET_MessageHeader m;
2140
2141   m.size = htons (sizeof (struct GNUNET_MessageHeader));
2142   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
2143
2144   send_with_session(n,
2145       (const void *) &m, sizeof (m),
2146       UINT32_MAX,
2147       GNUNET_TIME_UNIT_FOREVER_REL,
2148       NULL, NULL);
2149 }
2150
2151 /**
2152  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
2153  * to this peer
2154  *
2155  * @param neighbour neighbour to keep alive
2156  * @param ats performance data
2157  * @param ats_count number of entries in ats
2158  */
2159 void
2160 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
2161                                    const struct GNUNET_ATS_Information *ats,
2162                                    uint32_t ats_count)
2163 {
2164   struct NeighbourMapEntry *n;
2165   struct GNUNET_ATS_Information *ats_new;
2166   uint32_t latency;
2167
2168   if (neighbours == NULL)
2169   {
2170     // This can happen during shutdown
2171     return;
2172   }
2173
2174   n = lookup_neighbour (neighbour);
2175   if ((NULL == n) || (n->state != S_CONNECTED))
2176   {
2177     GNUNET_STATISTICS_update (GST_stats,
2178                               gettext_noop
2179                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2180                               1, GNUNET_NO);
2181     return;
2182   }
2183   if (n->expect_latency_response != GNUNET_YES)
2184   {
2185     GNUNET_STATISTICS_update (GST_stats,
2186                               gettext_noop
2187                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2188                               1, GNUNET_NO);
2189     return;
2190   }
2191   n->expect_latency_response = GNUNET_NO;
2192
2193   GNUNET_assert (n->keep_alive_sent.abs_value !=
2194                  GNUNET_TIME_absolute_get_zero ().abs_value);
2195   n->latency =
2196       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2197                                            GNUNET_TIME_absolute_get ());
2198   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2199               GNUNET_i2s (&n->id), n->latency.rel_value);
2200
2201   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2202   {
2203     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2204   }
2205   else
2206   {
2207     ats_new =
2208         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2209                        (ats_count + 1));
2210     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2211
2212     /* add latency */
2213     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2214     if (n->latency.rel_value > UINT32_MAX)
2215       latency = UINT32_MAX;
2216     else
2217       latency = n->latency.rel_value;
2218     ats_new[ats_count].value = htonl (latency);
2219
2220     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2221                                ats_count + 1);
2222     GNUNET_free (ats_new);
2223   }
2224 }
2225
2226
2227 /**
2228  * Change the incoming quota for the given peer.
2229  *
2230  * @param neighbour identity of peer to change qutoa for
2231  * @param quota new quota
2232  */
2233 void
2234 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2235                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2236 {
2237   struct NeighbourMapEntry *n;
2238
2239   // This can happen during shutdown
2240   if (neighbours == NULL)
2241   {
2242     return;
2243   }
2244
2245   n = lookup_neighbour (neighbour);
2246   if (n == NULL)
2247   {
2248     GNUNET_STATISTICS_update (GST_stats,
2249                               gettext_noop
2250                               ("# SET QUOTA messages ignored (no such peer)"),
2251                               1, GNUNET_NO);
2252     return;
2253   }
2254   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2255               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2256               ntohl (quota.value__), GNUNET_i2s (&n->id));
2257   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2258   if (0 != ntohl (quota.value__))
2259     return;
2260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2261               GNUNET_i2s (&n->id), "SET_QUOTA");
2262   if (is_connected (n))
2263     GNUNET_STATISTICS_update (GST_stats,
2264                               gettext_noop ("# disconnects due to quota of 0"),
2265                               1, GNUNET_NO);
2266   disconnect_neighbour (n);
2267 }
2268
2269
/**
 * Closure for the neighbours_iterate function: bundles the caller's
 * callback and its closure so both fit through the single 'cls'
 * pointer of the hash map iterator.
 */
struct IteratorContext
{
  /**
   * Function to call on each connected neighbour.
   */
  GST_NeighbourIterator cb;

  /**
   * Closure for 'cb'; passed as its first argument.
   */
  void *cb_cls;
};
2285
2286
2287 /**
2288  * Call the callback from the closure for each connected neighbour.
2289  *
2290  * @param cls the 'struct IteratorContext'
2291  * @param key the hash of the public key of the neighbour
2292  * @param value the 'struct NeighbourMapEntry'
2293  * @return GNUNET_OK (continue to iterate)
2294  */
2295 static int
2296 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2297 {
2298   struct IteratorContext *ic = cls;
2299   struct NeighbourMapEntry *n = value;
2300
2301   if (!is_connected (n))
2302     return GNUNET_OK;
2303
2304   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2305   return GNUNET_OK;
2306 }
2307
2308
2309 /**
2310  * Iterate over all connected neighbours.
2311  *
2312  * @param cb function to call
2313  * @param cb_cls closure for cb
2314  */
2315 void
2316 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2317 {
2318   struct IteratorContext ic;
2319
2320   // This can happen during shutdown
2321   if (neighbours == NULL)
2322   {
2323     return;
2324   }
2325
2326   ic.cb = cb;
2327   ic.cb_cls = cb_cls;
2328   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2329 }
2330
2331 /**
2332  * If we have an active connection to the given target, it must be shutdown.
2333  *
2334  * @param target peer to disconnect from
2335  */
2336 void
2337 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2338 {
2339   struct NeighbourMapEntry *n;
2340
2341   // This can happen during shutdown
2342   if (neighbours == NULL)
2343   {
2344     return;
2345   }
2346
2347   n = lookup_neighbour (target);
2348   if (NULL == n)
2349     return;                     /* not active */
2350   disconnect_neighbour (n);
2351 }
2352
2353
2354 /**
2355  * We received a disconnect message from the given peer,
2356  * validate and process.
2357  *
2358  * @param peer sender of the message
2359  * @param msg the disconnect message
2360  */
2361 void
2362 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2363                                           *peer,
2364                                           const struct GNUNET_MessageHeader
2365                                           *msg)
2366 {
2367   struct NeighbourMapEntry *n;
2368   const struct SessionDisconnectMessage *sdm;
2369   GNUNET_HashCode hc;
2370
2371   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2372               "Received DISCONNECT message from peer `%s'\n",
2373               GNUNET_i2s (peer));
2374   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2375   {
2376     // GNUNET_break_op (0);
2377     GNUNET_STATISTICS_update (GST_stats,
2378                               gettext_noop
2379                               ("# disconnect messages ignored (old format)"), 1,
2380                               GNUNET_NO);
2381     return;
2382   }
2383   sdm = (const struct SessionDisconnectMessage *) msg;
2384   n = lookup_neighbour (peer);
2385   if (NULL == n)
2386     return;                     /* gone already */
2387   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2388       n->connect_ts.abs_value)
2389   {
2390     GNUNET_STATISTICS_update (GST_stats,
2391                               gettext_noop
2392                               ("# disconnect messages ignored (timestamp)"), 1,
2393                               GNUNET_NO);
2394     return;
2395   }
2396   GNUNET_CRYPTO_hash (&sdm->public_key,
2397                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2398                       &hc);
2399   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2400   {
2401     GNUNET_break_op (0);
2402     return;
2403   }
2404   if (ntohl (sdm->purpose.size) !=
2405       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2406       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2407       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2408   {
2409     GNUNET_break_op (0);
2410     return;
2411   }
2412   if (GNUNET_OK !=
2413       GNUNET_CRYPTO_rsa_verify
2414       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2415        &sdm->signature, &sdm->public_key))
2416   {
2417     GNUNET_break_op (0);
2418     return;
2419   }
2420   GST_neighbours_force_disconnect (peer);
2421 }
2422
2423
2424 /**
2425  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2426  * Consider switching to it.
2427  *
2428  * @param message possibly a 'struct SessionConnectMessage' (check format)
2429  * @param peer identity of the peer to switch the address for
2430  * @param address address of the other peer, NULL if other peer
2431  *                       connected to us
2432  * @param session session to use (or NULL)
2433  * @param ats performance data
2434  * @param ats_count number of entries in ats
2435  */
2436 void
2437 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2438                                    const struct GNUNET_PeerIdentity *peer,
2439                                    const struct GNUNET_HELLO_Address *address,
2440                                    struct Session *session,
2441                                    const struct GNUNET_ATS_Information *ats,
2442                                    uint32_t ats_count)
2443 {
2444   const struct SessionConnectMessage *scm;
2445   struct GNUNET_MessageHeader msg;
2446   struct NeighbourMapEntry *n;
2447   size_t msg_len;
2448   size_t ret;
2449
2450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2451               "Received CONNECT_ACK message from peer `%s'\n",
2452               GNUNET_i2s (peer));
2453
2454   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2455   {
2456     GNUNET_break_op (0);
2457     return;
2458   }
2459   scm = (const struct SessionConnectMessage *) message;
2460   GNUNET_break_op (ntohl (scm->reserved) == 0);
2461   n = lookup_neighbour (peer);
2462   if (NULL == n)
2463   {
2464     /* we did not send 'CONNECT' -- at least not recently */
2465     GNUNET_STATISTICS_update (GST_stats,
2466                               gettext_noop
2467                               ("# unexpected CONNECT_ACK messages (no peer)"),
2468                               1, GNUNET_NO);
2469     return;
2470   }
2471
2472   /* Additional check
2473    *
2474    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2475    *
2476    * We also received an CONNECT message, switched from SENDT to RECV and
2477    * ATS already suggested us an address after a successful blacklist check
2478    */
2479
2480   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2481               "Received CONNECT_ACK message from peer `%s' in state `%s'\n",
2482               GNUNET_i2s (peer),
2483               print_state(n->state));
2484
2485   if ((n->address != NULL) && (n->state == S_CONNECTED))
2486   {
2487     /* After fast reconnect: send ACK (ACK) even when we are connected */
2488     msg_len = sizeof (msg);
2489     msg.size = htons (msg_len);
2490     msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2491
2492     ret = send_with_session(n,
2493               (const char *) &msg, msg_len,
2494               UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2495               NULL, NULL);
2496
2497     if (ret == GNUNET_SYSERR)
2498       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2499                   "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2500                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2501     return;
2502   }
2503
2504   if ((NULL == n->address) ||
2505       ((n->state != S_CONNECT_SENT) &&
2506        (n->state != S_CONNECT_RECV)))
2507   {
2508     GNUNET_STATISTICS_update (GST_stats,
2509                               gettext_noop
2510                               ("# unexpected CONNECT_ACK messages"), 1,
2511                               GNUNET_NO);
2512     return;
2513   }
2514   if (n->state != S_CONNECTED)
2515     change_state (n, S_CONNECTED);
2516
2517   if (NULL != session)
2518   {
2519     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2520                      "transport-ats",
2521                      "Giving ATS session %p of plugin %s for peer %s\n",
2522                      session, address->transport_name, GNUNET_i2s (peer));
2523   }
2524   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2525   GNUNET_assert (NULL != n->address);
2526
2527   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2528   {
2529     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2530     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2531     n->address_state = USED;
2532   }
2533
2534   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2535
2536   /* send ACK (ACK) */
2537   msg_len = sizeof (msg);
2538   msg.size = htons (msg_len);
2539   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2540
2541   ret = send_with_session(n,
2542             (const char *) &msg, msg_len,
2543             UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
2544             NULL, NULL);
2545
2546   if (ret == GNUNET_SYSERR)
2547     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2548                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2549                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2550
2551
2552   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2553     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2554
2555   neighbours_connected++;
2556   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2557                             GNUNET_NO);
2558   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2559               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2560               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2561               __LINE__);
2562   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2563   send_outbound_quota (peer, n->bandwidth_out);
2564
2565 }
2566
2567
2568 void
2569 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2570                            const struct GNUNET_PeerIdentity *peer,
2571                            const struct GNUNET_HELLO_Address *address,
2572                            struct Session *session,
2573                            const struct GNUNET_ATS_Information *ats,
2574                            uint32_t ats_count)
2575 {
2576   struct NeighbourMapEntry *n;
2577
2578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2579               GNUNET_i2s (peer));
2580   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2581   {
2582     GNUNET_break_op (0);
2583     return;
2584   }
2585   n = lookup_neighbour (peer);
2586   if (NULL == n)
2587   {
2588     GNUNET_break (0);
2589     return;
2590   }
2591   if (S_CONNECTED == n->state)
2592     return;
2593   if (!is_connecting (n))
2594   {
2595     GNUNET_STATISTICS_update (GST_stats,
2596                               gettext_noop ("# unexpected ACK messages"), 1,
2597                               GNUNET_NO);
2598     return;
2599   }
2600   change_state (n, S_CONNECTED);
2601   if (NULL != session)
2602     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2603                      "transport-ats",
2604                      "Giving ATS session %p of plugin %s for peer %s\n",
2605                      session, address->transport_name, GNUNET_i2s (peer));
2606   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2607   GNUNET_assert (n->address != NULL);
2608
2609   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2610   {
2611     GST_validation_set_address_use (n->address, n->session, GNUNET_YES,  __LINE__);
2612     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2613     n->address_state = USED;
2614   }
2615
2616
2617   neighbours_connected++;
2618   GNUNET_STATISTICS_set (GST_stats, gettext_noop ("# peers connected"), neighbours_connected,
2619                             GNUNET_NO);
2620
2621   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2622   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2623     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2624   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2625               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2626               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2627               __LINE__);
2628   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2629   send_outbound_quota (peer, n->bandwidth_out);
2630 }
2631
/**
 * Context kept while a blacklist check for a received CONNECT message
 * is pending; handed as closure to handle_connect_blacklist_cont,
 * which frees it.
 */
struct BlackListCheckContext
{
  /**
   * Performance data to hand to ATS.  GST_neighbours_handle_connect
   * points this at memory allocated directly behind this struct.
   */
  struct GNUNET_ATS_Information *ats;

  /**
   * Number of entries in 'ats'.
   */
  uint32_t ats_count;

  /**
   * Session the CONNECT message was received on (may be NULL).
   */
  struct Session *session;

  /**
   * Copy of the address the CONNECT message came from; released with
   * GNUNET_HELLO_address_free by the continuation.
   */
  struct GNUNET_HELLO_Address *address;

  /**
   * Timestamp taken from the CONNECT message.
   */
  struct GNUNET_TIME_Absolute ts;
};
2644
2645
2646 static void
2647 handle_connect_blacklist_cont (void *cls,
2648                                const struct GNUNET_PeerIdentity *peer,
2649                                int result)
2650 {
2651   struct NeighbourMapEntry *n;
2652   struct BlackListCheckContext *bcc = cls;
2653
2654   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2655               "Blacklist check due to CONNECT message: `%s'\n",
2656               GNUNET_i2s (peer),
2657               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2658   /* not allowed */
2659   if (GNUNET_OK != result)
2660   {
2661     GNUNET_HELLO_address_free (bcc->address);
2662     GNUNET_free (bcc);
2663     return;
2664   }
2665
2666   n = lookup_neighbour (peer);
2667   if (NULL == n)
2668     n = setup_neighbour (peer);
2669
2670   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2671   {
2672     if (NULL != bcc->session)
2673       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2674                        "transport-ats",
2675                        "Giving ATS session %p of address `%s' for peer %s\n",
2676                        bcc->session, GST_plugins_a2s (bcc->address),
2677                        GNUNET_i2s (peer));
2678     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2679
2680     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2681                                bcc->ats_count);
2682     n->connect_ts = bcc->ts;
2683   }
2684
2685   GNUNET_HELLO_address_free (bcc->address);
2686   GNUNET_free (bcc);
2687
2688   if (n->state != S_CONNECT_RECV)
2689     change_state (n, S_CONNECT_RECV);
2690
2691
2692   /* Ask ATS for an address to connect via that address */
2693   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2694     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2695   n->ats_suggest =
2696       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2697                                     n);
2698   GNUNET_ATS_suggest_address (GST_ats, peer);
2699 }
2700
2701 /**
2702  * We received a 'SESSION_CONNECT' message from the other peer.
2703  * Consider switching to it.
2704  *
2705  * @param message possibly a 'struct SessionConnectMessage' (check format)
2706  * @param peer identity of the peer to switch the address for
2707  * @param address address of the other peer, NULL if other peer
2708  *                       connected to us
2709  * @param session session to use (or NULL)
2710  * @param ats performance data
2711  * @param ats_count number of entries in ats (excluding 0-termination)
2712  */
2713 void
2714 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2715                                const struct GNUNET_PeerIdentity *peer,
2716                                const struct GNUNET_HELLO_Address *address,
2717                                struct Session *session,
2718                                const struct GNUNET_ATS_Information *ats,
2719                                uint32_t ats_count)
2720 {
2721   const struct SessionConnectMessage *scm;
2722   struct BlackListCheckContext *bcc = NULL;
2723   struct NeighbourMapEntry *n;
2724
2725   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2726               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2727   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2728   {
2729     GNUNET_break_op (0);
2730     return;
2731   }
2732
2733   scm = (const struct SessionConnectMessage *) message;
2734   GNUNET_break_op (ntohl (scm->reserved) == 0);
2735
2736   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2737
2738   n = lookup_neighbour (peer);
2739   if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state)))
2740   {
2741     /* connected peer switches addresses or is trying to do a fast reconnect*/
2742     return;
2743   }
2744
2745
2746   /* we are not connected to this peer */
2747   /* do blacklist check */
2748   bcc =
2749       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2750                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2751   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2752   bcc->ats_count = ats_count + 1;
2753   bcc->address = GNUNET_HELLO_address_copy (address);
2754   bcc->session = session;
2755   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2756   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2757   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2758   bcc->ats[ats_count].value =
2759       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2760   GST_blacklist_test_allowed (peer, address->transport_name,
2761                               &handle_connect_blacklist_cont, bcc);
2762 }
2763
2764
2765 /* end of file gnunet-service-transport_neighbours.c */