e7d458e7ff5622eb5c93937fa5ceb7fa5cdbc860
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours?
52  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
53  * send 3 keepalives in each interval, so 3 messages would need to be
54  * lost in a row for a disconnect).
55  */
56 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
57
58
59 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
60
61
62 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
63
64
65 /**
66  * Entry in neighbours.
67  */
68 struct NeighbourMapEntry;
69
70 /**
71  * Message a peer sends to another to indicate its
72  * preference for communicating via a particular
73  * session (and the desire to establish a real
74  * connection).
75  */
76 struct SessionConnectMessage
77 {
78   /**
79    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
80    */
81   struct GNUNET_MessageHeader header;
82
83   /**
84    * Always zero.
85    */
86   uint32_t reserved GNUNET_PACKED;
87
88   /**
89    * Absolute time at the sender.  Only the most recent connect
90    * message implies which session is preferred by the sender.
91    */
92   struct GNUNET_TIME_AbsoluteNBO timestamp;
93
94 };
95
96
97 struct SessionDisconnectMessage
98 {
99   /**
100    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Always zero.
106    */
107   uint32_t reserved GNUNET_PACKED;
108
109   /**
110    * Purpose of the signature.  Extends over the timestamp.
111    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
112    */
113   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
114
115   /**
116    * Absolute time at the sender.  Only the most recent connect
117    * message implies which session is preferred by the sender.
118    */
119   struct GNUNET_TIME_AbsoluteNBO timestamp;
120
121   /**
122    * Public key of the sender.
123    */
124   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
125
126   /**
127    * Signature of the peer that sends us the disconnect.  Only
128    * valid if the timestamp is AFTER the timestamp from the
129    * corresponding 'CONNECT' message.
130    */
131   struct GNUNET_CRYPTO_RsaSignature signature;
132
133 };
134
135
136 /**
137  * For each neighbour we keep a list of messages
138  * that we still want to transmit to the neighbour.
139  */
140 struct MessageQueue
141 {
142
143   /**
144    * This is a doubly linked list.
145    */
146   struct MessageQueue *next;
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *prev;
152
153   /**
154    * Once this message is actively being transmitted, which
155    * neighbour is it associated with?
156    */
157   struct NeighbourMapEntry *n;
158
159   /**
160    * Function to call once we're done.
161    */
162   GST_NeighbourSendContinuation cont;
163
164   /**
165    * Closure for 'cont'
166    */
167   void *cont_cls;
168
169   /**
170    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
171    * stuck together in memory.  Allocated at the end of this struct.
172    */
173   const char *message_buf;
174
175   /**
176    * Size of the message buf
177    */
178   size_t message_buf_size;
179
180   /**
181    * At what time should we fail?
182    */
183   struct GNUNET_TIME_Absolute timeout;
184
185 };
186
187 enum State
188 {
189   /* fresh peer or completely disconnected */
190   S_NOT_CONNECTED = 0,
191   /* sent CONNECT message to other peer, waiting for CONNECT_ACK */
192   S_CONNECT_SENT = 1,
193   /* received CONNECT message to other peer, sending CONNECT_ACK */
194   S_CONNECT_RECV = 4,
195   /* sent CONNECT_ACK message to other peer, wait for ACK or payload */
196   S_CONNECT_RECV_ACK_SENT = 8,
197   /* received ACK or payload */
198   S_CONNECTED = 16,
199   /* Disconnect in progress */
200   S_DISCONNECT = 32
201 };
202
203 /**
204  * Entry in neighbours.
205  */
206 struct NeighbourMapEntry
207 {
208
209   /**
210    * Head of list of messages we would like to send to this peer;
211    * must contain at most one message per client.
212    */
213   struct MessageQueue *messages_head;
214
215   /**
216    * Tail of list of messages we would like to send to this peer; must
217    * contain at most one message per client.
218    */
219   struct MessageQueue *messages_tail;
220
221   /**
222    * Performance data for the peer.
223    */
224   //struct GNUNET_ATS_Information *ats;
225
226   /**
227    * Are we currently trying to send a message? If so, which one?
228    */
229   struct MessageQueue *is_active;
230
231   /**
232    * Active session for communicating with the peer.
233    */
234   struct Session *session;
235
236   /**
237    * Name of the plugin we currently use.
238    */
239   char *plugin_name;
240
241   /**
242    * Address used for communicating with the peer, NULL for inbound connections.
243    */
244   void *addr;
245
246   /**
247    * Number of bytes in 'addr'.
248    */
249   size_t addrlen;
250
251   /**
252    * Identity of this neighbour.
253    */
254   struct GNUNET_PeerIdentity id;
255
256   /**
257    * ID of task scheduled to run when this peer is about to
258    * time out (will free resources associated with the peer).
259    */
260   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
261
262   /**
263    * ID of task scheduled to send keepalives.
264    */
265   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
266
267   /**
268    * ID of task scheduled to run when we should try transmitting
269    * the head of the message queue.
270    */
271   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
272
273   /**
274    * Tracker for inbound bandwidth.
275    */
276   struct GNUNET_BANDWIDTH_Tracker in_tracker;
277
278   /**
279    * Inbound bandwidth from ATS, activated when connection is up
280    */
281   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
282
283   /**
284    * Inbound bandwidth from ATS, activated when connection is up
285    */
286   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
287
288   /**
289    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
290    */
291   struct GNUNET_TIME_Absolute connect_ts;
292
293   /**
294    * Timeout for ATS
295    * We asked ATS for a new address for this peer
296    */
297   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
298
299   /**
300    * Task the resets the peer state after due to an pending
301    * unsuccessful connection setup
302    */
303   GNUNET_SCHEDULER_TaskIdentifier state_reset;
304
305   /**
306    * How often has the other peer (recently) violated the inbound
307    * traffic limit?  Incremented by 10 per violation, decremented by 1
308    * per non-violation (for each time interval).
309    */
310   unsigned int quota_violation_count;
311
312
313   /**
314    * The current state of the peer
315    * Element of enum State
316    */
317   int state;
318
319 };
320
321
322 /**
323  * All known neighbours and their HELLOs.
324  */
325 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
326
327 /**
328  * Closure for connect_notify_cb and disconnect_notify_cb
329  */
330 static void *callback_cls;
331
332 /**
333  * Function to call when we connected to a neighbour.
334  */
335 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
336
337 /**
338  * Function to call when we disconnected from a neighbour.
339  */
340 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
341
342 /**
343  * counter for connected neighbours
344  */
345 static int neighbours_connected;
346
347 /**
348  * Lookup a neighbour entry in the neighbours hash map.
349  *
350  * @param pid identity of the peer to look up
351  * @return the entry, NULL if there is no existing record
352  */
353 static struct NeighbourMapEntry *
354 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
355 {
356   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
357 }
358
359 #define change_state(n, state, ...) change (n, state, __LINE__)
360
361 static int
362 is_connecting (struct NeighbourMapEntry *n)
363 {
364   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
365     return GNUNET_YES;
366   return GNUNET_NO;
367 }
368
369 static int
370 is_connected (struct NeighbourMapEntry *n)
371 {
372   if (n->state == S_CONNECTED)
373     return GNUNET_YES;
374   return GNUNET_NO;
375 }
376
377 static int
378 is_disconnecting (struct NeighbourMapEntry *n)
379 {
380   if (n->state == S_DISCONNECT)
381     return GNUNET_YES;
382   return GNUNET_NO;
383 }
384
385 static const char *
386 print_state (int state)
387 {
388   switch (state)
389   {
390   case S_CONNECTED:
391     return "S_CONNECTED";
392     break;
393   case S_CONNECT_RECV:
394     return "S_CONNECT_RECV";
395     break;
396   case S_CONNECT_RECV_ACK_SENT:
397     return "S_CONNECT_RECV_ACK_SENT";
398     break;
399   case S_CONNECT_SENT:
400     return "S_CONNECT_SENT";
401     break;
402   case S_DISCONNECT:
403     return "S_DISCONNECT";
404     break;
405   case S_NOT_CONNECTED:
406     return "S_NOT_CONNECTED";
407     break;
408   default:
409     GNUNET_break (0);
410     break;
411   }
412   return NULL;
413 }
414
415 static int
416 change (struct NeighbourMapEntry *n, int state, int line);
417
418 static void
419 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
420
421 static void
422 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
423 {
424   struct NeighbourMapEntry *n = cls;
425
426   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
427
428 #if DEBUG_TRANSPORT
429 #endif
430   /* This jut a temporary debug message to check if a the value
431    * SETUP_CONNECTION_TIMEOUT was choosen to small for slow machines
432    */
433   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
434               "Information for developers: Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n",
435               GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name, n->addr,
436                                                     n->addrlen),
437               print_state (n->state));
438
439   GNUNET_STATISTICS_update (GST_stats,
440                             gettext_noop
441                             ("# failed connection attempts due to timeout"), 1,
442                             GNUNET_NO);
443
444   /* resetting state */
445   n->state = S_NOT_CONNECTED;
446
447   /* destroying address */
448   GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
449                                 n->addrlen, NULL);
450
451   /* request new address */
452   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
453     GNUNET_SCHEDULER_cancel (n->ats_suggest);
454   n->ats_suggest =
455       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
456                                     n);
457   GNUNET_ATS_suggest_address (GST_ats, &n->id);
458 }
459
460 static int
461 change (struct NeighbourMapEntry *n, int state, int line)
462 {
463   char *old = strdup (print_state (n->state));
464   char *new = strdup (print_state (state));
465
466   /* allowed transitions */
467   int allowed = GNUNET_NO;
468
469   switch (n->state)
470   {
471   case S_NOT_CONNECTED:
472     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
473         (state == S_DISCONNECT))
474     {
475       allowed = GNUNET_YES;
476
477       /* Schedule reset task */
478       if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT))
479       {
480         GNUNET_assert (n->state_reset == GNUNET_SCHEDULER_NO_TASK);
481         n->state_reset =
482             GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
483                                           n);
484       }
485       break;
486     }
487     break;
488   case S_CONNECT_RECV:
489     if ((state == S_NOT_CONNECTED) || (state == S_DISCONNECT) ||
490         (state == S_CONNECTED) ||
491         /* FIXME SENT -> RECV ISSUE! */ (state == S_CONNECT_SENT))
492     {
493       if ((state == S_CONNECTED) || (state == S_DISCONNECT) ||
494           (state == S_NOT_CONNECTED))
495       {
496 #if DEBUG_TRANSPORT
497         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498                     "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
499                     GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name,
500                                                           n->addr, n->addrlen),
501                     print_state (n->state), print_state (state));
502 #endif
503         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
504         GNUNET_SCHEDULER_cancel (n->state_reset);
505         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
506       }
507
508       allowed = GNUNET_YES;
509       break;
510     }
511     break;
512   case S_CONNECT_SENT:
513     if ((state == S_NOT_CONNECTED) || (state == S_CONNECTED) ||
514         (state == S_DISCONNECT) ||
515         /* FIXME SENT -> RECV ISSUE! */ (state == S_CONNECT_RECV))
516     {
517       if ((state == S_CONNECTED) || (state == S_DISCONNECT) ||
518           (state == S_NOT_CONNECTED))
519       {
520 #if DEBUG_TRANSPORT
521         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
522                     "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
523                     GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name,
524                                                           n->addr, n->addrlen),
525                     print_state (n->state), print_state (state));
526 #endif
527         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
528         GNUNET_SCHEDULER_cancel (n->state_reset);
529         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
530       }
531
532       allowed = GNUNET_YES;
533       break;
534     }
535     break;
536   case S_CONNECTED:
537     if (state == S_DISCONNECT)
538     {
539       allowed = GNUNET_YES;
540       break;
541     }
542     break;
543   case S_DISCONNECT:
544     /*
545      * if (state == S_NOT_CONNECTED)
546      * {
547      * allowed = GNUNET_YES;
548      * break;
549      * } */
550     break;
551   default:
552     GNUNET_break (0);
553     break;
554
555   }
556
557   if (allowed == GNUNET_NO)
558   {
559     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
560                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
561                 new, line);
562     GNUNET_break (0);
563     GNUNET_free (old);
564     GNUNET_free (new);
565     return GNUNET_SYSERR;
566   }
567
568   n->state = state;
569 #if DEBUG_TRANSPORT
570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
572               GNUNET_i2s (&n->id), n, old, new, line);
573 #endif
574   GNUNET_free (old);
575   GNUNET_free (new);
576   return GNUNET_OK;
577 }
578
579 static ssize_t
580 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
581                   size_t msgbuf_size, uint32_t priority,
582                   struct GNUNET_TIME_Relative timeout, struct Session *session,
583                   const char *plugin_name, const void *addr, size_t addrlen,
584                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
585                   void *cont_cls)
586 {
587   struct GNUNET_TRANSPORT_PluginFunctions *papi;
588   size_t ret = GNUNET_SYSERR;
589
590   /* FIXME : ats returns an address with all values 0 */
591   if (plugin_name == NULL)
592   {
593     if (cont != NULL)
594       cont (cont_cls, target, GNUNET_SYSERR);
595     return GNUNET_SYSERR;
596   }
597
598   if ((session == NULL) && (addr == NULL) && (addrlen == 0))
599   {
600     if (cont != NULL)
601       cont (cont_cls, target, GNUNET_SYSERR);
602     return GNUNET_SYSERR;
603   }
604
605   papi = GST_plugins_find (plugin_name);
606   if (papi == NULL)
607   {
608     if (cont != NULL)
609       cont (cont_cls, target, GNUNET_SYSERR);
610     return GNUNET_SYSERR;
611   }
612
613   ret =
614       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
615                   addr, addrlen, GNUNET_YES, cont, cont_cls);
616
617   if (ret == -1)
618   {
619     if (cont != NULL)
620       cont (cont_cls, target, GNUNET_SYSERR);
621   }
622   return ret;
623 }
624
625 /**
626  * Task invoked to start a transmission to another peer.
627  *
628  * @param cls the 'struct NeighbourMapEntry'
629  * @param tc scheduler context
630  */
631 static void
632 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
633
634
635 /**
636  * We're done with our transmission attempt, continue processing.
637  *
638  * @param cls the 'struct MessageQueue' of the message
639  * @param receiver intended receiver
640  * @param success whether it worked or not
641  */
642 static void
643 transmit_send_continuation (void *cls,
644                             const struct GNUNET_PeerIdentity *receiver,
645                             int success)
646 {
647   struct MessageQueue *mq;
648   struct NeighbourMapEntry *n;
649
650   mq = cls;
651   n = mq->n;
652   if (NULL != n)
653   {
654     GNUNET_assert (n->is_active == mq);
655     n->is_active = NULL;
656     if (success == GNUNET_YES)
657     {
658       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
659       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
660     }
661   }
662 #if DEBUG_TRANSPORT
663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
664               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
665               (success == GNUNET_OK) ? "successful" : "FAILED");
666 #endif
667   if (NULL != mq->cont)
668     mq->cont (mq->cont_cls, success);
669   GNUNET_free (mq);
670 }
671
672
673 /**
674  * Check the ready list for the given neighbour and if a plugin is
675  * ready for transmission (and if we have a message), do so!
676  *
677  * @param n target peer for which to transmit
678  */
679 static void
680 try_transmission_to_peer (struct NeighbourMapEntry *n)
681 {
682   struct MessageQueue *mq;
683   struct GNUNET_TIME_Relative timeout;
684   ssize_t ret;
685
686   if (n->is_active != NULL)
687   {
688     GNUNET_break (0);
689     return;                     /* transmission already pending */
690   }
691   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
692   {
693     GNUNET_break (0);
694     return;                     /* currently waiting for bandwidth */
695   }
696   while (NULL != (mq = n->messages_head))
697   {
698     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
699     if (timeout.rel_value > 0)
700       break;
701     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
702     n->is_active = mq;
703     mq->n = n;
704     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
705   }
706   if (NULL == mq)
707     return;                     /* no more messages */
708
709   if (GST_plugins_find (n->plugin_name) == NULL)
710   {
711     GNUNET_break (0);
712     return;
713   }
714   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
715   n->is_active = mq;
716   mq->n = n;
717
718   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
719   {
720     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
721                 GNUNET_i2s (&n->id));
722     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
723     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
724     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
725     return;
726   }
727
728   ret =
729       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
730                         timeout, n->session, n->plugin_name, n->addr,
731                         n->addrlen, GNUNET_YES, &transmit_send_continuation,
732                         mq);
733   if (ret == -1)
734   {
735     /* failure, but 'send' would not call continuation in this case,
736      * so we need to do it here! */
737     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
738   }
739
740 }
741
742
743 /**
744  * Task invoked to start a transmission to another peer.
745  *
746  * @param cls the 'struct NeighbourMapEntry'
747  * @param tc scheduler context
748  */
749 static void
750 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
751 {
752   struct NeighbourMapEntry *n = cls;
753
754   GNUNET_assert (NULL != lookup_neighbour (&n->id));
755   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
756   try_transmission_to_peer (n);
757 }
758
759
760 /**
761  * Initialize the neighbours subsystem.
762  *
763  * @param cls closure for callbacks
764  * @param connect_cb function to call if we connect to a peer
765  * @param disconnect_cb function to call if we disconnect from a peer
766  */
767 void
768 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
769                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
770 {
771   callback_cls = cls;
772   connect_notify_cb = connect_cb;
773   disconnect_notify_cb = disconnect_cb;
774   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
775 }
776
777
778 static void
779 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
780                       int result)
781 {
782 #if DEBUG_TRANSPORT
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "Sending DISCONNECT message to peer `%4s': %i\n",
785               GNUNET_i2s (target), result);
786 #endif
787 }
788
789
790 static int
791 send_disconnect (const struct GNUNET_PeerIdentity *target,
792                  const char *plugin_name, const char *sender_address,
793                  uint16_t sender_address_len, struct Session *session)
794 {
795   size_t ret;
796   struct SessionDisconnectMessage disconnect_msg;
797
798 #if DEBUG_TRANSPORT
799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800               "Sending DISCONNECT message to peer `%4s'\n",
801               GNUNET_i2s (target));
802 #endif
803
804   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
805   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
806   disconnect_msg.reserved = htonl (0);
807   disconnect_msg.purpose.size =
808       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
809              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
810              sizeof (struct GNUNET_TIME_AbsoluteNBO));
811   disconnect_msg.purpose.purpose =
812       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
813   disconnect_msg.timestamp =
814       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
815   disconnect_msg.public_key = GST_my_public_key;
816   GNUNET_assert (GNUNET_OK ==
817                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
818                                          &disconnect_msg.purpose,
819                                          &disconnect_msg.signature));
820
821   ret =
822       send_with_plugin (target, (const char *) &disconnect_msg,
823                         sizeof (disconnect_msg), UINT32_MAX,
824                         GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name,
825                         sender_address, sender_address_len, GNUNET_YES,
826                         &send_disconnect_cont, NULL);
827
828   if (ret == GNUNET_SYSERR)
829     return GNUNET_SYSERR;
830
831   GNUNET_STATISTICS_update (GST_stats,
832                             gettext_noop
833                             ("# peers disconnected due to external request"), 1,
834                             GNUNET_NO);
835   return GNUNET_OK;
836 }
837
838 /**
839  * Disconnect from the given neighbour, clean up the record.
840  *
841  * @param n neighbour to disconnect from
842  */
843 static void
844 disconnect_neighbour (struct NeighbourMapEntry *n)
845 {
846   struct MessageQueue *mq;
847   int was_connected = is_connected (n);
848
849   /* send DISCONNECT MESSAGE */
850   if (is_connected (n) || is_connecting (n))
851   {
852     if (GNUNET_OK ==
853         send_disconnect (&n->id, n->plugin_name, n->addr, n->addrlen,
854                          n->session))
855       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
856                   GNUNET_i2s (&n->id));
857     else
858       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859                   "Could not send DISCONNECT_MSG to `%s'\n",
860                   GNUNET_i2s (&n->id));
861   }
862
863   if (is_connected(n))
864   {
865      GNUNET_ATS_address_in_use (GST_ats, &n->id, n->plugin_name,
866          n->addr, n->addrlen, n->session, GNUNET_NO);
867   }
868
869
870   if (is_disconnecting (n))
871     return;
872   change_state (n, S_DISCONNECT);
873   GST_validation_set_address_use (&n->id,
874                                   n->plugin_name,
875                                   n->session,
876                                   n->addr,
877                                   n->addrlen,
878                                   GNUNET_NO);
879
880   if (n->plugin_name != NULL)
881   {
882     struct GNUNET_TRANSPORT_PluginFunctions *papi;
883     papi = GST_plugins_find (n->plugin_name);
884     if (papi != NULL)
885       papi->disconnect (papi->cls, &n->id);
886   }
887
888   while (NULL != (mq = n->messages_head))
889   {
890     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
891     if (NULL != mq->cont)
892       mq->cont (mq->cont_cls, GNUNET_SYSERR);
893     GNUNET_free (mq);
894   }
895   if (NULL != n->is_active)
896   {
897     n->is_active->n = NULL;
898     n->is_active = NULL;
899   }
900   if (was_connected)
901   {
902     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
903     GNUNET_SCHEDULER_cancel (n->keepalive_task);
904     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
905     GNUNET_assert (neighbours_connected > 0);
906     neighbours_connected--;
907     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
908                               GNUNET_NO);
909     disconnect_notify_cb (callback_cls, &n->id);
910   }
911   GNUNET_assert (GNUNET_YES ==
912                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
913                                                        &n->id.hashPubKey, n));
914   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
915   {
916     GNUNET_SCHEDULER_cancel (n->ats_suggest);
917     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
918   }
919   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
920   {
921     GNUNET_SCHEDULER_cancel (n->timeout_task);
922     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
923   }
924   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
925   {
926     GNUNET_SCHEDULER_cancel (n->transmission_task);
927     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
928   }
929   if (NULL != n->plugin_name)
930   {
931     GNUNET_free (n->plugin_name);
932     n->plugin_name = NULL;
933   }
934   if (NULL != n->addr)
935   {
936     GNUNET_free (n->addr);
937     n->addr = NULL;
938     n->addrlen = 0;
939   }
940   n->session = NULL;
941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
942               GNUNET_i2s (&n->id), n);
943   GNUNET_free (n);
944 }
945
946
947 /**
948  * Peer has been idle for too long. Disconnect.
949  *
950  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
951  * @param tc scheduler context
952  */
953 static void
954 neighbour_timeout_task (void *cls,
955                         const struct GNUNET_SCHEDULER_TaskContext *tc)
956 {
957   struct NeighbourMapEntry *n = cls;
958
959   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
960
961   GNUNET_STATISTICS_update (GST_stats,
962                             gettext_noop
963                             ("# peers disconnected due to timeout"), 1,
964                             GNUNET_NO);
965   disconnect_neighbour (n);
966 }
967
968
969 /**
970  * Send another keepalive message.
971  *
972  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
973  * @param tc scheduler context
974  */
975 static void
976 neighbour_keepalive_task (void *cls,
977                           const struct GNUNET_SCHEDULER_TaskContext *tc)
978 {
979   struct NeighbourMapEntry *n = cls;
980   struct GNUNET_MessageHeader m;
981
982   n->keepalive_task =
983       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
984                                     &neighbour_keepalive_task, n);
985   GNUNET_assert (is_connected (n));
986   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
987                             GNUNET_NO);
988   m.size = htons (sizeof (struct GNUNET_MessageHeader));
989   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
990
991   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
992                     UINT32_MAX /* priority */ ,
993                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name,
994                     n->addr, n->addrlen, GNUNET_YES, NULL, NULL);
995 }
996
997
998 /**
999  * Disconnect from the given neighbour.
1000  *
1001  * @param cls unused
1002  * @param key hash of neighbour's public key (not used)
1003  * @param value the 'struct NeighbourMapEntry' of the neighbour
1004  */
1005 static int
1006 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1007 {
1008   struct NeighbourMapEntry *n = value;
1009
1010 #if DEBUG_TRANSPORT
1011   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1012               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1013 #endif
1014   if (is_connected (n))
1015     GNUNET_STATISTICS_update (GST_stats,
1016                               gettext_noop
1017                               ("# peers disconnected due to global disconnect"),
1018                               1, GNUNET_NO);
1019   disconnect_neighbour (n);
1020   return GNUNET_OK;
1021 }
1022
1023
1024 static void
1025 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1026 {
1027   struct NeighbourMapEntry *n = cls;
1028
1029   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1030
1031   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032               " ATS did not suggested address to connect to peer `%s'\n",
1033               GNUNET_i2s (&n->id));
1034
1035   disconnect_neighbour (n);
1036 }
1037
1038 /**
1039  * Cleanup the neighbours subsystem.
1040  */
1041 void
1042 GST_neighbours_stop ()
1043 {
1044   // This can happen during shutdown
1045   if (neighbours == NULL)
1046   {
1047     return;
1048   }
1049
1050   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1051                                          NULL);
1052   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1053   GNUNET_assert (neighbours_connected == 0);
1054   neighbours = NULL;
1055   callback_cls = NULL;
1056   connect_notify_cb = NULL;
1057   disconnect_notify_cb = NULL;
1058 }
1059
1060
1061 /**
1062  * We tried to send a SESSION_CONNECT message to another peer.  If this
1063  * succeeded, we change the state.  If it failed, we should tell
1064  * ATS to not use this address anymore (until it is re-validated).
1065  *
1066  * @param cls the 'struct NeighbourMapEntry'
1067  * @param success GNUNET_OK on success
1068  */
1069 static void
1070 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1071                            int success)
1072 {
1073   struct NeighbourMapEntry *n = cls;
1074
1075   GNUNET_assert (n != NULL);
1076   GNUNET_assert (!is_connected (n));
1077
1078   if (is_disconnecting (n))
1079     return;                     /* neighbour is going away */
1080
1081   if (GNUNET_YES != success)
1082   {
1083 #if DEBUG_TRANSPORT
1084     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085                 "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1086                 GNUNET_i2s (&n->id), n->plugin_name,
1087                 (n->addrlen ==
1088                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
1089                                                      n->addrlen), n->session);
1090 #endif
1091
1092     GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1093                                   n->addrlen, NULL);
1094
1095     change_state (n, S_NOT_CONNECTED);
1096
1097     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1098       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1099     n->ats_suggest =
1100         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1101                                       n);
1102     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1103     return;
1104   }
1105
1106
1107 }
1108
1109
1110 /**
1111  * We tried to switch addresses with an peer already connected. If it failed,
1112  * we should tell ATS to not use this address anymore (until it is re-validated).
1113  *
1114  * @param cls the 'struct NeighbourMapEntry'
1115  * @param success GNUNET_OK on success
1116  */
1117 static void
1118 send_switch_address_continuation (void *cls,
1119                                   const struct GNUNET_PeerIdentity *target,
1120                                   int success)
1121 {
1122   struct NeighbourMapEntry *n = cls;
1123
1124   GNUNET_assert (n != NULL);
1125   if (is_disconnecting (n))
1126     return;                     /* neighbour is going away */
1127
1128   GNUNET_assert (n->state == S_CONNECTED);
1129   if (GNUNET_YES != success)
1130   {
1131 #if DEBUG_TRANSPORT
1132     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133                 "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n",
1134                 GNUNET_i2s (&n->id), n->plugin_name,
1135                 (n->addrlen ==
1136                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
1137                                                      n->addrlen), n->session);
1138 #endif
1139
1140     GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1141                                   n->addrlen, NULL);
1142
1143     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1144       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1145     n->ats_suggest =
1146         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1147                                       n);
1148     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1149     return;
1150   }
1151 }
1152
1153 /**
1154  * We tried to send a SESSION_CONNECT message to another peer.  If this
1155  * succeeded, we change the state.  If it failed, we should tell
1156  * ATS to not use this address anymore (until it is re-validated).
1157  *
1158  * @param cls the 'struct NeighbourMapEntry'
1159  * @param success GNUNET_OK on success
1160  */
1161 static void
1162 send_connect_ack_continuation (void *cls,
1163                                const struct GNUNET_PeerIdentity *target,
1164                                int success)
1165 {
1166   struct NeighbourMapEntry *n = cls;
1167
1168   GNUNET_assert (n != NULL);
1169
1170   if (GNUNET_YES == success)
1171     return;                     /* sending successful */
1172
1173   /* sending failed, ask for next address  */
1174 #if DEBUG_TRANSPORT
1175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176               "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1177               GNUNET_i2s (&n->id), n->plugin_name,
1178               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name,
1179                                                                  n->addr,
1180                                                                  n->addrlen),
1181               n->session);
1182 #endif
1183   change_state (n, S_NOT_CONNECTED);
1184
1185   GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1186                                 n->addrlen, NULL);
1187
1188   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1189     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1190   n->ats_suggest =
1191       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1192                                     n);
1193   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1194 }
1195
1196 /**
1197  * For an existing neighbour record, set the active connection to
1198  * the given address.
1199  *
1200  * @param peer identity of the peer to switch the address for
1201  * @param plugin_name name of transport that delivered the PONG
1202  * @param address address of the other peer, NULL if other peer
1203  *                       connected to us
1204  * @param address_len number of bytes in address
1205  * @param session session to use (or NULL)
1206  * @param ats performance data
1207  * @param ats_count number of entries in ats
1208  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1209  *         connection is not up (yet)
1210  */
1211 int
1212 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1213                                        const char *plugin_name,
1214                                        const void *address, size_t address_len,
1215                                        struct Session *session,
1216                                        const struct GNUNET_ATS_Information *ats,
1217                                        uint32_t ats_count,
1218                                        struct GNUNET_BANDWIDTH_Value32NBO
1219                                        bandwidth_in,
1220                                        struct GNUNET_BANDWIDTH_Value32NBO
1221                                        bandwidth_out)
1222 {
1223   struct NeighbourMapEntry *n;
1224   struct SessionConnectMessage connect_msg;
1225   size_t msg_len;
1226   size_t ret;
1227   int checks_failed;
1228
1229   // This can happen during shutdown
1230   if (neighbours == NULL)
1231   {
1232     return GNUNET_NO;
1233   }
1234
1235   checks_failed = GNUNET_NO;
1236
1237   if (plugin_name == NULL)
1238   {
1239     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1240                 "ATS offered suggested us empty address: plugin NULL");
1241     GNUNET_break_op (0);
1242     checks_failed = GNUNET_YES;
1243   }
1244   if ((address == NULL) && (address_len == 0) && (session == NULL))
1245   {
1246     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1247                 "ATS offered suggested us empty address: address NULL & session NULL");
1248     GNUNET_break_op (0);
1249     checks_failed = GNUNET_YES;
1250   }
1251
1252   n = lookup_neighbour (peer);
1253   if (NULL == n)
1254     checks_failed = GNUNET_YES;
1255
1256   if (checks_failed == GNUNET_YES)
1257   {
1258     GNUNET_ATS_address_destroyed (GST_ats, peer, plugin_name, address,
1259                                   address_len, session);
1260     if (n != NULL)
1261       GNUNET_ATS_suggest_address (GST_ats, peer);
1262     return GNUNET_NO;
1263   }
1264
1265   /* checks successful and neighbour != NULL */
1266 #if DEBUG_TRANSPORT
1267   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1268               "ATS tells us to switch to plugin `%s' address '%s' session %X for %s peer `%s'\n",
1269               plugin_name,
1270               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
1271                                                                   address,
1272                                                                   address_len),
1273               session, (is_connected (n) ? "CONNECTED" : "NOT CONNECTED"),
1274               GNUNET_i2s (peer));
1275 #endif
1276
1277   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1278   {
1279     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1280     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1281   }
1282
1283   // do not switch addresses just update quotas
1284   if ((is_connected (n)) && (address_len == n->addrlen))
1285   {
1286     if ((0 == memcmp (address, n->addr, address_len)) &&
1287         (n->session == session))
1288     {
1289       struct QuotaSetMessage q_msg;
1290
1291 #if DEBUG_TRANSPORT
1292       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1293                   "Sending outbound quota of %u Bps and inbound quota of %u Bps for peer `%s' to all clients\n",
1294                   ntohl (n->bandwidth_out.value__),
1295                   ntohl (n->bandwidth_in.value__), GNUNET_i2s (peer));
1296 #endif
1297
1298       n->bandwidth_in = bandwidth_in;
1299       n->bandwidth_out = bandwidth_out;
1300       GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1301
1302       q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1303       q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1304       q_msg.quota = n->bandwidth_out;
1305       q_msg.peer = (*peer);
1306       GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1307       return GNUNET_NO;
1308     }
1309   }
1310   if (n->state == S_CONNECTED) 
1311     GST_validation_set_address_use (&n->id,
1312                                     n->plugin_name,
1313                                     n->session,
1314                                     n->addr,
1315                                     n->addrlen,
1316                                     GNUNET_NO);
1317   GNUNET_free_non_null (n->addr);
1318   n->addr = GNUNET_malloc (address_len);
1319   memcpy (n->addr, address, address_len);
1320   n->bandwidth_in = bandwidth_in;
1321   n->bandwidth_out = bandwidth_out;
1322   n->addrlen = address_len;
1323   n->session = session;
1324   GNUNET_free_non_null (n->plugin_name);
1325   n->plugin_name = GNUNET_strdup (plugin_name);
1326   GNUNET_SCHEDULER_cancel (n->timeout_task);
1327   n->timeout_task =
1328       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1329                                     &neighbour_timeout_task, n);
1330   if (n->state == S_CONNECTED)
1331     GST_validation_set_address_use (&n->id,
1332                                     n->plugin_name,
1333                                     n->session,
1334                                     n->addr,
1335                                     n->addrlen,
1336                                     GNUNET_YES);
1337
1338
1339   if (n->state == S_DISCONNECT)
1340   {
1341     /* We are disconnecting, nothing to do here */
1342     return GNUNET_NO;
1343   }
1344   /* We are not connected/connecting and initiate a fresh connect */
1345   if (n->state == S_NOT_CONNECTED)
1346   {
1347     msg_len = sizeof (struct SessionConnectMessage);
1348     connect_msg.header.size = htons (msg_len);
1349     connect_msg.header.type =
1350         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1351     connect_msg.reserved = htonl (0);
1352     connect_msg.timestamp =
1353         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1354
1355     change_state (n, S_CONNECT_SENT);
1356
1357     ret =
1358         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1359                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1360                           plugin_name, address, address_len, GNUNET_YES,
1361                           &send_connect_continuation, n);
1362
1363
1364     return GNUNET_NO;
1365   }
1366   /* We received a CONNECT message and asked ATS for an address */
1367   else if (n->state == S_CONNECT_RECV)
1368   {
1369     msg_len = sizeof (struct SessionConnectMessage);
1370     connect_msg.header.size = htons (msg_len);
1371     connect_msg.header.type =
1372         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1373     connect_msg.reserved = htonl (0);
1374     connect_msg.timestamp =
1375         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1376
1377     ret =
1378         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1379                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1380                           plugin_name, address, address_len, GNUNET_YES,
1381                           &send_connect_ack_continuation, n);
1382     return GNUNET_NO;
1383   }
1384   /* connected peer is switching addresses */
1385   else if (n->state == S_CONNECTED)
1386   {
1387     msg_len = sizeof (struct SessionConnectMessage);
1388     connect_msg.header.size = htons (msg_len);
1389     connect_msg.header.type =
1390         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1391     connect_msg.reserved = htonl (0);
1392     connect_msg.timestamp =
1393         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1394
1395     ret =
1396         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1397                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1398                           plugin_name, address, address_len, GNUNET_YES,
1399                           &send_switch_address_continuation, n);
1400     if (ret == GNUNET_SYSERR)
1401     {
1402       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403                   "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n",
1404                   GNUNET_i2s (peer), plugin_name,
1405                   (address_len ==
1406                    0) ? "<inbound>" : GST_plugins_a2s (plugin_name, address,
1407                                                        address_len), session);
1408     }
1409     return GNUNET_NO;
1410   }
1411   else if (n->state == S_CONNECT_SENT)
1412   {
1413     return GNUNET_NO;
1414   }
1415   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1416               "Invalid connection state to switch addresses %u \n", n->state);
1417   GNUNET_break_op (0);
1418   return GNUNET_NO;
1419 }
1420
1421
1422 /**
1423  * Obtain current latency information for the given neighbour.
1424  *
1425  * @param peer 
1426  * @return observed latency of the address, FOREVER if the address was
1427  *         never successfully validated
1428  */
1429 struct GNUNET_TIME_Relative
1430 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1431 {
1432   struct NeighbourMapEntry *n;
1433
1434   n = lookup_neighbour (peer);
1435   if (NULL == n)
1436     return GNUNET_TIME_UNIT_FOREVER_REL;
1437   return GST_validation_get_address_latency (peer,
1438                                              n->plugin_name,
1439                                              n->session,
1440                                              n->addr,
1441                                              n->addrlen);
1442 }
1443
1444
1445 /**
1446  * Create an entry in the neighbour map for the given peer
1447  *
1448  * @param peer peer to create an entry for
1449  * @return new neighbour map entry
1450  */
1451 static struct NeighbourMapEntry *
1452 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1453 {
1454   struct NeighbourMapEntry *n;
1455
1456 #if DEBUG_TRANSPORT
1457   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1458               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1459 #endif
1460   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1461   n->id = *peer;
1462   n->state = S_NOT_CONNECTED;
1463   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1464                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1465                                  MAX_BANDWIDTH_CARRY_S);
1466   n->timeout_task =
1467       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1468                                     &neighbour_timeout_task, n);
1469   GNUNET_assert (GNUNET_OK ==
1470                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1471                                                     &n->id.hashPubKey, n,
1472                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1473   return n;
1474 }
1475
1476
1477 /**
1478  * Try to create a connection to the given target (eventually).
1479  *
1480  * @param target peer to try to connect to
1481  */
1482 void
1483 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1484 {
1485   struct NeighbourMapEntry *n;
1486
1487   // This can happen during shutdown
1488   if (neighbours == NULL)
1489   {
1490     return;
1491   }
1492 #if DEBUG_TRANSPORT
1493   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1494               GNUNET_i2s (target));
1495 #endif
1496   if (0 ==
1497       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1498   {
1499     /* my own hello */
1500     return;
1501   }
1502   n = lookup_neighbour (target);
1503
1504   if (NULL != n)
1505   {
1506     if ((is_connected (n)) || (is_connecting (n)))
1507       return;                   /* already connecting or connected */
1508     if (is_disconnecting (n))
1509       change_state (n, S_NOT_CONNECTED);
1510   }
1511
1512
1513   if (n == NULL)
1514     n = setup_neighbour (target);
1515 #if DEBUG_TRANSPORT
1516   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1517               "Asking ATS for suggested address to connect to peer `%s'\n",
1518               GNUNET_i2s (&n->id));
1519 #endif
1520
1521   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1522 }
1523
1524 /**
1525  * Test if we're connected to the given peer.
1526  *
1527  * @param target peer to test
1528  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1529  */
1530 int
1531 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1532 {
1533   struct NeighbourMapEntry *n;
1534
1535   // This can happen during shutdown
1536   if (neighbours == NULL)
1537   {
1538     return GNUNET_NO;
1539   }
1540
1541   n = lookup_neighbour (target);
1542
1543   if ((NULL == n) || (!is_connected (n)))
1544     return GNUNET_NO;           /* not connected */
1545   return GNUNET_YES;
1546 }
1547
1548
1549 /**
1550  * A session was terminated. Take note.
1551  *
1552  * @param peer identity of the peer where the session died
1553  * @param session session that is gone
1554  */
1555 void
1556 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1557                                    struct Session *session)
1558 {
1559   struct NeighbourMapEntry *n;
1560
1561   // This can happen during shutdown
1562   if (neighbours == NULL)
1563   {
1564     return;
1565   }
1566
1567 #if DEBUG_TRANSPORT
1568   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1569               session, GNUNET_i2s (peer));
1570 #endif
1571
1572   n = lookup_neighbour (peer);
1573   if (NULL == n)
1574     return;
1575   if (session != n->session)
1576     return;                     /* doesn't affect us */
1577
1578   n->session = NULL;
1579   GNUNET_free (n->addr);
1580   n->addr = NULL;
1581   n->addrlen = 0;
1582
1583   /* not connected anymore anyway, shouldn't matter */
1584   if ((!is_connected (n)) && (!is_connecting (n)))
1585     return;
1586
1587   /* We are connected, so ask ATS to switch addresses */
1588   GNUNET_SCHEDULER_cancel (n->timeout_task);
1589   n->timeout_task =
1590       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1591                                     &neighbour_timeout_task, n);
1592   /* try QUICKLY to re-establish a connection, reduce timeout! */
1593   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1594     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1595   n->ats_suggest =
1596       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1597                                     n);
1598   GNUNET_ATS_suggest_address (GST_ats, peer);
1599 }
1600
1601
1602 /**
1603  * Transmit a message to the given target using the active connection.
1604  *
1605  * @param target destination
1606  * @param msg message to send
1607  * @param msg_size number of bytes in msg
1608  * @param timeout when to fail with timeout
1609  * @param cont function to call when done
1610  * @param cont_cls closure for 'cont'
1611  */
1612 void
1613 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1614                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1615                      GST_NeighbourSendContinuation cont, void *cont_cls)
1616 {
1617   struct NeighbourMapEntry *n;
1618   struct MessageQueue *mq;
1619
1620   // This can happen during shutdown
1621   if (neighbours == NULL)
1622   {
1623     return;
1624   }
1625
1626   n = lookup_neighbour (target);
1627   if ((n == NULL) || (!is_connected (n)))
1628   {
1629     GNUNET_STATISTICS_update (GST_stats,
1630                               gettext_noop
1631                               ("# messages not sent (no such peer or not connected)"),
1632                               1, GNUNET_NO);
1633 #if DEBUG_TRANSPORT
1634     if (n == NULL)
1635       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636                   "Could not send message to peer `%s': unknown neighbour",
1637                   GNUNET_i2s (target));
1638     else if (!is_connected (n))
1639       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1640                   "Could not send message to peer `%s': not connected\n",
1641                   GNUNET_i2s (target));
1642 #endif
1643     if (NULL != cont)
1644       cont (cont_cls, GNUNET_SYSERR);
1645     return;
1646   }
1647
1648   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
1649   {
1650     GNUNET_STATISTICS_update (GST_stats,
1651                               gettext_noop
1652                               ("# messages not sent (no such peer or not connected)"),
1653                               1, GNUNET_NO);
1654 #if DEBUG_TRANSPORT
1655     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1656                 "Could not send message to peer `%s': no address available\n",
1657                 GNUNET_i2s (target));
1658 #endif
1659
1660     if (NULL != cont)
1661       cont (cont_cls, GNUNET_SYSERR);
1662     return;
1663   }
1664
1665   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1666   GNUNET_STATISTICS_update (GST_stats,
1667                             gettext_noop
1668                             ("# bytes in message queue for other peers"),
1669                             msg_size, GNUNET_NO);
1670   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1671   mq->cont = cont;
1672   mq->cont_cls = cont_cls;
1673   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1674   memcpy (&mq[1], msg, msg_size);
1675   mq->message_buf = (const char *) &mq[1];
1676   mq->message_buf_size = msg_size;
1677   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1678   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1679
1680   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1681       (NULL == n->is_active))
1682     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1683 }
1684
1685
1686 /**
1687  * We have received a message from the given sender.  How long should
1688  * we delay before receiving more?  (Also used to keep the peer marked
1689  * as live).
1690  *
1691  * @param sender sender of the message
1692  * @param size size of the message
1693  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1694  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1695  *                   GNUNET_SYSERR if the connection is not fully up yet
1696  * @return how long to wait before reading more from this sender
1697  */
1698 struct GNUNET_TIME_Relative
1699 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1700                                         *sender, ssize_t size, int *do_forward)
1701 {
1702   struct NeighbourMapEntry *n;
1703   struct GNUNET_TIME_Relative ret;
1704
1705   // This can happen during shutdown
1706   if (neighbours == NULL)
1707   {
1708     return GNUNET_TIME_UNIT_FOREVER_REL;
1709   }
1710
1711   n = lookup_neighbour (sender);
1712   if (n == NULL)
1713   {
1714     GST_neighbours_try_connect (sender);
1715     n = lookup_neighbour (sender);
1716     if (NULL == n)
1717     {
1718       GNUNET_STATISTICS_update (GST_stats,
1719                                 gettext_noop
1720                                 ("# messages discarded due to lack of neighbour record"),
1721                                 1, GNUNET_NO);
1722       *do_forward = GNUNET_NO;
1723       return GNUNET_TIME_UNIT_ZERO;
1724     }
1725   }
1726   if (!is_connected (n))
1727   {
1728     *do_forward = GNUNET_SYSERR;
1729     return GNUNET_TIME_UNIT_ZERO;
1730   }
1731   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1732   {
1733     n->quota_violation_count++;
1734 #if DEBUG_TRANSPORT
1735     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1736                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1737                 n->in_tracker.available_bytes_per_s__,
1738                 n->quota_violation_count);
1739 #endif
1740     /* Discount 32k per violation */
1741     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1742   }
1743   else
1744   {
1745     if (n->quota_violation_count > 0)
1746     {
1747       /* try to add 32k back */
1748       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1749       n->quota_violation_count--;
1750     }
1751   }
1752   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1753   {
1754     GNUNET_STATISTICS_update (GST_stats,
1755                               gettext_noop
1756                               ("# bandwidth quota violations by other peers"),
1757                               1, GNUNET_NO);
1758     *do_forward = GNUNET_NO;
1759     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1760   }
1761   *do_forward = GNUNET_YES;
1762   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1763   if (ret.rel_value > 0)
1764   {
1765 #if DEBUG_TRANSPORT
1766     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1767                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1768                 (unsigned long long) n->in_tracker.
1769                 consumption_since_last_update__,
1770                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1771                 (unsigned long long) ret.rel_value);
1772 #endif
1773     GNUNET_STATISTICS_update (GST_stats,
1774                               gettext_noop ("# ms throttling suggested"),
1775                               (int64_t) ret.rel_value, GNUNET_NO);
1776   }
1777   return ret;
1778 }
1779
1780
1781 /**
1782  * Keep the connection to the given neighbour alive longer,
1783  * we received a KEEPALIVE (or equivalent).
1784  *
1785  * @param neighbour neighbour to keep alive
1786  */
1787 void
1788 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1789 {
1790   struct NeighbourMapEntry *n;
1791
1792   // This can happen during shutdown
1793   if (neighbours == NULL)
1794   {
1795     return;
1796   }
1797
1798   n = lookup_neighbour (neighbour);
1799   if (NULL == n)
1800   {
1801     GNUNET_STATISTICS_update (GST_stats,
1802                               gettext_noop
1803                               ("# KEEPALIVE messages discarded (not connected)"),
1804                               1, GNUNET_NO);
1805     return;
1806   }
1807   GNUNET_SCHEDULER_cancel (n->timeout_task);
1808   n->timeout_task =
1809       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1810                                     &neighbour_timeout_task, n);
1811 }
1812
1813
1814 /**
1815  * Change the incoming quota for the given peer.
1816  *
1817  * @param neighbour identity of peer to change qutoa for
1818  * @param quota new quota
1819  */
1820 void
1821 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1822                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1823 {
1824   struct NeighbourMapEntry *n;
1825
1826   // This can happen during shutdown
1827   if (neighbours == NULL)
1828   {
1829     return;
1830   }
1831
1832   n = lookup_neighbour (neighbour);
1833   if (n == NULL)
1834   {
1835     GNUNET_STATISTICS_update (GST_stats,
1836                               gettext_noop
1837                               ("# SET QUOTA messages ignored (no such peer)"),
1838                               1, GNUNET_NO);
1839     return;
1840   }
1841   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1842   if (0 != ntohl (quota.value__))
1843     return;
1844 #if DEBUG_TRANSPORT
1845   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1846               GNUNET_i2s (&n->id), "SET_QUOTA");
1847 #endif
1848   if (is_connected (n))
1849     GNUNET_STATISTICS_update (GST_stats,
1850                               gettext_noop ("# disconnects due to quota of 0"),
1851                               1, GNUNET_NO);
1852   disconnect_neighbour (n);
1853 }
1854
1855
1856 /**
1857  * Closure for the neighbours_iterate function.
1858  */
1859 struct IteratorContext
1860 {
1861   /**
1862    * Function to call on each connected neighbour.
1863    */
1864   GST_NeighbourIterator cb;
1865
1866   /**
1867    * Closure for 'cb'.
1868    */
1869   void *cb_cls;
1870 };
1871
1872
1873 /**
1874  * Call the callback from the closure for each connected neighbour.
1875  *
1876  * @param cls the 'struct IteratorContext'
1877  * @param key the hash of the public key of the neighbour
1878  * @param value the 'struct NeighbourMapEntry'
1879  * @return GNUNET_OK (continue to iterate)
1880  */
1881 static int
1882 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1883 {
1884   struct IteratorContext *ic = cls;
1885   struct NeighbourMapEntry *n = value;
1886
1887   if (!is_connected (n))
1888     return GNUNET_OK;
1889
1890   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen);
1891   return GNUNET_OK;
1892 }
1893
1894
1895 /**
1896  * Iterate over all connected neighbours.
1897  *
1898  * @param cb function to call
1899  * @param cb_cls closure for cb
1900  */
1901 void
1902 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1903 {
1904   struct IteratorContext ic;
1905
1906   // This can happen during shutdown
1907   if (neighbours == NULL)
1908   {
1909     return;
1910   }
1911
1912   ic.cb = cb;
1913   ic.cb_cls = cb_cls;
1914   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1915 }
1916
1917 /**
1918  * If we have an active connection to the given target, it must be shutdown.
1919  *
1920  * @param target peer to disconnect from
1921  */
1922 void
1923 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1924 {
1925   struct NeighbourMapEntry *n;
1926
1927   // This can happen during shutdown
1928   if (neighbours == NULL)
1929   {
1930     return;
1931   }
1932
1933   n = lookup_neighbour (target);
1934   if (NULL == n)
1935     return;                     /* not active */
1936   if (is_connected (n))
1937   {
1938     send_disconnect (&n->id, n->plugin_name, n->addr, n->addrlen, n->session);
1939
1940     n = lookup_neighbour (target);
1941     if (NULL == n)
1942       return;                   /* gone already */
1943   }
1944   disconnect_neighbour (n);
1945 }
1946
1947
1948 /**
1949  * We received a disconnect message from the given peer,
1950  * validate and process.
1951  *
1952  * @param peer sender of the message
1953  * @param msg the disconnect message
1954  */
1955 void
1956 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
1957                                           *peer,
1958                                           const struct GNUNET_MessageHeader
1959                                           *msg)
1960 {
1961   struct NeighbourMapEntry *n;
1962   const struct SessionDisconnectMessage *sdm;
1963   GNUNET_HashCode hc;
1964
1965 #if DEBUG_TRANSPORT
1966   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967               "Received DISCONNECT message from peer `%s'\n",
1968               GNUNET_i2s (peer));
1969 #endif
1970
1971   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1972   {
1973     // GNUNET_break_op (0);
1974     GNUNET_STATISTICS_update (GST_stats,
1975                               gettext_noop
1976                               ("# disconnect messages ignored (old format)"), 1,
1977                               GNUNET_NO);
1978     return;
1979   }
1980   sdm = (const struct SessionDisconnectMessage *) msg;
1981   n = lookup_neighbour (peer);
1982   if (NULL == n)
1983     return;                     /* gone already */
1984   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1985       n->connect_ts.abs_value)
1986   {
1987     GNUNET_STATISTICS_update (GST_stats,
1988                               gettext_noop
1989                               ("# disconnect messages ignored (timestamp)"), 1,
1990                               GNUNET_NO);
1991     return;
1992   }
1993   GNUNET_CRYPTO_hash (&sdm->public_key,
1994                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1995                       &hc);
1996   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
1997   {
1998     GNUNET_break_op (0);
1999     return;
2000   }
2001   if (ntohl (sdm->purpose.size) !=
2002       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2003       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2004       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2005   {
2006     GNUNET_break_op (0);
2007     return;
2008   }
2009   if (GNUNET_OK !=
2010       GNUNET_CRYPTO_rsa_verify
2011       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2012        &sdm->signature, &sdm->public_key))
2013   {
2014     GNUNET_break_op (0);
2015     return;
2016   }
2017   GST_neighbours_force_disconnect (peer);
2018 }
2019
2020 /**
2021  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2022  * Consider switching to it.
2023  *
2024  * @param message possibly a 'struct SessionConnectMessage' (check format)
2025  * @param peer identity of the peer to switch the address for
2026  * @param plugin_name name of transport that delivered the PONG
2027  * @param address address of the other peer, NULL if other peer
2028  *                       connected to us
2029  * @param address_len number of bytes in address
2030  * @param session session to use (or NULL)
2031  * @param ats performance data
2032  * @param ats_count number of entries in ats
2033   */
2034 void
2035 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2036                                    const struct GNUNET_PeerIdentity *peer,
2037                                    const char *plugin_name,
2038                                    const char *sender_address,
2039                                    uint16_t sender_address_len,
2040                                    struct Session *session,
2041                                    const struct GNUNET_ATS_Information *ats,
2042                                    uint32_t ats_count)
2043 {
2044   const struct SessionConnectMessage *scm;
2045   struct QuotaSetMessage q_msg;
2046   struct GNUNET_MessageHeader msg;
2047   struct NeighbourMapEntry *n;
2048   size_t msg_len;
2049   size_t ret;
2050   int was_connected;
2051
2052 #if DEBUG_TRANSPORT
2053   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2054               "Received CONNECT_ACK message from peer `%s'\n",
2055               GNUNET_i2s (peer));
2056 #endif
2057
2058   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2059   {
2060     GNUNET_break_op (0);
2061     return;
2062   }
2063
2064   scm = (const struct SessionConnectMessage *) message;
2065   GNUNET_break_op (ntohl (scm->reserved) == 0);
2066   n = lookup_neighbour (peer);
2067   if (NULL == n)
2068     n = setup_neighbour (peer);
2069
2070   if (!is_connecting(n))
2071   {
2072     GNUNET_STATISTICS_update (GST_stats,
2073         gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
2074         GNUNET_NO);
2075     return;
2076   }
2077
2078   if (NULL != session)
2079     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2080                      "transport-ats",
2081                      "Giving ATS session %p of plugin %s for peer %s\n",
2082                      session, plugin_name, GNUNET_i2s (peer));
2083   GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2084                              sender_address_len, session, ats, ats_count);
2085
2086   was_connected = is_connected (n);
2087   if (!is_connected (n))
2088   {
2089     change_state (n, S_CONNECTED);
2090     GST_validation_set_address_use (&n->id,
2091                                     n->plugin_name,
2092                                     n->session,
2093                                     n->addr,
2094                                     n->addrlen,
2095                                     GNUNET_YES);
2096   }
2097
2098   GNUNET_ATS_address_in_use (GST_ats, &n->id, n->plugin_name, n->addr,
2099                              n->addrlen, n->addr, GNUNET_YES);
2100
2101 #if DEBUG_TRANSPORT
2102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2103               "Setting inbound quota of %u for peer `%s' to \n",
2104               ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id));
2105 #endif
2106   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2107
2108   /* send ACK (ACK) */
2109   msg_len = sizeof (msg);
2110   msg.size = htons (msg_len);
2111   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2112
2113   ret =
2114       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2115                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2116                         n->plugin_name, n->addr, n->addrlen, GNUNET_YES, NULL,
2117                         NULL);
2118
2119   if (ret == GNUNET_SYSERR)
2120     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2121                 "Failed to send SESSION_ACK to `%4s' using plugin `%s' address '%s' session %X\n",
2122                 GNUNET_i2s (&n->id), n->plugin_name,
2123                 (n->addrlen ==
2124                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2125                                                      n->addrlen), n->session);
2126
2127
2128   if (!was_connected)
2129   {
2130     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2131       n->keepalive_task =
2132           GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2133                                         &neighbour_keepalive_task, n);
2134
2135     neighbours_connected++;
2136     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2137                               GNUNET_NO);
2138 #if DEBUG_TRANSPORT
2139     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2140                 "Notify about connect of `%4s' using plugin `%s' address '%s' session %X LINE %u\n",
2141                 GNUNET_i2s (&n->id), n->plugin_name,
2142                 (n->addrlen ==
2143                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2144                                                      n->addrlen), n->session,
2145                 __LINE__);
2146 #endif
2147     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2148   }
2149
2150 #if DEBUG_TRANSPORT
2151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2152               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2153               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2154 #endif
2155   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2156   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2157   q_msg.quota = n->bandwidth_out;
2158   q_msg.peer = (*peer);
2159   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2160
2161 }
2162
2163 void
2164 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2165                            const struct GNUNET_PeerIdentity *peer,
2166                            const char *plugin_name, const char *sender_address,
2167                            uint16_t sender_address_len, struct Session *session,
2168                            const struct GNUNET_ATS_Information *ats,
2169                            uint32_t ats_count)
2170 {
2171   struct NeighbourMapEntry *n;
2172   struct QuotaSetMessage q_msg;
2173   int was_connected;
2174
2175 #if DEBUG_TRANSPORT
2176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2177               GNUNET_i2s (peer));
2178 #endif
2179
2180   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2181   {
2182     GNUNET_break_op (0);
2183     return;
2184   }
2185
2186   n = lookup_neighbour (peer);
2187   if (NULL == n)
2188   {
2189     send_disconnect (peer, plugin_name, sender_address, sender_address_len,
2190                      session);
2191     GNUNET_break (0);
2192     return;
2193   }
2194
2195   if (is_connected (n))
2196     return;
2197
2198   if (!is_connecting(n))
2199   {
2200     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2201                               GNUNET_NO);
2202     return;
2203   }
2204
2205   if (NULL != session)
2206     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2207                      "transport-ats",
2208                      "Giving ATS session %p of plugin %s for peer %s\n",
2209                      session, plugin_name, GNUNET_i2s (peer));
2210   GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2211                              sender_address_len, session, ats, ats_count);
2212
2213   was_connected = is_connected (n);
2214   change_state (n, S_CONNECTED);
2215
2216   GNUNET_ATS_address_in_use (GST_ats, &n->id, n->plugin_name, n->addr,
2217                              n->addrlen, n->addr, GNUNET_YES);
2218
2219   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2220
2221   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2222     n->keepalive_task =
2223         GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2224                                       &neighbour_keepalive_task, n);
2225
2226   if (!was_connected)
2227   {
2228     GST_validation_set_address_use (&n->id,
2229                                     n->plugin_name,
2230                                     n->session,
2231                                     n->addr,
2232                                     n->addrlen,
2233                                     GNUNET_YES);
2234     neighbours_connected++;
2235     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2236                               GNUNET_NO);
2237
2238 #if DEBUG_TRANSPORT
2239     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2240                 "Notify about connect of `%4s' using plugin `%s' address '%s' session %X LINE %u\n",
2241                 GNUNET_i2s (&n->id), n->plugin_name,
2242                 (n->addrlen ==
2243                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2244                                                      n->addrlen), n->session,
2245                 __LINE__);
2246 #endif
2247     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2248   }
2249 #if DEBUG_TRANSPORT
2250   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2251               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2252               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2253 #endif
2254   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2255   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2256   q_msg.quota = n->bandwidth_out;
2257   q_msg.peer = (*peer);
2258   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2259 }
2260
2261 struct BlackListCheckContext
2262 {
2263   struct GNUNET_ATS_Information *ats;
2264
2265   uint32_t ats_count;
2266
2267   struct Session *session;
2268
2269   char *sender_address;
2270
2271   uint16_t sender_address_len;
2272
2273   char *plugin_name;
2274
2275   struct GNUNET_TIME_Absolute ts;
2276 };
2277
2278
2279 static void
2280 handle_connect_blacklist_cont (void *cls,
2281                                const struct GNUNET_PeerIdentity *peer,
2282                                int result)
2283 {
2284   struct NeighbourMapEntry *n;
2285   struct BlackListCheckContext *bcc = cls;
2286
2287 #if DEBUG_TRANSPORT
2288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2289               "Blacklist check due to CONNECT message: `%s'\n",
2290               GNUNET_i2s (peer),
2291               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2292 #endif
2293
2294   /* not allowed */
2295   if (GNUNET_OK != result)
2296   {
2297     GNUNET_free (bcc);
2298     return;
2299   }
2300
2301   n = lookup_neighbour (peer);
2302   if (NULL == n)
2303     n = setup_neighbour (peer);
2304
2305   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2306   {
2307     if (NULL != bcc->session)
2308       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2309                        "transport-ats",
2310                        "Giving ATS session %p of plugin %s address `%s' for peer %s\n",
2311                        bcc->session, bcc->plugin_name,
2312                        GST_plugins_a2s (bcc->plugin_name, bcc->sender_address,
2313                                         bcc->sender_address_len),
2314                        GNUNET_i2s (peer));
2315     GNUNET_ATS_address_update (GST_ats, peer, bcc->plugin_name,
2316                                bcc->sender_address, bcc->sender_address_len,
2317                                bcc->session, bcc->ats, bcc->ats_count);
2318     n->connect_ts = bcc->ts;
2319   }
2320
2321   GNUNET_free (bcc);
2322
2323   if (n->state != S_CONNECT_RECV)
2324     change_state (n, S_CONNECT_RECV);
2325
2326   /* Ask ATS for an address to connect via that address */
2327   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2328     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2329   n->ats_suggest =
2330       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2331                                     n);
2332   GNUNET_ATS_suggest_address (GST_ats, peer);
2333 }
2334
2335 /**
2336  * We received a 'SESSION_CONNECT' message from the other peer.
2337  * Consider switching to it.
2338  *
2339  * @param message possibly a 'struct SessionConnectMessage' (check format)
2340  * @param peer identity of the peer to switch the address for
2341  * @param plugin_name name of transport that delivered the PONG
2342  * @param address address of the other peer, NULL if other peer
2343  *                       connected to us
2344  * @param address_len number of bytes in address
2345  * @param session session to use (or NULL)
2346  * @param ats performance data
2347  * @param ats_count number of entries in ats (excluding 0-termination)
2348   */
2349 void
2350 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2351                                const struct GNUNET_PeerIdentity *peer,
2352                                const char *plugin_name,
2353                                const char *sender_address,
2354                                uint16_t sender_address_len,
2355                                struct Session *session,
2356                                const struct GNUNET_ATS_Information *ats,
2357                                uint32_t ats_count)
2358 {
2359   const struct SessionConnectMessage *scm;
2360   struct NeighbourMapEntry *n;
2361   struct BlackListCheckContext *bcc = NULL;
2362
2363 #if DEBUG_TRANSPORT
2364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2365               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2366 #endif
2367
2368   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2369   {
2370     GNUNET_break_op (0);
2371     return;
2372   }
2373
2374   scm = (const struct SessionConnectMessage *) message;
2375   GNUNET_break_op (ntohl (scm->reserved) == 0);
2376
2377   n = lookup_neighbour (peer);
2378   if (n != NULL)
2379   {
2380     /* connected peer switches addresses */
2381     if (is_connected (n))
2382     {
2383       GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2384                                  sender_address_len, session, ats, ats_count);
2385       return;
2386     }
2387   }
2388
2389   /* we are not connected to this peer */
2390   /* do blacklist check */
2391   bcc =
2392       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2393                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1) +
2394                      sender_address_len + strlen (plugin_name) + 1);
2395
2396   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2397
2398   bcc->ats_count = ats_count + 1;
2399   bcc->sender_address_len = sender_address_len;
2400   bcc->session = session;
2401
2402   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2403   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2404   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2405   bcc->ats[ats_count].value = htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2406   bcc->sender_address = (char *) &bcc->ats[ats_count + 1];
2407   memcpy (bcc->sender_address, sender_address, sender_address_len);
2408
2409   bcc->plugin_name = &bcc->sender_address[sender_address_len];
2410   strcpy (bcc->plugin_name, plugin_name);
2411
2412   GST_blacklist_test_allowed (peer, plugin_name, handle_connect_blacklist_cont,
2413                               bcc);
2414 }
2415
2416
2417 /* end of file gnunet-service-transport_neighbours.c */