workaround for:
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours?
52  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
53  * send 3 keepalives in each interval, so 3 messages would need to be
54  * lost in a row for a disconnect).
55  */
56 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
57
58
59 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
60
61
62 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
63
64
65 /**
66  * Entry in neighbours.
67  */
68 struct NeighbourMapEntry;
69
70 /**
71  * Message a peer sends to another to indicate its
72  * preference for communicating via a particular
73  * session (and the desire to establish a real
74  * connection).
75  */
76 struct SessionConnectMessage
77 {
78   /**
79    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
80    */
81   struct GNUNET_MessageHeader header;
82
83   /**
84    * Always zero.
85    */
86   uint32_t reserved GNUNET_PACKED;
87
88   /**
89    * Absolute time at the sender.  Only the most recent connect
90    * message implies which session is preferred by the sender.
91    */
92   struct GNUNET_TIME_AbsoluteNBO timestamp;
93
94 };
95
96
97 struct SessionDisconnectMessage
98 {
99   /**
100    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Always zero.
106    */
107   uint32_t reserved GNUNET_PACKED;
108
109   /**
110    * Purpose of the signature.  Extends over the timestamp.
111    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
112    */
113   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
114
115   /**
116    * Absolute time at the sender.  Only the most recent connect
117    * message implies which session is preferred by the sender.
118    */
119   struct GNUNET_TIME_AbsoluteNBO timestamp;
120
121   /**
122    * Public key of the sender.
123    */
124   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
125
126   /**
127    * Signature of the peer that sends us the disconnect.  Only
128    * valid if the timestamp is AFTER the timestamp from the
129    * corresponding 'CONNECT' message.
130    */
131   struct GNUNET_CRYPTO_RsaSignature signature;
132
133 };
134
135
136 /**
137  * For each neighbour we keep a list of messages
138  * that we still want to transmit to the neighbour.
139  */
140 struct MessageQueue
141 {
142
143   /**
144    * This is a doubly linked list.
145    */
146   struct MessageQueue *next;
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *prev;
152
153   /**
154    * Once this message is actively being transmitted, which
155    * neighbour is it associated with?
156    */
157   struct NeighbourMapEntry *n;
158
159   /**
160    * Function to call once we're done.
161    */
162   GST_NeighbourSendContinuation cont;
163
164   /**
165    * Closure for 'cont'
166    */
167   void *cont_cls;
168
169   /**
170    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
171    * stuck together in memory.  Allocated at the end of this struct.
172    */
173   const char *message_buf;
174
175   /**
176    * Size of the message buf
177    */
178   size_t message_buf_size;
179
180   /**
181    * At what time should we fail?
182    */
183   struct GNUNET_TIME_Absolute timeout;
184
185 };
186
187 enum State
188 {
189   /* fresh peer or completely disconnected */
190   S_NOT_CONNECTED = 0,
191   /* sent CONNECT message to other peer, waiting for CONNECT_ACK */
192   S_CONNECT_SENT = 1,
193   /* received CONNECT message to other peer, sending CONNECT_ACK */
194   S_CONNECT_RECV = 4,
195   /* sent CONNECT_ACK message to other peer, wait for ACK or payload */
196   S_CONNECT_RECV_ACK_SENT = 8,
197   /* received ACK or payload */
198   S_CONNECTED = 16,
199   /* Disconnect in progress */
200   S_DISCONNECT = 32
201 };
202
203 /**
204  * Entry in neighbours.
205  */
206 struct NeighbourMapEntry
207 {
208
209   /**
210    * Head of list of messages we would like to send to this peer;
211    * must contain at most one message per client.
212    */
213   struct MessageQueue *messages_head;
214
215   /**
216    * Tail of list of messages we would like to send to this peer; must
217    * contain at most one message per client.
218    */
219   struct MessageQueue *messages_tail;
220
221   /**
222    * Performance data for the peer.
223    */
224   //struct GNUNET_ATS_Information *ats;
225
226   /**
227    * Are we currently trying to send a message? If so, which one?
228    */
229   struct MessageQueue *is_active;
230
231   /**
232    * Active session for communicating with the peer.
233    */
234   struct Session *session;
235
236   /**
237    * Name of the plugin we currently use.
238    */
239   char *plugin_name;
240
241   /**
242    * Address used for communicating with the peer, NULL for inbound connections.
243    */
244   void *addr;
245
246   /**
247    * Number of bytes in 'addr'.
248    */
249   size_t addrlen;
250
251   /**
252    * Identity of this neighbour.
253    */
254   struct GNUNET_PeerIdentity id;
255
256   /**
257    * ID of task scheduled to run when this peer is about to
258    * time out (will free resources associated with the peer).
259    */
260   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
261
262   /**
263    * ID of task scheduled to send keepalives.
264    */
265   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
266
267   /**
268    * ID of task scheduled to run when we should try transmitting
269    * the head of the message queue.
270    */
271   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
272
273   /**
274    * Tracker for inbound bandwidth.
275    */
276   struct GNUNET_BANDWIDTH_Tracker in_tracker;
277
278   /**
279    * Inbound bandwidth from ATS, activated when connection is up
280    */
281   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
282
283   /**
284    * Inbound bandwidth from ATS, activated when connection is up
285    */
286   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
287
288   /**
289    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
290    */
291   struct GNUNET_TIME_Absolute connect_ts;
292
293   /**
294    * Timeout for ATS
295    * We asked ATS for a new address for this peer
296    */
297   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
298
299   /**
300    * Task the resets the peer state after due to an pending
301    * unsuccessful connection setup
302    */
303   GNUNET_SCHEDULER_TaskIdentifier state_reset;
304
305   /**
306    * How often has the other peer (recently) violated the inbound
307    * traffic limit?  Incremented by 10 per violation, decremented by 1
308    * per non-violation (for each time interval).
309    */
310   unsigned int quota_violation_count;
311
312
313   /**
314    * The current state of the peer
315    * Element of enum State
316    */
317   int state;
318
319 };
320
321
322 /**
323  * All known neighbours and their HELLOs.
324  */
325 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
326
327 /**
328  * Closure for connect_notify_cb and disconnect_notify_cb
329  */
330 static void *callback_cls;
331
332 /**
333  * Function to call when we connected to a neighbour.
334  */
335 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
336
337 /**
338  * Function to call when we disconnected from a neighbour.
339  */
340 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
341
342 /**
343  * counter for connected neighbours
344  */
345 static int neighbours_connected;
346
347 /**
348  * Lookup a neighbour entry in the neighbours hash map.
349  *
350  * @param pid identity of the peer to look up
351  * @return the entry, NULL if there is no existing record
352  */
353 static struct NeighbourMapEntry *
354 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
355 {
356   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
357 }
358
359 #define change_state(n, state, ...) change (n, state, __LINE__)
360
361 static int
362 is_connecting (struct NeighbourMapEntry *n)
363 {
364   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
365     return GNUNET_YES;
366   return GNUNET_NO;
367 }
368
369 static int
370 is_connected (struct NeighbourMapEntry *n)
371 {
372   if (n->state == S_CONNECTED)
373     return GNUNET_YES;
374   return GNUNET_NO;
375 }
376
377 static int
378 is_disconnecting (struct NeighbourMapEntry *n)
379 {
380   if (n->state == S_DISCONNECT)
381     return GNUNET_YES;
382   return GNUNET_NO;
383 }
384
385 static const char *
386 print_state (int state)
387 {
388   switch (state)
389   {
390   case S_CONNECTED:
391     return "S_CONNECTED";
392     break;
393   case S_CONNECT_RECV:
394     return "S_CONNECT_RECV";
395     break;
396   case S_CONNECT_RECV_ACK_SENT:
397     return "S_CONNECT_RECV_ACK_SENT";
398     break;
399   case S_CONNECT_SENT:
400     return "S_CONNECT_SENT";
401     break;
402   case S_DISCONNECT:
403     return "S_DISCONNECT";
404     break;
405   case S_NOT_CONNECTED:
406     return "S_NOT_CONNECTED";
407     break;
408   default:
409     GNUNET_break (0);
410     break;
411   }
412   return NULL;
413 }
414
415 static int
416 change (struct NeighbourMapEntry *n, int state, int line);
417
418 static void
419 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
420
421 static void
422 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
423 {
424   struct NeighbourMapEntry *n = cls;
425
426   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
427
428 #if DEBUG_TRANSPORT
429 #endif
430   /* This jut a temporary debug message to check if a the value
431    * SETUP_CONNECTION_TIMEOUT was choosen to small for slow machines
432    */
433   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
434               "Information for developers: Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n",
435               GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name, n->addr,
436                                                     n->addrlen),
437               print_state (n->state));
438
439   GNUNET_STATISTICS_update (GST_stats,
440                             gettext_noop
441                             ("# failed connection attempts due to timeout"), 1,
442                             GNUNET_NO);
443
444   /* resetting state */
445   n->state = S_NOT_CONNECTED;
446
447   /* destroying address */
448   GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
449                                 n->addrlen, NULL);
450
451   /* request new address */
452   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
453     GNUNET_SCHEDULER_cancel (n->ats_suggest);
454   n->ats_suggest =
455       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
456                                     n);
457   GNUNET_ATS_suggest_address (GST_ats, &n->id);
458 }
459
460 static int
461 change (struct NeighbourMapEntry *n, int state, int line)
462 {
463   char *old = strdup (print_state (n->state));
464   char *new = strdup (print_state (state));
465
466   /* allowed transitions */
467   int allowed = GNUNET_NO;
468
469   switch (n->state)
470   {
471   case S_NOT_CONNECTED:
472     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
473         (state == S_DISCONNECT))
474     {
475       allowed = GNUNET_YES;
476
477       /* Schedule reset task */
478       if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT))
479       {
480         GNUNET_assert (n->state_reset == GNUNET_SCHEDULER_NO_TASK);
481         n->state_reset =
482             GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
483                                           n);
484       }
485       break;
486     }
487     break;
488   case S_CONNECT_RECV:
489     if ((state == S_NOT_CONNECTED) || (state == S_DISCONNECT) ||
490         (state == S_CONNECTED) ||
491         /* FIXME SENT -> RECV ISSUE! */ (state == S_CONNECT_SENT))
492     {
493       if ((state == S_CONNECTED) || (state == S_DISCONNECT) ||
494           (state == S_NOT_CONNECTED))
495       {
496 #if DEBUG_TRANSPORT
497         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498                     "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
499                     GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name,
500                                                           n->addr, n->addrlen),
501                     print_state (n->state), print_state (state));
502 #endif
503         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
504         GNUNET_SCHEDULER_cancel (n->state_reset);
505         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
506       }
507
508       allowed = GNUNET_YES;
509       break;
510     }
511     break;
512   case S_CONNECT_SENT:
513     if ((state == S_NOT_CONNECTED) || (state == S_CONNECTED) ||
514         (state == S_DISCONNECT) ||
515         /* FIXME SENT -> RECV ISSUE! */ (state == S_CONNECT_RECV))
516     {
517       if ((state == S_CONNECTED) || (state == S_DISCONNECT) ||
518           (state == S_NOT_CONNECTED))
519       {
520 #if DEBUG_TRANSPORT
521         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
522                     "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
523                     GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name,
524                                                           n->addr, n->addrlen),
525                     print_state (n->state), print_state (state));
526 #endif
527         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
528         GNUNET_SCHEDULER_cancel (n->state_reset);
529         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
530       }
531
532       allowed = GNUNET_YES;
533       break;
534     }
535     break;
536   case S_CONNECTED:
537     if (state == S_DISCONNECT)
538     {
539       allowed = GNUNET_YES;
540       break;
541     }
542     break;
543   case S_DISCONNECT:
544     /*
545      * if (state == S_NOT_CONNECTED)
546      * {
547      * allowed = GNUNET_YES;
548      * break;
549      * } */
550     break;
551   default:
552     GNUNET_break (0);
553     break;
554
555   }
556
557   if (allowed == GNUNET_NO)
558   {
559     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
560                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
561                 new, line);
562     GNUNET_break (0);
563     GNUNET_free (old);
564     GNUNET_free (new);
565     return GNUNET_SYSERR;
566   }
567
568   n->state = state;
569 #if DEBUG_TRANSPORT
570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
572               GNUNET_i2s (&n->id), n, old, new, line);
573 #endif
574   GNUNET_free (old);
575   GNUNET_free (new);
576   return GNUNET_OK;
577 }
578
579 static ssize_t
580 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
581                   size_t msgbuf_size, uint32_t priority,
582                   struct GNUNET_TIME_Relative timeout, struct Session *session,
583                   const char *plugin_name, const void *addr, size_t addrlen,
584                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
585                   void *cont_cls)
586 {
587   struct GNUNET_TRANSPORT_PluginFunctions *papi;
588   size_t ret = GNUNET_SYSERR;
589
590   /* FIXME : ats returns an address with all values 0 */
591   if (plugin_name == NULL)
592   {
593     if (cont != NULL)
594       cont (cont_cls, target, GNUNET_SYSERR);
595     return GNUNET_SYSERR;
596   }
597
598   if ((session == NULL) && (addr == NULL) && (addrlen == 0))
599   {
600     if (cont != NULL)
601       cont (cont_cls, target, GNUNET_SYSERR);
602     return GNUNET_SYSERR;
603   }
604
605   papi = GST_plugins_find (plugin_name);
606   if (papi == NULL)
607   {
608     if (cont != NULL)
609       cont (cont_cls, target, GNUNET_SYSERR);
610     return GNUNET_SYSERR;
611   }
612
613   ret =
614       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
615                   addr, addrlen, GNUNET_YES, cont, cont_cls);
616
617   if (ret == -1)
618   {
619     if (cont != NULL)
620       cont (cont_cls, target, GNUNET_SYSERR);
621   }
622   return ret;
623 }
624
625 /**
626  * Task invoked to start a transmission to another peer.
627  *
628  * @param cls the 'struct NeighbourMapEntry'
629  * @param tc scheduler context
630  */
631 static void
632 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
633
634
635 /**
636  * We're done with our transmission attempt, continue processing.
637  *
638  * @param cls the 'struct MessageQueue' of the message
639  * @param receiver intended receiver
640  * @param success whether it worked or not
641  */
642 static void
643 transmit_send_continuation (void *cls,
644                             const struct GNUNET_PeerIdentity *receiver,
645                             int success)
646 {
647   struct MessageQueue *mq;
648   struct NeighbourMapEntry *n;
649
650   mq = cls;
651   n = mq->n;
652   if (NULL != n)
653   {
654     GNUNET_assert (n->is_active == mq);
655     n->is_active = NULL;
656     if (success == GNUNET_YES)
657     {
658       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
659       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
660     }
661   }
662 #if DEBUG_TRANSPORT
663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
664               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
665               (success == GNUNET_OK) ? "successful" : "FAILED");
666 #endif
667   if (NULL != mq->cont)
668     mq->cont (mq->cont_cls, success);
669   GNUNET_free (mq);
670 }
671
672
673 /**
674  * Check the ready list for the given neighbour and if a plugin is
675  * ready for transmission (and if we have a message), do so!
676  *
677  * @param n target peer for which to transmit
678  */
679 static void
680 try_transmission_to_peer (struct NeighbourMapEntry *n)
681 {
682   struct MessageQueue *mq;
683   struct GNUNET_TIME_Relative timeout;
684   ssize_t ret;
685
686   if (n->is_active != NULL)
687   {
688     GNUNET_break (0);
689     return;                     /* transmission already pending */
690   }
691   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
692   {
693     GNUNET_break (0);
694     return;                     /* currently waiting for bandwidth */
695   }
696   while (NULL != (mq = n->messages_head))
697   {
698     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
699     if (timeout.rel_value > 0)
700       break;
701     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
702     n->is_active = mq;
703     mq->n = n;
704     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
705   }
706   if (NULL == mq)
707     return;                     /* no more messages */
708
709   if (GST_plugins_find (n->plugin_name) == NULL)
710   {
711     GNUNET_break (0);
712     return;
713   }
714   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
715   n->is_active = mq;
716   mq->n = n;
717
718   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
719   {
720     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
721                 GNUNET_i2s (&n->id));
722     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
723     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
724     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
725     return;
726   }
727
728   ret =
729       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
730                         timeout, n->session, n->plugin_name, n->addr,
731                         n->addrlen, GNUNET_YES, &transmit_send_continuation,
732                         mq);
733   if (ret == -1)
734   {
735     /* failure, but 'send' would not call continuation in this case,
736      * so we need to do it here! */
737     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
738   }
739
740 }
741
742
743 /**
744  * Task invoked to start a transmission to another peer.
745  *
746  * @param cls the 'struct NeighbourMapEntry'
747  * @param tc scheduler context
748  */
749 static void
750 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
751 {
752   struct NeighbourMapEntry *n = cls;
753
754   GNUNET_assert (NULL != lookup_neighbour (&n->id));
755   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
756   try_transmission_to_peer (n);
757 }
758
759
760 /**
761  * Initialize the neighbours subsystem.
762  *
763  * @param cls closure for callbacks
764  * @param connect_cb function to call if we connect to a peer
765  * @param disconnect_cb function to call if we disconnect from a peer
766  */
767 void
768 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
769                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
770 {
771   callback_cls = cls;
772   connect_notify_cb = connect_cb;
773   disconnect_notify_cb = disconnect_cb;
774   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
775 }
776
777
778 static void
779 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
780                       int result)
781 {
782 #if DEBUG_TRANSPORT
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "Sending DISCONNECT message to peer `%4s': %i\n",
785               GNUNET_i2s (target), result);
786 #endif
787 }
788
789
790 static int
791 send_disconnect (const struct GNUNET_PeerIdentity *target,
792                  const char *plugin_name, const char *sender_address,
793                  uint16_t sender_address_len, struct Session *session)
794 {
795   size_t ret;
796   struct SessionDisconnectMessage disconnect_msg;
797
798 #if DEBUG_TRANSPORT
799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800               "Sending DISCONNECT message to peer `%4s'\n",
801               GNUNET_i2s (target));
802 #endif
803
804   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
805   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
806   disconnect_msg.reserved = htonl (0);
807   disconnect_msg.purpose.size =
808       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
809              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
810              sizeof (struct GNUNET_TIME_AbsoluteNBO));
811   disconnect_msg.purpose.purpose =
812       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
813   disconnect_msg.timestamp =
814       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
815   disconnect_msg.public_key = GST_my_public_key;
816   GNUNET_assert (GNUNET_OK ==
817                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
818                                          &disconnect_msg.purpose,
819                                          &disconnect_msg.signature));
820
821   ret =
822       send_with_plugin (target, (const char *) &disconnect_msg,
823                         sizeof (disconnect_msg), UINT32_MAX,
824                         GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name,
825                         sender_address, sender_address_len, GNUNET_YES,
826                         &send_disconnect_cont, NULL);
827
828   if (ret == GNUNET_SYSERR)
829     return GNUNET_SYSERR;
830
831   GNUNET_STATISTICS_update (GST_stats,
832                             gettext_noop
833                             ("# peers disconnected due to external request"), 1,
834                             GNUNET_NO);
835   return GNUNET_OK;
836 }
837
838 /**
839  * Disconnect from the given neighbour, clean up the record.
840  *
841  * @param n neighbour to disconnect from
842  */
843 static void
844 disconnect_neighbour (struct NeighbourMapEntry *n)
845 {
846   struct MessageQueue *mq;
847   int was_connected = is_connected (n);
848
849   /* send DISCONNECT MESSAGE */
850   if (is_connected (n) || is_connecting (n))
851   {
852     if (GNUNET_OK ==
853         send_disconnect (&n->id, n->plugin_name, n->addr, n->addrlen,
854                          n->session))
855       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
856                   GNUNET_i2s (&n->id));
857     else
858       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859                   "Could not send DISCONNECT_MSG to `%s'\n",
860                   GNUNET_i2s (&n->id));
861   }
862
863
864   if (is_disconnecting (n))
865     return;
866   change_state (n, S_DISCONNECT);
867
868   if (n->plugin_name != NULL)
869   {
870     struct GNUNET_TRANSPORT_PluginFunctions *papi;
871     papi = GST_plugins_find (n->plugin_name);
872     if (papi != NULL)
873       papi->disconnect (papi->cls, &n->id);
874   }
875
876   while (NULL != (mq = n->messages_head))
877   {
878     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
879     if (NULL != mq->cont)
880       mq->cont (mq->cont_cls, GNUNET_SYSERR);
881     GNUNET_free (mq);
882   }
883   if (NULL != n->is_active)
884   {
885     n->is_active->n = NULL;
886     n->is_active = NULL;
887   }
888   if (was_connected)
889   {
890     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
891     GNUNET_SCHEDULER_cancel (n->keepalive_task);
892     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
893     GNUNET_assert (neighbours_connected > 0);
894     neighbours_connected--;
895     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
896                               GNUNET_NO);
897     disconnect_notify_cb (callback_cls, &n->id);
898   }
899   GNUNET_assert (GNUNET_YES ==
900                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
901                                                        &n->id.hashPubKey, n));
902   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
903   {
904     GNUNET_SCHEDULER_cancel (n->ats_suggest);
905     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
906   }
907   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
908   {
909     GNUNET_SCHEDULER_cancel (n->timeout_task);
910     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
911   }
912   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
913   {
914     GNUNET_SCHEDULER_cancel (n->transmission_task);
915     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
916   }
917   if (NULL != n->plugin_name)
918   {
919     GNUNET_free (n->plugin_name);
920     n->plugin_name = NULL;
921   }
922   if (NULL != n->addr)
923   {
924     GNUNET_free (n->addr);
925     n->addr = NULL;
926     n->addrlen = 0;
927   }
928   n->session = NULL;
929   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
930               GNUNET_i2s (&n->id), n);
931   GNUNET_free (n);
932 }
933
934
935 /**
936  * Peer has been idle for too long. Disconnect.
937  *
938  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
939  * @param tc scheduler context
940  */
941 static void
942 neighbour_timeout_task (void *cls,
943                         const struct GNUNET_SCHEDULER_TaskContext *tc)
944 {
945   struct NeighbourMapEntry *n = cls;
946
947   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
948
949   GNUNET_STATISTICS_update (GST_stats,
950                             gettext_noop
951                             ("# peers disconnected due to timeout"), 1,
952                             GNUNET_NO);
953   disconnect_neighbour (n);
954 }
955
956
957 /**
958  * Send another keepalive message.
959  *
960  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
961  * @param tc scheduler context
962  */
963 static void
964 neighbour_keepalive_task (void *cls,
965                           const struct GNUNET_SCHEDULER_TaskContext *tc)
966 {
967   struct NeighbourMapEntry *n = cls;
968   struct GNUNET_MessageHeader m;
969
970   n->keepalive_task =
971       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
972                                     &neighbour_keepalive_task, n);
973   GNUNET_assert (is_connected (n));
974   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
975                             GNUNET_NO);
976   m.size = htons (sizeof (struct GNUNET_MessageHeader));
977   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
978
979   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
980                     UINT32_MAX /* priority */ ,
981                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name,
982                     n->addr, n->addrlen, GNUNET_YES, NULL, NULL);
983 }
984
985
986 /**
987  * Disconnect from the given neighbour.
988  *
989  * @param cls unused
990  * @param key hash of neighbour's public key (not used)
991  * @param value the 'struct NeighbourMapEntry' of the neighbour
992  */
993 static int
994 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
995 {
996   struct NeighbourMapEntry *n = value;
997
998 #if DEBUG_TRANSPORT
999   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1000               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1001 #endif
1002   if (is_connected (n))
1003     GNUNET_STATISTICS_update (GST_stats,
1004                               gettext_noop
1005                               ("# peers disconnected due to global disconnect"),
1006                               1, GNUNET_NO);
1007   disconnect_neighbour (n);
1008   return GNUNET_OK;
1009 }
1010
1011
1012 static void
1013 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1014 {
1015   struct NeighbourMapEntry *n = cls;
1016
1017   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1018
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               " ATS did not suggested address to connect to peer `%s'\n",
1021               GNUNET_i2s (&n->id));
1022
1023   disconnect_neighbour (n);
1024 }
1025
1026 /**
1027  * Cleanup the neighbours subsystem.
1028  */
1029 void
1030 GST_neighbours_stop ()
1031 {
1032   // This can happen during shutdown
1033   if (neighbours == NULL)
1034   {
1035     return;
1036   }
1037
1038   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1039                                          NULL);
1040   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1041   GNUNET_assert (neighbours_connected == 0);
1042   neighbours = NULL;
1043   callback_cls = NULL;
1044   connect_notify_cb = NULL;
1045   disconnect_notify_cb = NULL;
1046 }
1047
1048
1049 /**
1050  * We tried to send a SESSION_CONNECT message to another peer.  If this
1051  * succeeded, we change the state.  If it failed, we should tell
1052  * ATS to not use this address anymore (until it is re-validated).
1053  *
1054  * @param cls the 'struct NeighbourMapEntry'
1055  * @param success GNUNET_OK on success
1056  */
1057 static void
1058 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1059                            int success)
1060 {
1061   struct NeighbourMapEntry *n = cls;
1062
1063   GNUNET_assert (n != NULL);
1064   GNUNET_assert (!is_connected (n));
1065
1066   if (is_disconnecting (n))
1067     return;                     /* neighbour is going away */
1068
1069   if (GNUNET_YES != success)
1070   {
1071 #if DEBUG_TRANSPORT
1072     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073                 "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1074                 GNUNET_i2s (&n->id), n->plugin_name,
1075                 (n->addrlen ==
1076                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
1077                                                      n->addrlen), n->session);
1078 #endif
1079
1080     GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1081                                   n->addrlen, NULL);
1082
1083     change_state (n, S_NOT_CONNECTED);
1084
1085     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1086       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1087     n->ats_suggest =
1088         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1089                                       n);
1090     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1091     return;
1092   }
1093
1094
1095 }
1096
1097
1098 /**
1099  * We tried to switch addresses with an peer already connected. If it failed,
1100  * we should tell ATS to not use this address anymore (until it is re-validated).
1101  *
1102  * @param cls the 'struct NeighbourMapEntry'
1103  * @param success GNUNET_OK on success
1104  */
1105 static void
1106 send_switch_address_continuation (void *cls,
1107                                   const struct GNUNET_PeerIdentity *target,
1108                                   int success)
1109 {
1110   struct NeighbourMapEntry *n = cls;
1111
1112   GNUNET_assert (n != NULL);
1113   if (is_disconnecting (n))
1114     return;                     /* neighbour is going away */
1115
1116   GNUNET_assert (n->state == S_CONNECTED);
1117   if (GNUNET_YES != success)
1118   {
1119 #if DEBUG_TRANSPORT
1120     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121                 "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n",
1122                 GNUNET_i2s (&n->id), n->plugin_name,
1123                 (n->addrlen ==
1124                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
1125                                                      n->addrlen), n->session);
1126 #endif
1127
1128     GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1129                                   n->addrlen, NULL);
1130
1131     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1132       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1133     n->ats_suggest =
1134         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1135                                       n);
1136     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1137     return;
1138   }
1139 }
1140
1141 /**
1142  * We tried to send a SESSION_CONNECT message to another peer.  If this
1143  * succeeded, we change the state.  If it failed, we should tell
1144  * ATS to not use this address anymore (until it is re-validated).
1145  *
1146  * @param cls the 'struct NeighbourMapEntry'
1147  * @param success GNUNET_OK on success
1148  */
1149 static void
1150 send_connect_ack_continuation (void *cls,
1151                                const struct GNUNET_PeerIdentity *target,
1152                                int success)
1153 {
1154   struct NeighbourMapEntry *n = cls;
1155
1156   GNUNET_assert (n != NULL);
1157
1158   if (GNUNET_YES == success)
1159     return;                     /* sending successful */
1160
1161   /* sending failed, ask for next address  */
1162 #if DEBUG_TRANSPORT
1163   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1164               "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1165               GNUNET_i2s (&n->id), n->plugin_name,
1166               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name,
1167                                                                  n->addr,
1168                                                                  n->addrlen),
1169               n->session);
1170 #endif
1171   change_state (n, S_NOT_CONNECTED);
1172
1173   GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1174                                 n->addrlen, NULL);
1175
1176   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1177     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1178   n->ats_suggest =
1179       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1180                                     n);
1181   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1182 }
1183
1184 /**
1185  * For an existing neighbour record, set the active connection to
1186  * the given address.
1187  *
1188  * @param peer identity of the peer to switch the address for
1189  * @param plugin_name name of transport that delivered the PONG
1190  * @param address address of the other peer, NULL if other peer
1191  *                       connected to us
1192  * @param address_len number of bytes in address
1193  * @param session session to use (or NULL)
1194  * @param ats performance data
1195  * @param ats_count number of entries in ats
1196  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1197  *         connection is not up (yet)
1198  */
1199 int
1200 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1201                                        const char *plugin_name,
1202                                        const void *address, size_t address_len,
1203                                        struct Session *session,
1204                                        const struct GNUNET_ATS_Information *ats,
1205                                        uint32_t ats_count,
1206                                        struct GNUNET_BANDWIDTH_Value32NBO
1207                                        bandwidth_in,
1208                                        struct GNUNET_BANDWIDTH_Value32NBO
1209                                        bandwidth_out)
1210 {
1211   struct NeighbourMapEntry *n;
1212   struct SessionConnectMessage connect_msg;
1213   size_t msg_len;
1214   size_t ret;
1215   int checks_failed;
1216
1217   // This can happen during shutdown
1218   if (neighbours == NULL)
1219   {
1220     return GNUNET_NO;
1221   }
1222
1223   checks_failed = GNUNET_NO;
1224
1225   if (plugin_name == NULL)
1226   {
1227     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1228                 "ATS offered suggested us empty address: plugin NULL");
1229     GNUNET_break_op (0);
1230     checks_failed = GNUNET_YES;
1231   }
1232   if ((address == NULL) && (address_len == 0) && (session == NULL))
1233   {
1234     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1235                 "ATS offered suggested us empty address: address NULL & session NULL");
1236     GNUNET_break_op (0);
1237     checks_failed = GNUNET_YES;
1238   }
1239
1240   n = lookup_neighbour (peer);
1241   if (NULL == n)
1242     checks_failed = GNUNET_YES;
1243
1244   if (checks_failed == GNUNET_YES)
1245   {
1246     GNUNET_ATS_address_destroyed (GST_ats, peer, plugin_name, address,
1247                                   address_len, session);
1248     if (n != NULL)
1249       GNUNET_ATS_suggest_address (GST_ats, peer);
1250     return GNUNET_NO;
1251   }
1252
1253   /* checks successful and neighbour != NULL */
1254 #if DEBUG_TRANSPORT
1255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1256               "ATS tells us to switch to plugin `%s' address '%s' session %X for %s peer `%s'\n",
1257               plugin_name,
1258               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
1259                                                                   address,
1260                                                                   address_len),
1261               session, (is_connected (n) ? "CONNECTED" : "NOT CONNECTED"),
1262               GNUNET_i2s (peer));
1263 #endif
1264
1265   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1266   {
1267     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1268     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1269   }
1270
1271   // do not switch addresses just update quotas
1272   if ((is_connected (n)) && (address_len == n->addrlen))
1273   {
1274     if ((0 == memcmp (address, n->addr, address_len)) &&
1275         (n->session == session))
1276     {
1277       struct QuotaSetMessage q_msg;
1278
1279 #if DEBUG_TRANSPORT
1280       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1281                   "Sending outbound quota of %u Bps and inbound quota of %u Bps for peer `%s' to all clients\n",
1282                   ntohl (n->bandwidth_out.value__),
1283                   ntohl (n->bandwidth_in.value__), GNUNET_i2s (peer));
1284 #endif
1285
1286       n->bandwidth_in = bandwidth_in;
1287       n->bandwidth_out = bandwidth_out;
1288       GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1289
1290       q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1291       q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1292       q_msg.quota = n->bandwidth_out;
1293       q_msg.peer = (*peer);
1294       GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1295       return GNUNET_NO;
1296     }
1297   }
1298
1299   GNUNET_free_non_null (n->addr);
1300   n->addr = GNUNET_malloc (address_len);
1301   memcpy (n->addr, address, address_len);
1302   n->bandwidth_in = bandwidth_in;
1303   n->bandwidth_out = bandwidth_out;
1304   n->addrlen = address_len;
1305   n->session = session;
1306   GNUNET_free_non_null (n->plugin_name);
1307   n->plugin_name = GNUNET_strdup (plugin_name);
1308   GNUNET_SCHEDULER_cancel (n->timeout_task);
1309   n->timeout_task =
1310       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1311                                     &neighbour_timeout_task, n);
1312
1313   if (n->state == S_DISCONNECT)
1314   {
1315     /* We are disconnecting, nothing to do here */
1316     return GNUNET_NO;
1317   }
1318   /* We are not connected/connecting and initiate a fresh connect */
1319   if (n->state == S_NOT_CONNECTED)
1320   {
1321     msg_len = sizeof (struct SessionConnectMessage);
1322     connect_msg.header.size = htons (msg_len);
1323     connect_msg.header.type =
1324         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1325     connect_msg.reserved = htonl (0);
1326     connect_msg.timestamp =
1327         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1328
1329     change_state (n, S_CONNECT_SENT);
1330
1331     ret =
1332         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1333                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1334                           plugin_name, address, address_len, GNUNET_YES,
1335                           &send_connect_continuation, n);
1336
1337
1338     return GNUNET_NO;
1339   }
1340   /* We received a CONNECT message and asked ATS for an address */
1341   else if (n->state == S_CONNECT_RECV)
1342   {
1343     msg_len = sizeof (struct SessionConnectMessage);
1344     connect_msg.header.size = htons (msg_len);
1345     connect_msg.header.type =
1346         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1347     connect_msg.reserved = htonl (0);
1348     connect_msg.timestamp =
1349         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1350
1351     ret =
1352         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1353                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1354                           plugin_name, address, address_len, GNUNET_YES,
1355                           &send_connect_ack_continuation, n);
1356     return GNUNET_NO;
1357   }
1358   /* connected peer is switching addresses */
1359   else if (n->state == S_CONNECTED)
1360   {
1361     msg_len = sizeof (struct SessionConnectMessage);
1362     connect_msg.header.size = htons (msg_len);
1363     connect_msg.header.type =
1364         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1365     connect_msg.reserved = htonl (0);
1366     connect_msg.timestamp =
1367         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1368
1369     ret =
1370         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1371                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1372                           plugin_name, address, address_len, GNUNET_YES,
1373                           &send_switch_address_continuation, n);
1374     if (ret == GNUNET_SYSERR)
1375     {
1376       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1377                   "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n",
1378                   GNUNET_i2s (peer), plugin_name,
1379                   (address_len ==
1380                    0) ? "<inbound>" : GST_plugins_a2s (plugin_name, address,
1381                                                        address_len), session);
1382     }
1383     return GNUNET_NO;
1384   }
1385   else if (n->state == S_CONNECT_SENT)
1386   {
1387     return GNUNET_NO;
1388   }
1389   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1390               "Invalid connection state to switch addresses %u \n", n->state);
1391   GNUNET_break_op (0);
1392   return GNUNET_NO;
1393 }
1394
1395
1396 /**
1397  * Create an entry in the neighbour map for the given peer
1398  *
1399  * @param peer peer to create an entry for
1400  * @return new neighbour map entry
1401  */
1402 static struct NeighbourMapEntry *
1403 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1404 {
1405   struct NeighbourMapEntry *n;
1406
1407 #if DEBUG_TRANSPORT
1408   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1409               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1410 #endif
1411   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1412   n->id = *peer;
1413   n->state = S_NOT_CONNECTED;
1414   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1415                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1416                                  MAX_BANDWIDTH_CARRY_S);
1417   n->timeout_task =
1418       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1419                                     &neighbour_timeout_task, n);
1420   GNUNET_assert (GNUNET_OK ==
1421                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1422                                                     &n->id.hashPubKey, n,
1423                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1424   return n;
1425 }
1426
1427
1428 /**
1429  * Try to create a connection to the given target (eventually).
1430  *
1431  * @param target peer to try to connect to
1432  */
1433 void
1434 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1435 {
1436   struct NeighbourMapEntry *n;
1437
1438   // This can happen during shutdown
1439   if (neighbours == NULL)
1440   {
1441     return;
1442   }
1443 #if DEBUG_TRANSPORT
1444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1445               GNUNET_i2s (target));
1446 #endif
1447   if (0 ==
1448       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1449   {
1450     /* my own hello */
1451     return;
1452   }
1453   n = lookup_neighbour (target);
1454
1455   if (NULL != n)
1456   {
1457     if ((is_connected (n)) || (is_connecting (n)))
1458       return;                   /* already connecting or connected */
1459     if (is_disconnecting (n))
1460       change_state (n, S_NOT_CONNECTED);
1461   }
1462
1463
1464   if (n == NULL)
1465     n = setup_neighbour (target);
1466 #if DEBUG_TRANSPORT
1467   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1468               "Asking ATS for suggested address to connect to peer `%s'\n",
1469               GNUNET_i2s (&n->id));
1470 #endif
1471
1472   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1473 }
1474
1475 /**
1476  * Test if we're connected to the given peer.
1477  *
1478  * @param target peer to test
1479  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1480  */
1481 int
1482 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1483 {
1484   struct NeighbourMapEntry *n;
1485
1486   // This can happen during shutdown
1487   if (neighbours == NULL)
1488   {
1489     return GNUNET_NO;
1490   }
1491
1492   n = lookup_neighbour (target);
1493
1494   if ((NULL == n) || (!is_connected (n)))
1495     return GNUNET_NO;           /* not connected */
1496   return GNUNET_YES;
1497 }
1498
1499
1500 /**
1501  * A session was terminated. Take note.
1502  *
1503  * @param peer identity of the peer where the session died
1504  * @param session session that is gone
1505  */
1506 void
1507 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1508                                    struct Session *session)
1509 {
1510   struct NeighbourMapEntry *n;
1511
1512   // This can happen during shutdown
1513   if (neighbours == NULL)
1514   {
1515     return;
1516   }
1517
1518 #if DEBUG_TRANSPORT
1519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1520               session, GNUNET_i2s (peer));
1521 #endif
1522
1523   n = lookup_neighbour (peer);
1524   if (NULL == n)
1525     return;
1526   if (session != n->session)
1527     return;                     /* doesn't affect us */
1528
1529   n->session = NULL;
1530   GNUNET_free (n->addr);
1531   n->addr = NULL;
1532   n->addrlen = 0;
1533
1534   /* not connected anymore anyway, shouldn't matter */
1535   if ((!is_connected (n)) && (!is_connecting (n)))
1536     return;
1537
1538   /* We are connected, so ask ATS to switch addresses */
1539   GNUNET_SCHEDULER_cancel (n->timeout_task);
1540   n->timeout_task =
1541       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1542                                     &neighbour_timeout_task, n);
1543   /* try QUICKLY to re-establish a connection, reduce timeout! */
1544   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1545     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1546   n->ats_suggest =
1547       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1548                                     n);
1549   GNUNET_ATS_suggest_address (GST_ats, peer);
1550 }
1551
1552
1553 /**
1554  * Transmit a message to the given target using the active connection.
1555  *
1556  * @param target destination
1557  * @param msg message to send
1558  * @param msg_size number of bytes in msg
1559  * @param timeout when to fail with timeout
1560  * @param cont function to call when done
1561  * @param cont_cls closure for 'cont'
1562  */
1563 void
1564 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1565                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1566                      GST_NeighbourSendContinuation cont, void *cont_cls)
1567 {
1568   struct NeighbourMapEntry *n;
1569   struct MessageQueue *mq;
1570
1571   // This can happen during shutdown
1572   if (neighbours == NULL)
1573   {
1574     return;
1575   }
1576
1577   n = lookup_neighbour (target);
1578   if ((n == NULL) || (!is_connected (n)))
1579   {
1580     GNUNET_STATISTICS_update (GST_stats,
1581                               gettext_noop
1582                               ("# messages not sent (no such peer or not connected)"),
1583                               1, GNUNET_NO);
1584 #if DEBUG_TRANSPORT
1585     if (n == NULL)
1586       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1587                   "Could not send message to peer `%s': unknown neighbour",
1588                   GNUNET_i2s (target));
1589     else if (!is_connected (n))
1590       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1591                   "Could not send message to peer `%s': not connected\n",
1592                   GNUNET_i2s (target));
1593 #endif
1594     if (NULL != cont)
1595       cont (cont_cls, GNUNET_SYSERR);
1596     return;
1597   }
1598
1599   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
1600   {
1601     GNUNET_STATISTICS_update (GST_stats,
1602                               gettext_noop
1603                               ("# messages not sent (no such peer or not connected)"),
1604                               1, GNUNET_NO);
1605 #if DEBUG_TRANSPORT
1606     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1607                 "Could not send message to peer `%s': no address available\n",
1608                 GNUNET_i2s (target));
1609 #endif
1610
1611     if (NULL != cont)
1612       cont (cont_cls, GNUNET_SYSERR);
1613     return;
1614   }
1615
1616   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1617   GNUNET_STATISTICS_update (GST_stats,
1618                             gettext_noop
1619                             ("# bytes in message queue for other peers"),
1620                             msg_size, GNUNET_NO);
1621   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1622   mq->cont = cont;
1623   mq->cont_cls = cont_cls;
1624   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1625   memcpy (&mq[1], msg, msg_size);
1626   mq->message_buf = (const char *) &mq[1];
1627   mq->message_buf_size = msg_size;
1628   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1629   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1630
1631   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1632       (NULL == n->is_active))
1633     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1634 }
1635
1636
1637 /**
1638  * We have received a message from the given sender.  How long should
1639  * we delay before receiving more?  (Also used to keep the peer marked
1640  * as live).
1641  *
1642  * @param sender sender of the message
1643  * @param size size of the message
1644  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1645  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1646  *                   GNUNET_SYSERR if the connection is not fully up yet
1647  * @return how long to wait before reading more from this sender
1648  */
1649 struct GNUNET_TIME_Relative
1650 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1651                                         *sender, ssize_t size, int *do_forward)
1652 {
1653   struct NeighbourMapEntry *n;
1654   struct GNUNET_TIME_Relative ret;
1655
1656   // This can happen during shutdown
1657   if (neighbours == NULL)
1658   {
1659     return GNUNET_TIME_UNIT_FOREVER_REL;
1660   }
1661
1662   n = lookup_neighbour (sender);
1663   if (n == NULL)
1664   {
1665     GST_neighbours_try_connect (sender);
1666     n = lookup_neighbour (sender);
1667     if (NULL == n)
1668     {
1669       GNUNET_STATISTICS_update (GST_stats,
1670                                 gettext_noop
1671                                 ("# messages discarded due to lack of neighbour record"),
1672                                 1, GNUNET_NO);
1673       *do_forward = GNUNET_NO;
1674       return GNUNET_TIME_UNIT_ZERO;
1675     }
1676   }
1677   if (!is_connected (n))
1678   {
1679     *do_forward = GNUNET_SYSERR;
1680     return GNUNET_TIME_UNIT_ZERO;
1681   }
1682   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1683   {
1684     n->quota_violation_count++;
1685 #if DEBUG_TRANSPORT
1686     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1687                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1688                 n->in_tracker.available_bytes_per_s__,
1689                 n->quota_violation_count);
1690 #endif
1691     /* Discount 32k per violation */
1692     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1693   }
1694   else
1695   {
1696     if (n->quota_violation_count > 0)
1697     {
1698       /* try to add 32k back */
1699       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1700       n->quota_violation_count--;
1701     }
1702   }
1703   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1704   {
1705     GNUNET_STATISTICS_update (GST_stats,
1706                               gettext_noop
1707                               ("# bandwidth quota violations by other peers"),
1708                               1, GNUNET_NO);
1709     *do_forward = GNUNET_NO;
1710     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1711   }
1712   *do_forward = GNUNET_YES;
1713   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1714   if (ret.rel_value > 0)
1715   {
1716 #if DEBUG_TRANSPORT
1717     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1718                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1719                 (unsigned long long) n->in_tracker.
1720                 consumption_since_last_update__,
1721                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1722                 (unsigned long long) ret.rel_value);
1723 #endif
1724     GNUNET_STATISTICS_update (GST_stats,
1725                               gettext_noop ("# ms throttling suggested"),
1726                               (int64_t) ret.rel_value, GNUNET_NO);
1727   }
1728   return ret;
1729 }
1730
1731
1732 /**
1733  * Keep the connection to the given neighbour alive longer,
1734  * we received a KEEPALIVE (or equivalent).
1735  *
1736  * @param neighbour neighbour to keep alive
1737  */
1738 void
1739 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1740 {
1741   struct NeighbourMapEntry *n;
1742
1743   // This can happen during shutdown
1744   if (neighbours == NULL)
1745   {
1746     return;
1747   }
1748
1749   n = lookup_neighbour (neighbour);
1750   if (NULL == n)
1751   {
1752     GNUNET_STATISTICS_update (GST_stats,
1753                               gettext_noop
1754                               ("# KEEPALIVE messages discarded (not connected)"),
1755                               1, GNUNET_NO);
1756     return;
1757   }
1758   GNUNET_SCHEDULER_cancel (n->timeout_task);
1759   n->timeout_task =
1760       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1761                                     &neighbour_timeout_task, n);
1762 }
1763
1764
1765 /**
1766  * Change the incoming quota for the given peer.
1767  *
1768  * @param neighbour identity of peer to change qutoa for
1769  * @param quota new quota
1770  */
1771 void
1772 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1773                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1774 {
1775   struct NeighbourMapEntry *n;
1776
1777   // This can happen during shutdown
1778   if (neighbours == NULL)
1779   {
1780     return;
1781   }
1782
1783   n = lookup_neighbour (neighbour);
1784   if (n == NULL)
1785   {
1786     GNUNET_STATISTICS_update (GST_stats,
1787                               gettext_noop
1788                               ("# SET QUOTA messages ignored (no such peer)"),
1789                               1, GNUNET_NO);
1790     return;
1791   }
1792   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1793   if (0 != ntohl (quota.value__))
1794     return;
1795 #if DEBUG_TRANSPORT
1796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1797               GNUNET_i2s (&n->id), "SET_QUOTA");
1798 #endif
1799   if (is_connected (n))
1800     GNUNET_STATISTICS_update (GST_stats,
1801                               gettext_noop ("# disconnects due to quota of 0"),
1802                               1, GNUNET_NO);
1803   disconnect_neighbour (n);
1804 }
1805
1806
1807 /**
1808  * Closure for the neighbours_iterate function.
1809  */
1810 struct IteratorContext
1811 {
1812   /**
1813    * Function to call on each connected neighbour.
1814    */
1815   GST_NeighbourIterator cb;
1816
1817   /**
1818    * Closure for 'cb'.
1819    */
1820   void *cb_cls;
1821 };
1822
1823
1824 /**
1825  * Call the callback from the closure for each connected neighbour.
1826  *
1827  * @param cls the 'struct IteratorContext'
1828  * @param key the hash of the public key of the neighbour
1829  * @param value the 'struct NeighbourMapEntry'
1830  * @return GNUNET_OK (continue to iterate)
1831  */
1832 static int
1833 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1834 {
1835   struct IteratorContext *ic = cls;
1836   struct NeighbourMapEntry *n = value;
1837
1838   if (!is_connected (n))
1839     return GNUNET_OK;
1840
1841   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen);
1842   return GNUNET_OK;
1843 }
1844
1845
1846 /**
1847  * Iterate over all connected neighbours.
1848  *
1849  * @param cb function to call
1850  * @param cb_cls closure for cb
1851  */
1852 void
1853 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1854 {
1855   struct IteratorContext ic;
1856
1857   // This can happen during shutdown
1858   if (neighbours == NULL)
1859   {
1860     return;
1861   }
1862
1863   ic.cb = cb;
1864   ic.cb_cls = cb_cls;
1865   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1866 }
1867
1868 /**
1869  * If we have an active connection to the given target, it must be shutdown.
1870  *
1871  * @param target peer to disconnect from
1872  */
1873 void
1874 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1875 {
1876   struct NeighbourMapEntry *n;
1877
1878   // This can happen during shutdown
1879   if (neighbours == NULL)
1880   {
1881     return;
1882   }
1883
1884   n = lookup_neighbour (target);
1885   if (NULL == n)
1886     return;                     /* not active */
1887   if (is_connected (n))
1888   {
1889     send_disconnect (&n->id, n->plugin_name, n->addr, n->addrlen, n->session);
1890
1891     n = lookup_neighbour (target);
1892     if (NULL == n)
1893       return;                   /* gone already */
1894   }
1895   disconnect_neighbour (n);
1896 }
1897
1898
1899 /**
1900  * We received a disconnect message from the given peer,
1901  * validate and process.
1902  *
1903  * @param peer sender of the message
1904  * @param msg the disconnect message
1905  */
1906 void
1907 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
1908                                           *peer,
1909                                           const struct GNUNET_MessageHeader
1910                                           *msg)
1911 {
1912   struct NeighbourMapEntry *n;
1913   const struct SessionDisconnectMessage *sdm;
1914   GNUNET_HashCode hc;
1915
1916 #if DEBUG_TRANSPORT
1917   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918               "Received DISCONNECT message from peer `%s'\n",
1919               GNUNET_i2s (peer));
1920 #endif
1921
1922   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1923   {
1924     // GNUNET_break_op (0);
1925     GNUNET_STATISTICS_update (GST_stats,
1926                               gettext_noop
1927                               ("# disconnect messages ignored (old format)"), 1,
1928                               GNUNET_NO);
1929     return;
1930   }
1931   sdm = (const struct SessionDisconnectMessage *) msg;
1932   n = lookup_neighbour (peer);
1933   if (NULL == n)
1934     return;                     /* gone already */
1935   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1936       n->connect_ts.abs_value)
1937   {
1938     GNUNET_STATISTICS_update (GST_stats,
1939                               gettext_noop
1940                               ("# disconnect messages ignored (timestamp)"), 1,
1941                               GNUNET_NO);
1942     return;
1943   }
1944   GNUNET_CRYPTO_hash (&sdm->public_key,
1945                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1946                       &hc);
1947   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
1948   {
1949     GNUNET_break_op (0);
1950     return;
1951   }
1952   if (ntohl (sdm->purpose.size) !=
1953       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1954       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1955       sizeof (struct GNUNET_TIME_AbsoluteNBO))
1956   {
1957     GNUNET_break_op (0);
1958     return;
1959   }
1960   if (GNUNET_OK !=
1961       GNUNET_CRYPTO_rsa_verify
1962       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
1963        &sdm->signature, &sdm->public_key))
1964   {
1965     GNUNET_break_op (0);
1966     return;
1967   }
1968   GST_neighbours_force_disconnect (peer);
1969 }
1970
1971 /**
1972  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
1973  * Consider switching to it.
1974  *
1975  * @param message possibly a 'struct SessionConnectMessage' (check format)
1976  * @param peer identity of the peer to switch the address for
1977  * @param plugin_name name of transport that delivered the PONG
1978  * @param address address of the other peer, NULL if other peer
1979  *                       connected to us
1980  * @param address_len number of bytes in address
1981  * @param session session to use (or NULL)
1982  * @param ats performance data
1983  * @param ats_count number of entries in ats
1984   */
1985 void
1986 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
1987                                    const struct GNUNET_PeerIdentity *peer,
1988                                    const char *plugin_name,
1989                                    const char *sender_address,
1990                                    uint16_t sender_address_len,
1991                                    struct Session *session,
1992                                    const struct GNUNET_ATS_Information *ats,
1993                                    uint32_t ats_count)
1994 {
1995   const struct SessionConnectMessage *scm;
1996   struct QuotaSetMessage q_msg;
1997   struct GNUNET_MessageHeader msg;
1998   struct NeighbourMapEntry *n;
1999   size_t msg_len;
2000   size_t ret;
2001   int was_connected;
2002
2003 #if DEBUG_TRANSPORT
2004   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2005               "Received CONNECT_ACK message from peer `%s'\n",
2006               GNUNET_i2s (peer));
2007 #endif
2008
2009   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2010   {
2011     GNUNET_break_op (0);
2012     return;
2013   }
2014
2015   scm = (const struct SessionConnectMessage *) message;
2016   GNUNET_break_op (ntohl (scm->reserved) == 0);
2017   n = lookup_neighbour (peer);
2018   if (NULL == n)
2019     n = setup_neighbour (peer);
2020
2021   if (!is_connecting(n))
2022   {
2023     GNUNET_STATISTICS_update (GST_stats,
2024         gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
2025         GNUNET_NO);
2026     return;
2027   }
2028
2029   if (NULL != session)
2030     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2031                      "transport-ats",
2032                      "Giving ATS session %p of plugin %s for peer %s\n",
2033                      session, plugin_name, GNUNET_i2s (peer));
2034   GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2035                              sender_address_len, session, ats, ats_count);
2036
2037   was_connected = is_connected (n);
2038   if (!is_connected (n))
2039     change_state (n, S_CONNECTED);
2040
2041   GNUNET_ATS_address_in_use (GST_ats, peer, plugin_name, sender_address,
2042                              sender_address_len, session, GNUNET_YES);
2043
2044 #if DEBUG_TRANSPORT
2045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2046               "Setting inbound quota of %u for peer `%s' to \n",
2047               ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id));
2048 #endif
2049   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2050
2051   /* send ACK (ACK) */
2052   msg_len = sizeof (msg);
2053   msg.size = htons (msg_len);
2054   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2055
2056   ret =
2057       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2058                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2059                         n->plugin_name, n->addr, n->addrlen, GNUNET_YES, NULL,
2060                         NULL);
2061
2062   if (ret == GNUNET_SYSERR)
2063     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2064                 "Failed to send SESSION_ACK to `%4s' using plugin `%s' address '%s' session %X\n",
2065                 GNUNET_i2s (&n->id), n->plugin_name,
2066                 (n->addrlen ==
2067                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2068                                                      n->addrlen), n->session);
2069
2070
2071   if (!was_connected)
2072   {
2073     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2074       n->keepalive_task =
2075           GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2076                                         &neighbour_keepalive_task, n);
2077
2078     neighbours_connected++;
2079     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2080                               GNUNET_NO);
2081 #if DEBUG_TRANSPORT
2082     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2083                 "Notify about connect of `%4s' using plugin `%s' address '%s' session %X LINE %u\n",
2084                 GNUNET_i2s (&n->id), n->plugin_name,
2085                 (n->addrlen ==
2086                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2087                                                      n->addrlen), n->session,
2088                 __LINE__);
2089 #endif
2090     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2091   }
2092
2093 #if DEBUG_TRANSPORT
2094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2095               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2096               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2097 #endif
2098   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2099   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2100   q_msg.quota = n->bandwidth_out;
2101   q_msg.peer = (*peer);
2102   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2103
2104 }
2105
2106 void
2107 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2108                            const struct GNUNET_PeerIdentity *peer,
2109                            const char *plugin_name, const char *sender_address,
2110                            uint16_t sender_address_len, struct Session *session,
2111                            const struct GNUNET_ATS_Information *ats,
2112                            uint32_t ats_count)
2113 {
2114   struct NeighbourMapEntry *n;
2115   struct QuotaSetMessage q_msg;
2116   int was_connected;
2117
2118 #if DEBUG_TRANSPORT
2119   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2120               GNUNET_i2s (peer));
2121 #endif
2122
2123   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2124   {
2125     GNUNET_break_op (0);
2126     return;
2127   }
2128
2129   n = lookup_neighbour (peer);
2130   if (NULL == n)
2131   {
2132     send_disconnect (peer, plugin_name, sender_address, sender_address_len,
2133                      session);
2134     GNUNET_break (0);
2135     return;
2136   }
2137
2138   if (is_connected (n))
2139     return;
2140
2141   if (!is_connecting(n))
2142   {
2143     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2144                               GNUNET_NO);
2145     return;
2146   }
2147
2148   if (NULL != session)
2149     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2150                      "transport-ats",
2151                      "Giving ATS session %p of plugin %s for peer %s\n",
2152                      session, plugin_name, GNUNET_i2s (peer));
2153   GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2154                              sender_address_len, session, ats, ats_count);
2155
2156   was_connected = is_connected (n);
2157   change_state (n, S_CONNECTED);
2158
2159   GNUNET_ATS_address_in_use (GST_ats, peer, plugin_name, sender_address,
2160                              sender_address_len, session, GNUNET_YES);
2161
2162   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2163
2164   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2165     n->keepalive_task =
2166         GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2167                                       &neighbour_keepalive_task, n);
2168
2169   if (!was_connected)
2170   {
2171     neighbours_connected++;
2172     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2173                               GNUNET_NO);
2174
2175 #if DEBUG_TRANSPORT
2176     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2177                 "Notify about connect of `%4s' using plugin `%s' address '%s' session %X LINE %u\n",
2178                 GNUNET_i2s (&n->id), n->plugin_name,
2179                 (n->addrlen ==
2180                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2181                                                      n->addrlen), n->session,
2182                 __LINE__);
2183 #endif
2184     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2185   }
2186 #if DEBUG_TRANSPORT
2187   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2188               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2189               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2190 #endif
2191   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2192   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2193   q_msg.quota = n->bandwidth_out;
2194   q_msg.peer = (*peer);
2195   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2196 }
2197
2198 struct BlackListCheckContext
2199 {
2200   struct GNUNET_ATS_Information *ats;
2201
2202   uint32_t ats_count;
2203
2204   struct Session *session;
2205
2206   char *sender_address;
2207
2208   uint16_t sender_address_len;
2209
2210   char *plugin_name;
2211
2212   struct GNUNET_TIME_Absolute ts;
2213 };
2214
2215
2216 static void
2217 handle_connect_blacklist_cont (void *cls,
2218                                const struct GNUNET_PeerIdentity *peer,
2219                                int result)
2220 {
2221   struct NeighbourMapEntry *n;
2222   struct BlackListCheckContext *bcc = cls;
2223
2224 #if DEBUG_TRANSPORT
2225   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2226               "Blacklist check due to CONNECT message: `%s'\n",
2227               GNUNET_i2s (peer),
2228               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2229 #endif
2230
2231   /* not allowed */
2232   if (GNUNET_OK != result)
2233   {
2234     GNUNET_free (bcc);
2235     return;
2236   }
2237
2238   n = lookup_neighbour (peer);
2239   if (NULL == n)
2240     n = setup_neighbour (peer);
2241
2242   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2243   {
2244     if (NULL != bcc->session)
2245       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2246                        "transport-ats",
2247                        "Giving ATS session %p of plugin %s address `%s' for peer %s\n",
2248                        bcc->session, bcc->plugin_name,
2249                        GST_plugins_a2s (bcc->plugin_name, bcc->sender_address,
2250                                         bcc->sender_address_len),
2251                        GNUNET_i2s (peer));
2252     GNUNET_ATS_address_update (GST_ats, peer, bcc->plugin_name,
2253                                bcc->sender_address, bcc->sender_address_len,
2254                                bcc->session, bcc->ats, bcc->ats_count);
2255     n->connect_ts = bcc->ts;
2256   }
2257
2258   GNUNET_free (bcc);
2259
2260   if (n->state != S_CONNECT_RECV)
2261     change_state (n, S_CONNECT_RECV);
2262
2263   /* Ask ATS for an address to connect via that address */
2264   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2265     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2266   n->ats_suggest =
2267       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2268                                     n);
2269   GNUNET_ATS_suggest_address (GST_ats, peer);
2270 }
2271
2272 /**
2273  * We received a 'SESSION_CONNECT' message from the other peer.
2274  * Consider switching to it.
2275  *
2276  * @param message possibly a 'struct SessionConnectMessage' (check format)
2277  * @param peer identity of the peer to switch the address for
2278  * @param plugin_name name of transport that delivered the PONG
2279  * @param address address of the other peer, NULL if other peer
2280  *                       connected to us
2281  * @param address_len number of bytes in address
2282  * @param session session to use (or NULL)
2283  * @param ats performance data
2284  * @param ats_count number of entries in ats (excluding 0-termination)
2285   */
2286 void
2287 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2288                                const struct GNUNET_PeerIdentity *peer,
2289                                const char *plugin_name,
2290                                const char *sender_address,
2291                                uint16_t sender_address_len,
2292                                struct Session *session,
2293                                const struct GNUNET_ATS_Information *ats,
2294                                uint32_t ats_count)
2295 {
2296   const struct SessionConnectMessage *scm;
2297   struct NeighbourMapEntry *n;
2298   struct BlackListCheckContext *bcc = NULL;
2299
2300 #if DEBUG_TRANSPORT
2301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2302               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2303 #endif
2304
2305   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2306   {
2307     GNUNET_break_op (0);
2308     return;
2309   }
2310
2311   scm = (const struct SessionConnectMessage *) message;
2312   GNUNET_break_op (ntohl (scm->reserved) == 0);
2313
2314   n = lookup_neighbour (peer);
2315   if (n != NULL)
2316   {
2317     /* connected peer switches addresses */
2318     if (is_connected (n))
2319     {
2320       GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2321                                  sender_address_len, session, ats, ats_count);
2322       return;
2323     }
2324   }
2325
2326   /* we are not connected to this peer */
2327   /* do blacklist check */
2328   bcc =
2329       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2330                      sizeof (struct GNUNET_ATS_Information) * ats_count +
2331                      sender_address_len + strlen (plugin_name) + 1);
2332
2333   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2334
2335   bcc->ats_count = ats_count;
2336   bcc->sender_address_len = sender_address_len;
2337   bcc->session = session;
2338
2339   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2340   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2341
2342   bcc->sender_address = (char *) &bcc->ats[ats_count];
2343   memcpy (bcc->sender_address, sender_address, sender_address_len);
2344
2345   bcc->plugin_name = &bcc->sender_address[sender_address_len];
2346   strcpy (bcc->plugin_name, plugin_name);
2347
2348   GST_blacklist_test_allowed (peer, plugin_name, handle_connect_blacklist_cont,
2349                               bcc);
2350 }
2351
2352
2353 /* end of file gnunet-service-transport_neighbours.c */