LRN: Call-transport-disconnect-to-clear-session-message-q.patch:
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours?
52  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
53  * send 3 keepalives in each interval, so 3 messages would need to be
54  * lost in a row for a disconnect).
55  */
56 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
57
58
59 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
60
61
62 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
63
64
65 /**
66  * Entry in neighbours.
67  */
68 struct NeighbourMapEntry;
69
70 /**
71  * Message a peer sends to another to indicate its
72  * preference for communicating via a particular
73  * session (and the desire to establish a real
74  * connection).
75  */
76 struct SessionConnectMessage
77 {
78   /**
79    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
80    */
81   struct GNUNET_MessageHeader header;
82
83   /**
84    * Always zero.
85    */
86   uint32_t reserved GNUNET_PACKED;
87
88   /**
89    * Absolute time at the sender.  Only the most recent connect
90    * message implies which session is preferred by the sender.
91    */
92   struct GNUNET_TIME_AbsoluteNBO timestamp;
93
94 };
95
96
97 struct SessionDisconnectMessage
98 {
99   /**
100    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Always zero.
106    */
107   uint32_t reserved GNUNET_PACKED;
108
109   /**
110    * Purpose of the signature.  Extends over the timestamp.
111    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
112    */
113   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
114
115   /**
116    * Absolute time at the sender.  Only the most recent connect
117    * message implies which session is preferred by the sender.
118    */
119   struct GNUNET_TIME_AbsoluteNBO timestamp;
120
121   /**
122    * Public key of the sender.
123    */
124   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
125
126   /**
127    * Signature of the peer that sends us the disconnect.  Only
128    * valid if the timestamp is AFTER the timestamp from the
129    * corresponding 'CONNECT' message.
130    */
131   struct GNUNET_CRYPTO_RsaSignature signature;
132
133 };
134
135
136 /**
137  * For each neighbour we keep a list of messages
138  * that we still want to transmit to the neighbour.
139  */
140 struct MessageQueue
141 {
142
143   /**
144    * This is a doubly linked list.
145    */
146   struct MessageQueue *next;
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *prev;
152
153   /**
154    * Once this message is actively being transmitted, which
155    * neighbour is it associated with?
156    */
157   struct NeighbourMapEntry *n;
158
159   /**
160    * Function to call once we're done.
161    */
162   GST_NeighbourSendContinuation cont;
163
164   /**
165    * Closure for 'cont'
166    */
167   void *cont_cls;
168
169   /**
170    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
171    * stuck together in memory.  Allocated at the end of this struct.
172    */
173   const char *message_buf;
174
175   /**
176    * Size of the message buf
177    */
178   size_t message_buf_size;
179
180   /**
181    * At what time should we fail?
182    */
183   struct GNUNET_TIME_Absolute timeout;
184
185 };
186
187 enum State
188 {
189   /* fresh peer or completely disconnected */
190   S_NOT_CONNECTED = 0,
191   /* sent CONNECT message to other peer, waiting for CONNECT_ACK */
192   S_CONNECT_SENT = 1,
193   /* received CONNECT message to other peer, sending CONNECT_ACK */
194   S_CONNECT_RECV = 4,
195   /* sent CONNECT_ACK message to other peer, wait for ACK or payload */
196   S_CONNECT_RECV_ACK_SENT = 8,
197   /* received ACK or payload */
198   S_CONNECTED = 16,
199   /* Disconnect in progress */
200   S_DISCONNECT = 32
201 };
202
203 /**
204  * Entry in neighbours.
205  */
206 struct NeighbourMapEntry
207 {
208
209   /**
210    * Head of list of messages we would like to send to this peer;
211    * must contain at most one message per client.
212    */
213   struct MessageQueue *messages_head;
214
215   /**
216    * Tail of list of messages we would like to send to this peer; must
217    * contain at most one message per client.
218    */
219   struct MessageQueue *messages_tail;
220
221   /**
222    * Performance data for the peer.
223    */
224   //struct GNUNET_ATS_Information *ats;
225
226   /**
227    * Are we currently trying to send a message? If so, which one?
228    */
229   struct MessageQueue *is_active;
230
231   /**
232    * Active session for communicating with the peer.
233    */
234   struct Session *session;
235
236   /**
237    * Name of the plugin we currently use.
238    */
239   char *plugin_name;
240
241   /**
242    * Address used for communicating with the peer, NULL for inbound connections.
243    */
244   void *addr;
245
246   /**
247    * Number of bytes in 'addr'.
248    */
249   size_t addrlen;
250
251   /**
252    * Identity of this neighbour.
253    */
254   struct GNUNET_PeerIdentity id;
255
256   /**
257    * ID of task scheduled to run when this peer is about to
258    * time out (will free resources associated with the peer).
259    */
260   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
261
262   /**
263    * ID of task scheduled to send keepalives.
264    */
265   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
266
267   /**
268    * ID of task scheduled to run when we should try transmitting
269    * the head of the message queue.
270    */
271   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
272
273   /**
274    * Tracker for inbound bandwidth.
275    */
276   struct GNUNET_BANDWIDTH_Tracker in_tracker;
277
278   /**
279    * Inbound bandwidth from ATS, activated when connection is up
280    */
281   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
282
283   /**
284    * Inbound bandwidth from ATS, activated when connection is up
285    */
286   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
287
288   /**
289    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
290    */
291   struct GNUNET_TIME_Absolute connect_ts;
292
293   /**
294    * Timeout for ATS
295    * We asked ATS for a new address for this peer
296    */
297   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
298
299   /**
300    * Task the resets the peer state after due to an pending
301    * unsuccessful connection setup
302    */
303   GNUNET_SCHEDULER_TaskIdentifier state_reset;
304
305   /**
306    * How often has the other peer (recently) violated the inbound
307    * traffic limit?  Incremented by 10 per violation, decremented by 1
308    * per non-violation (for each time interval).
309    */
310   unsigned int quota_violation_count;
311
312
313   /**
314    * The current state of the peer
315    * Element of enum State
316    */
317   int state;
318
319 };
320
321
322 /**
323  * All known neighbours and their HELLOs.
324  */
325 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
326
327 /**
328  * Closure for connect_notify_cb and disconnect_notify_cb
329  */
330 static void *callback_cls;
331
332 /**
333  * Function to call when we connected to a neighbour.
334  */
335 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
336
337 /**
338  * Function to call when we disconnected from a neighbour.
339  */
340 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
341
342 /**
343  * counter for connected neighbours
344  */
345 static int neighbours_connected;
346
347 /**
348  * Lookup a neighbour entry in the neighbours hash map.
349  *
350  * @param pid identity of the peer to look up
351  * @return the entry, NULL if there is no existing record
352  */
353 static struct NeighbourMapEntry *
354 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
355 {
356   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
357 }
358
359 #define change_state(n, state, ...) change (n, state, __LINE__)
360
361 static int
362 is_connecting (struct NeighbourMapEntry *n)
363 {
364   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
365     return GNUNET_YES;
366   return GNUNET_NO;
367 }
368
369 static int
370 is_connected (struct NeighbourMapEntry *n)
371 {
372   if (n->state == S_CONNECTED)
373     return GNUNET_YES;
374   return GNUNET_NO;
375 }
376
377 static int
378 is_disconnecting (struct NeighbourMapEntry *n)
379 {
380   if (n->state == S_DISCONNECT)
381     return GNUNET_YES;
382   return GNUNET_NO;
383 }
384
385 static const char *
386 print_state (int state)
387 {
388   switch (state)
389   {
390   case S_CONNECTED:
391     return "S_CONNECTED";
392     break;
393   case S_CONNECT_RECV:
394     return "S_CONNECT_RECV";
395     break;
396   case S_CONNECT_RECV_ACK_SENT:
397     return "S_CONNECT_RECV_ACK_SENT";
398     break;
399   case S_CONNECT_SENT:
400     return "S_CONNECT_SENT";
401     break;
402   case S_DISCONNECT:
403     return "S_DISCONNECT";
404     break;
405   case S_NOT_CONNECTED:
406     return "S_NOT_CONNECTED";
407     break;
408   default:
409     GNUNET_break (0);
410     break;
411   }
412   return NULL;
413 }
414
415 static int
416 change (struct NeighbourMapEntry *n, int state, int line);
417
418 static void
419 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
420
421 static void
422 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
423 {
424   struct NeighbourMapEntry *n = cls;
425
426   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
427
428 #if DEBUG_TRANSPORT
429   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
430               "Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n",
431               GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name, n->addr,
432                                                     n->addrlen),
433               print_state (n->state));
434 #endif
435   GNUNET_STATISTICS_update (GST_stats,
436                             gettext_noop
437                             ("# failed connection attempts due to timeout"), 1,
438                             GNUNET_NO);
439
440   /* resetting state */
441   n->state = S_NOT_CONNECTED;
442
443   /* destroying address */
444   GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
445                                 n->addrlen, NULL);
446
447   /* request new address */
448   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
449     GNUNET_SCHEDULER_cancel (n->ats_suggest);
450   n->ats_suggest =
451       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
452                                     n);
453   GNUNET_ATS_suggest_address (GST_ats, &n->id);
454 }
455
456 static int
457 change (struct NeighbourMapEntry *n, int state, int line)
458 {
459   char *old = strdup (print_state (n->state));
460   char *new = strdup (print_state (state));
461
462   /* allowed transitions */
463   int allowed = GNUNET_NO;
464
465   switch (n->state)
466   {
467   case S_NOT_CONNECTED:
468     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
469         (state == S_DISCONNECT))
470     {
471       allowed = GNUNET_YES;
472
473       /* Schedule reset task */
474       if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT))
475       {
476         GNUNET_assert (n->state_reset == GNUNET_SCHEDULER_NO_TASK);
477         n->state_reset =
478             GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
479                                           n);
480       }
481
482       break;
483     }
484     break;
485   case S_CONNECT_RECV:
486     if ((state == S_NOT_CONNECTED) || (state == S_DISCONNECT) ||
487         (state == S_CONNECTED) ||
488         /* FIXME SENT -> RECV ISSUE! */ (state == S_CONNECT_SENT))
489     {
490       if ((state == S_CONNECTED) || (state == S_DISCONNECT) ||
491           (state == S_NOT_CONNECTED))
492       {
493 #if DEBUG_TRANSPORT
494         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495                     "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
496                     GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name,
497                                                           n->addr, n->addrlen),
498                     print_state (n->state), print_state (state));
499 #endif
500         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
501         GNUNET_SCHEDULER_cancel (n->state_reset);
502         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
503       }
504
505       allowed = GNUNET_YES;
506       break;
507     }
508     break;
509   case S_CONNECT_SENT:
510     if ((state == S_NOT_CONNECTED) || (state == S_CONNECTED) ||
511         (state == S_DISCONNECT) ||
512         /* FIXME SENT -> RECV ISSUE! */ (state == S_CONNECT_RECV))
513     {
514       if ((state == S_CONNECTED) || (state == S_DISCONNECT) ||
515           (state == S_NOT_CONNECTED))
516       {
517 #if DEBUG_TRANSPORT
518         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519                     "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
520                     GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name,
521                                                           n->addr, n->addrlen),
522                     print_state (n->state), print_state (state));
523 #endif
524         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
525         GNUNET_SCHEDULER_cancel (n->state_reset);
526         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
527       }
528
529       allowed = GNUNET_YES;
530       break;
531     }
532     break;
533   case S_CONNECTED:
534     if (state == S_DISCONNECT)
535     {
536       allowed = GNUNET_YES;
537       break;
538     }
539     break;
540   case S_DISCONNECT:
541     /*
542      * if (state == S_NOT_CONNECTED)
543      * {
544      * allowed = GNUNET_YES;
545      * break;
546      * } */
547     break;
548   default:
549     GNUNET_break (0);
550     break;
551
552   }
553
554   if (allowed == GNUNET_NO)
555   {
556     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
557                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
558                 new, line);
559     GNUNET_break (0);
560     GNUNET_free (old);
561     GNUNET_free (new);
562     return GNUNET_SYSERR;
563   }
564
565   n->state = state;
566 #if DEBUG_TRANSPORT
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
569               GNUNET_i2s (&n->id), n, old, new, line);
570 #endif
571   GNUNET_free (old);
572   GNUNET_free (new);
573   return GNUNET_OK;
574 }
575
576 static ssize_t
577 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
578                   size_t msgbuf_size, uint32_t priority,
579                   struct GNUNET_TIME_Relative timeout, struct Session *session,
580                   const char *plugin_name, const void *addr, size_t addrlen,
581                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
582                   void *cont_cls)
583 {
584   struct GNUNET_TRANSPORT_PluginFunctions *papi;
585   size_t ret = GNUNET_SYSERR;
586
587   /* FIXME : ats returns an address with all values 0 */
588   if (plugin_name == NULL)
589   {
590     if (cont != NULL)
591       cont (cont_cls, target, GNUNET_SYSERR);
592     return GNUNET_SYSERR;
593   }
594
595   if ((session == NULL) && (addr == NULL) && (addrlen == 0))
596   {
597     if (cont != NULL)
598       cont (cont_cls, target, GNUNET_SYSERR);
599     return GNUNET_SYSERR;
600   }
601
602   papi = GST_plugins_find (plugin_name);
603   if (papi == NULL)
604   {
605     if (cont != NULL)
606       cont (cont_cls, target, GNUNET_SYSERR);
607     return GNUNET_SYSERR;
608   }
609
610   ret =
611       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
612                   addr, addrlen, GNUNET_YES, cont, cont_cls);
613
614   if (ret == -1)
615   {
616     if (cont != NULL)
617       cont (cont_cls, target, GNUNET_SYSERR);
618   }
619   return ret;
620 }
621
622 /**
623  * Task invoked to start a transmission to another peer.
624  *
625  * @param cls the 'struct NeighbourMapEntry'
626  * @param tc scheduler context
627  */
628 static void
629 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
630
631
632 /**
633  * We're done with our transmission attempt, continue processing.
634  *
635  * @param cls the 'struct MessageQueue' of the message
636  * @param receiver intended receiver
637  * @param success whether it worked or not
638  */
639 static void
640 transmit_send_continuation (void *cls,
641                             const struct GNUNET_PeerIdentity *receiver,
642                             int success)
643 {
644   struct MessageQueue *mq;
645   struct NeighbourMapEntry *n;
646
647   mq = cls;
648   n = mq->n;
649   if (NULL != n)
650   {
651     GNUNET_assert (n->is_active == mq);
652     n->is_active = NULL;
653     if (success == GNUNET_YES)
654     {
655       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
656       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
657     }
658   }
659 #if DEBUG_TRANSPORT
660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
661               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
662               (success == GNUNET_OK) ? "successful" : "FAILED");
663 #endif
664   if (NULL != mq->cont)
665     mq->cont (mq->cont_cls, success);
666   GNUNET_free (mq);
667 }
668
669
670 /**
671  * Check the ready list for the given neighbour and if a plugin is
672  * ready for transmission (and if we have a message), do so!
673  *
674  * @param n target peer for which to transmit
675  */
676 static void
677 try_transmission_to_peer (struct NeighbourMapEntry *n)
678 {
679   struct MessageQueue *mq;
680   struct GNUNET_TIME_Relative timeout;
681   ssize_t ret;
682
683   if (n->is_active != NULL)
684   {
685     GNUNET_break (0);
686     return;                     /* transmission already pending */
687   }
688   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
689   {
690     GNUNET_break (0);
691     return;                     /* currently waiting for bandwidth */
692   }
693   while (NULL != (mq = n->messages_head))
694   {
695     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
696     if (timeout.rel_value > 0)
697       break;
698     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
699     n->is_active = mq;
700     mq->n = n;
701     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
702   }
703   if (NULL == mq)
704     return;                     /* no more messages */
705
706   if (GST_plugins_find (n->plugin_name) == NULL)
707   {
708     GNUNET_break (0);
709     return;
710   }
711   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
712   n->is_active = mq;
713   mq->n = n;
714
715   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
716   {
717     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
718                 GNUNET_i2s (&n->id));
719     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
720     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
721     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
722     return;
723   }
724
725   ret =
726       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
727                         timeout, n->session, n->plugin_name, n->addr,
728                         n->addrlen, GNUNET_YES, &transmit_send_continuation,
729                         mq);
730   if (ret == -1)
731   {
732     /* failure, but 'send' would not call continuation in this case,
733      * so we need to do it here! */
734     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
735   }
736
737 }
738
739
740 /**
741  * Task invoked to start a transmission to another peer.
742  *
743  * @param cls the 'struct NeighbourMapEntry'
744  * @param tc scheduler context
745  */
746 static void
747 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
748 {
749   struct NeighbourMapEntry *n = cls;
750
751   GNUNET_assert (NULL != lookup_neighbour (&n->id));
752   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
753   try_transmission_to_peer (n);
754 }
755
756
757 /**
758  * Initialize the neighbours subsystem.
759  *
760  * @param cls closure for callbacks
761  * @param connect_cb function to call if we connect to a peer
762  * @param disconnect_cb function to call if we disconnect from a peer
763  */
764 void
765 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
766                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
767 {
768   callback_cls = cls;
769   connect_notify_cb = connect_cb;
770   disconnect_notify_cb = disconnect_cb;
771   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
772 }
773
774
775 static void
776 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
777                       int result)
778 {
779 #if DEBUG_TRANSPORT
780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781               "Sending DISCONNECT message to peer `%4s': %i\n",
782               GNUNET_i2s (target), result);
783 #endif
784 }
785
786
787 static int
788 send_disconnect (const struct GNUNET_PeerIdentity *target,
789                  const char *plugin_name, const char *sender_address,
790                  uint16_t sender_address_len, struct Session *session)
791 {
792   size_t ret;
793   struct SessionDisconnectMessage disconnect_msg;
794
795 #if DEBUG_TRANSPORT
796   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
797               "Sending DISCONNECT message to peer `%4s'\n",
798               GNUNET_i2s (target));
799 #endif
800
801   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
802   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
803   disconnect_msg.reserved = htonl (0);
804   disconnect_msg.purpose.size =
805       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
806              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
807              sizeof (struct GNUNET_TIME_AbsoluteNBO));
808   disconnect_msg.purpose.purpose =
809       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
810   disconnect_msg.timestamp =
811       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
812   disconnect_msg.public_key = GST_my_public_key;
813   GNUNET_assert (GNUNET_OK ==
814                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
815                                          &disconnect_msg.purpose,
816                                          &disconnect_msg.signature));
817
818   ret =
819       send_with_plugin (target, (const char *) &disconnect_msg,
820                         sizeof (disconnect_msg), UINT32_MAX,
821                         GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name,
822                         sender_address, sender_address_len, GNUNET_YES,
823                         &send_disconnect_cont, NULL);
824
825   if (ret == GNUNET_SYSERR)
826     return GNUNET_SYSERR;
827
828   GNUNET_STATISTICS_update (GST_stats,
829                             gettext_noop
830                             ("# peers disconnected due to external request"), 1,
831                             GNUNET_NO);
832   return GNUNET_OK;
833 }
834
835 /**
836  * Disconnect from the given neighbour, clean up the record.
837  *
838  * @param n neighbour to disconnect from
839  */
840 static void
841 disconnect_neighbour (struct NeighbourMapEntry *n)
842 {
843   struct MessageQueue *mq;
844   int was_connected = is_connected (n);
845
846   /* send DISCONNECT MESSAGE */
847   if (is_connected (n) || is_connecting (n))
848   {
849     if (GNUNET_OK ==
850         send_disconnect (&n->id, n->plugin_name, n->addr, n->addrlen,
851                          n->session))
852       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
853                   GNUNET_i2s (&n->id));
854     else
855       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856                   "Could not send DISCONNECT_MSG to `%s'\n",
857                   GNUNET_i2s (&n->id));
858   }
859
860
861   if (is_disconnecting (n))
862     return;
863   change_state (n, S_DISCONNECT);
864
865   if (n->plugin_name != NULL)
866   {
867     struct GNUNET_TRANSPORT_PluginFunctions *papi;
868     papi = GST_plugins_find (n->plugin_name);
869     if (papi != NULL)
870       papi->disconnect (papi->cls, &n->id);
871   }
872
873   while (NULL != (mq = n->messages_head))
874   {
875     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
876     if (NULL != mq->cont)
877       mq->cont (mq->cont_cls, GNUNET_SYSERR);
878     GNUNET_free (mq);
879   }
880   if (NULL != n->is_active)
881   {
882     n->is_active->n = NULL;
883     n->is_active = NULL;
884   }
885   if (was_connected)
886   {
887     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
888     GNUNET_SCHEDULER_cancel (n->keepalive_task);
889     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
890     GNUNET_assert (neighbours_connected > 0);
891     neighbours_connected--;
892     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
893                               GNUNET_NO);
894     disconnect_notify_cb (callback_cls, &n->id);
895   }
896   GNUNET_assert (GNUNET_YES ==
897                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
898                                                        &n->id.hashPubKey, n));
899   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
900   {
901     GNUNET_SCHEDULER_cancel (n->ats_suggest);
902     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
903   }
904   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
905   {
906     GNUNET_SCHEDULER_cancel (n->timeout_task);
907     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
908   }
909   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
910   {
911     GNUNET_SCHEDULER_cancel (n->transmission_task);
912     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
913   }
914   if (NULL != n->plugin_name)
915   {
916     GNUNET_free (n->plugin_name);
917     n->plugin_name = NULL;
918   }
919   if (NULL != n->addr)
920   {
921     GNUNET_free (n->addr);
922     n->addr = NULL;
923     n->addrlen = 0;
924   }
925   n->session = NULL;
926   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
927               GNUNET_i2s (&n->id), n);
928   GNUNET_free (n);
929 }
930
931
932 /**
933  * Peer has been idle for too long. Disconnect.
934  *
935  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
936  * @param tc scheduler context
937  */
938 static void
939 neighbour_timeout_task (void *cls,
940                         const struct GNUNET_SCHEDULER_TaskContext *tc)
941 {
942   struct NeighbourMapEntry *n = cls;
943
944   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
945
946   GNUNET_STATISTICS_update (GST_stats,
947                             gettext_noop
948                             ("# peers disconnected due to timeout"), 1,
949                             GNUNET_NO);
950   disconnect_neighbour (n);
951 }
952
953
954 /**
955  * Send another keepalive message.
956  *
957  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
958  * @param tc scheduler context
959  */
960 static void
961 neighbour_keepalive_task (void *cls,
962                           const struct GNUNET_SCHEDULER_TaskContext *tc)
963 {
964   struct NeighbourMapEntry *n = cls;
965   struct GNUNET_MessageHeader m;
966
967   n->keepalive_task =
968       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
969                                     &neighbour_keepalive_task, n);
970   GNUNET_assert (is_connected (n));
971   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
972                             GNUNET_NO);
973   m.size = htons (sizeof (struct GNUNET_MessageHeader));
974   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
975
976   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
977                     UINT32_MAX /* priority */ ,
978                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name,
979                     n->addr, n->addrlen, GNUNET_YES, NULL, NULL);
980 }
981
982
983 /**
984  * Disconnect from the given neighbour.
985  *
986  * @param cls unused
987  * @param key hash of neighbour's public key (not used)
988  * @param value the 'struct NeighbourMapEntry' of the neighbour
989  */
990 static int
991 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
992 {
993   struct NeighbourMapEntry *n = value;
994
995 #if DEBUG_TRANSPORT
996   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
997               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
998 #endif
999   if (is_connected (n))
1000     GNUNET_STATISTICS_update (GST_stats,
1001                               gettext_noop
1002                               ("# peers disconnected due to global disconnect"),
1003                               1, GNUNET_NO);
1004   disconnect_neighbour (n);
1005   return GNUNET_OK;
1006 }
1007
1008
1009 static void
1010 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1011 {
1012   struct NeighbourMapEntry *n = cls;
1013
1014   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1015
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017               " ATS did not suggested address to connect to peer `%s'\n",
1018               GNUNET_i2s (&n->id));
1019
1020   disconnect_neighbour (n);
1021 }
1022
1023 /**
1024  * Cleanup the neighbours subsystem.
1025  */
1026 void
1027 GST_neighbours_stop ()
1028 {
1029   // This can happen during shutdown
1030   if (neighbours == NULL)
1031   {
1032     return;
1033   }
1034
1035   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1036                                          NULL);
1037   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1038   GNUNET_assert (neighbours_connected == 0);
1039   neighbours = NULL;
1040   callback_cls = NULL;
1041   connect_notify_cb = NULL;
1042   disconnect_notify_cb = NULL;
1043 }
1044
1045
1046 /**
1047  * We tried to send a SESSION_CONNECT message to another peer.  If this
1048  * succeeded, we change the state.  If it failed, we should tell
1049  * ATS to not use this address anymore (until it is re-validated).
1050  *
1051  * @param cls the 'struct NeighbourMapEntry'
1052  * @param success GNUNET_OK on success
1053  */
1054 static void
1055 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1056                            int success)
1057 {
1058   struct NeighbourMapEntry *n = cls;
1059
1060   GNUNET_assert (n != NULL);
1061   GNUNET_assert (!is_connected (n));
1062
1063   if (is_disconnecting (n))
1064     return;                     /* neighbour is going away */
1065
1066   if (GNUNET_YES != success)
1067   {
1068 #if DEBUG_TRANSPORT
1069     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070                 "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1071                 GNUNET_i2s (&n->id), n->plugin_name,
1072                 (n->addrlen ==
1073                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
1074                                                      n->addrlen), n->session);
1075 #endif
1076
1077     GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1078                                   n->addrlen, NULL);
1079
1080     change_state (n, S_NOT_CONNECTED);
1081
1082     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1083       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1084     n->ats_suggest =
1085         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1086                                       n);
1087     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1088     return;
1089   }
1090
1091
1092 }
1093
1094
1095 /**
1096  * We tried to switch addresses with an peer already connected. If it failed,
1097  * we should tell ATS to not use this address anymore (until it is re-validated).
1098  *
1099  * @param cls the 'struct NeighbourMapEntry'
1100  * @param success GNUNET_OK on success
1101  */
1102 static void
1103 send_switch_address_continuation (void *cls,
1104                                   const struct GNUNET_PeerIdentity *target,
1105                                   int success)
1106 {
1107   struct NeighbourMapEntry *n = cls;
1108
1109   GNUNET_assert (n != NULL);
1110   if (is_disconnecting (n))
1111     return;                     /* neighbour is going away */
1112
1113   GNUNET_assert (n->state == S_CONNECTED);
1114   if (GNUNET_YES != success)
1115   {
1116 #if DEBUG_TRANSPORT
1117     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118                 "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n",
1119                 GNUNET_i2s (&n->id), n->plugin_name,
1120                 (n->addrlen ==
1121                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
1122                                                      n->addrlen), n->session);
1123 #endif
1124
1125     GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1126                                   n->addrlen, NULL);
1127
1128     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1129       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1130     n->ats_suggest =
1131         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1132                                       n);
1133     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1134     return;
1135   }
1136 }
1137
1138 /**
1139  * We tried to send a SESSION_CONNECT message to another peer.  If this
1140  * succeeded, we change the state.  If it failed, we should tell
1141  * ATS to not use this address anymore (until it is re-validated).
1142  *
1143  * @param cls the 'struct NeighbourMapEntry'
1144  * @param success GNUNET_OK on success
1145  */
1146 static void
1147 send_connect_ack_continuation (void *cls,
1148                                const struct GNUNET_PeerIdentity *target,
1149                                int success)
1150 {
1151   struct NeighbourMapEntry *n = cls;
1152
1153   GNUNET_assert (n != NULL);
1154
1155   if (GNUNET_YES == success)
1156     return;                     /* sending successful */
1157
1158   /* sending failed, ask for next address  */
1159 #if DEBUG_TRANSPORT
1160   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1161               "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1162               GNUNET_i2s (&n->id), n->plugin_name,
1163               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name,
1164                                                                  n->addr,
1165                                                                  n->addrlen),
1166               n->session);
1167 #endif
1168   change_state (n, S_NOT_CONNECTED);
1169
1170   GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1171                                 n->addrlen, NULL);
1172
1173   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1174     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1175   n->ats_suggest =
1176       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1177                                     n);
1178   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1179 }
1180
1181 /**
1182  * For an existing neighbour record, set the active connection to
1183  * the given address.
1184  *
1185  * @param peer identity of the peer to switch the address for
1186  * @param plugin_name name of transport that delivered the PONG
1187  * @param address address of the other peer, NULL if other peer
1188  *                       connected to us
1189  * @param address_len number of bytes in address
1190  * @param session session to use (or NULL)
1191  * @param ats performance data
1192  * @param ats_count number of entries in ats
1193  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1194  *         connection is not up (yet)
1195  */
1196 int
1197 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1198                                        const char *plugin_name,
1199                                        const void *address, size_t address_len,
1200                                        struct Session *session,
1201                                        const struct GNUNET_ATS_Information *ats,
1202                                        uint32_t ats_count,
1203                                        struct GNUNET_BANDWIDTH_Value32NBO
1204                                        bandwidth_in,
1205                                        struct GNUNET_BANDWIDTH_Value32NBO
1206                                        bandwidth_out)
1207 {
1208   struct NeighbourMapEntry *n;
1209   struct SessionConnectMessage connect_msg;
1210   size_t msg_len;
1211   size_t ret;
1212   int checks_failed;
1213
1214   // This can happen during shutdown
1215   if (neighbours == NULL)
1216   {
1217     return GNUNET_NO;
1218   }
1219
1220   checks_failed = GNUNET_NO;
1221
1222   if (plugin_name == NULL)
1223   {
1224     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1225                 "ATS offered suggested us empty address: plugin NULL");
1226     GNUNET_break_op (0);
1227     checks_failed = GNUNET_YES;
1228   }
1229   if ((address == NULL) && (address_len == 0) && (session == NULL))
1230   {
1231     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1232                 "ATS offered suggested us empty address: address NULL & session NULL");
1233     GNUNET_break_op (0);
1234     checks_failed = GNUNET_YES;
1235   }
1236
1237   n = lookup_neighbour (peer);
1238   if (NULL == n)
1239     checks_failed = GNUNET_YES;
1240
1241   if (checks_failed == GNUNET_YES)
1242   {
1243     GNUNET_ATS_address_destroyed (GST_ats, peer, plugin_name, address,
1244                                   address_len, session);
1245     if (n != NULL)
1246       GNUNET_ATS_suggest_address (GST_ats, peer);
1247     return GNUNET_NO;
1248   }
1249
1250   /* checks successful and neighbour != NULL */
1251 #if DEBUG_TRANSPORT
1252   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1253               "ATS tells us to switch to plugin `%s' address '%s' session %X for %s peer `%s'\n",
1254               plugin_name,
1255               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
1256                                                                   address,
1257                                                                   address_len),
1258               session, (is_connected (n) ? "CONNECTED" : "NOT CONNECTED"),
1259               GNUNET_i2s (peer));
1260 #endif
1261
1262   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1263   {
1264     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1265     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1266   }
1267
1268   // do not switch addresses just update quotas
1269   if ((is_connected (n)) && (address_len == n->addrlen))
1270   {
1271     if ((0 == memcmp (address, n->addr, address_len)) &&
1272         (n->session == session))
1273     {
1274       struct QuotaSetMessage q_msg;
1275
1276 #if DEBUG_TRANSPORT
1277       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1278                   "Sending outbound quota of %u Bps and inbound quota of %u Bps for peer `%s' to all clients\n",
1279                   ntohl (n->bandwidth_out.value__),
1280                   ntohl (n->bandwidth_in.value__), GNUNET_i2s (peer));
1281 #endif
1282
1283       n->bandwidth_in = bandwidth_in;
1284       n->bandwidth_out = bandwidth_out;
1285       GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1286
1287       q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1288       q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1289       q_msg.quota = n->bandwidth_out;
1290       q_msg.peer = (*peer);
1291       GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1292       return GNUNET_NO;
1293     }
1294   }
1295
1296   GNUNET_free_non_null (n->addr);
1297   n->addr = GNUNET_malloc (address_len);
1298   memcpy (n->addr, address, address_len);
1299   n->bandwidth_in = bandwidth_in;
1300   n->bandwidth_out = bandwidth_out;
1301   n->addrlen = address_len;
1302   n->session = session;
1303   GNUNET_free_non_null (n->plugin_name);
1304   n->plugin_name = GNUNET_strdup (plugin_name);
1305   GNUNET_SCHEDULER_cancel (n->timeout_task);
1306   n->timeout_task =
1307       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1308                                     &neighbour_timeout_task, n);
1309
1310   if (n->state == S_DISCONNECT)
1311   {
1312     /* We are disconnecting, nothing to do here */
1313     return GNUNET_NO;
1314   }
1315   /* We are not connected/connecting and initiate a fresh connect */
1316   if (n->state == S_NOT_CONNECTED)
1317   {
1318     msg_len = sizeof (struct SessionConnectMessage);
1319     connect_msg.header.size = htons (msg_len);
1320     connect_msg.header.type =
1321         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1322     connect_msg.reserved = htonl (0);
1323     connect_msg.timestamp =
1324         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1325
1326     change_state (n, S_CONNECT_SENT);
1327
1328     ret =
1329         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1330                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1331                           plugin_name, address, address_len, GNUNET_YES,
1332                           &send_connect_continuation, n);
1333
1334     return GNUNET_NO;
1335   }
1336   /* We received a CONNECT message and asked ATS for an address */
1337   else if (n->state == S_CONNECT_RECV)
1338   {
1339     msg_len = sizeof (struct SessionConnectMessage);
1340     connect_msg.header.size = htons (msg_len);
1341     connect_msg.header.type =
1342         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1343     connect_msg.reserved = htonl (0);
1344     connect_msg.timestamp =
1345         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1346
1347     ret =
1348         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1349                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1350                           plugin_name, address, address_len, GNUNET_YES,
1351                           &send_connect_ack_continuation, n);
1352     return GNUNET_NO;
1353   }
1354   /* connected peer is switching addresses */
1355   else if (n->state == S_CONNECTED)
1356   {
1357     msg_len = sizeof (struct SessionConnectMessage);
1358     connect_msg.header.size = htons (msg_len);
1359     connect_msg.header.type =
1360         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1361     connect_msg.reserved = htonl (0);
1362     connect_msg.timestamp =
1363         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1364
1365     ret =
1366         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1367                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1368                           plugin_name, address, address_len, GNUNET_YES,
1369                           &send_switch_address_continuation, n);
1370     if (ret == GNUNET_SYSERR)
1371     {
1372       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373                   "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n",
1374                   GNUNET_i2s (peer), plugin_name,
1375                   (address_len ==
1376                    0) ? "<inbound>" : GST_plugins_a2s (plugin_name, address,
1377                                                        address_len), session);
1378     }
1379     return GNUNET_NO;
1380   }
1381   else if (n->state == S_CONNECT_SENT)
1382   {
1383     return GNUNET_NO;
1384   }
1385   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1386               "Invalid connection state to switch addresses %u \n", n->state);
1387   GNUNET_break_op (0);
1388   return GNUNET_NO;
1389 }
1390
1391
1392 /**
1393  * Create an entry in the neighbour map for the given peer
1394  *
1395  * @param peer peer to create an entry for
1396  * @return new neighbour map entry
1397  */
1398 static struct NeighbourMapEntry *
1399 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1400 {
1401   struct NeighbourMapEntry *n;
1402
1403 #if DEBUG_TRANSPORT
1404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1405               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1406 #endif
1407   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1408   n->id = *peer;
1409   n->state = S_NOT_CONNECTED;
1410   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1411                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1412                                  MAX_BANDWIDTH_CARRY_S);
1413   n->timeout_task =
1414       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1415                                     &neighbour_timeout_task, n);
1416   GNUNET_assert (GNUNET_OK ==
1417                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1418                                                     &n->id.hashPubKey, n,
1419                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1420   return n;
1421 }
1422
1423
1424 /**
1425  * Try to create a connection to the given target (eventually).
1426  *
1427  * @param target peer to try to connect to
1428  */
1429 void
1430 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1431 {
1432   struct NeighbourMapEntry *n;
1433
1434   // This can happen during shutdown
1435   if (neighbours == NULL)
1436   {
1437     return;
1438   }
1439 #if DEBUG_TRANSPORT
1440   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1441               GNUNET_i2s (target));
1442 #endif
1443   if (0 ==
1444       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1445   {
1446     /* my own hello */
1447     return;
1448   }
1449   n = lookup_neighbour (target);
1450
1451   if (NULL != n)
1452   {
1453     if ((is_connected (n)) || (is_connecting (n)))
1454       return;                   /* already connecting or connected */
1455     if (is_disconnecting (n))
1456       change_state (n, S_NOT_CONNECTED);
1457   }
1458
1459
1460   if (n == NULL)
1461     n = setup_neighbour (target);
1462 #if DEBUG_TRANSPORT
1463   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1464               "Asking ATS for suggested address to connect to peer `%s'\n",
1465               GNUNET_i2s (&n->id));
1466 #endif
1467
1468   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1469 }
1470
1471 /**
1472  * Test if we're connected to the given peer.
1473  *
1474  * @param target peer to test
1475  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1476  */
1477 int
1478 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1479 {
1480   struct NeighbourMapEntry *n;
1481
1482   // This can happen during shutdown
1483   if (neighbours == NULL)
1484   {
1485     return GNUNET_NO;
1486   }
1487
1488   n = lookup_neighbour (target);
1489
1490   if ((NULL == n) || (!is_connected (n)))
1491     return GNUNET_NO;           /* not connected */
1492   return GNUNET_YES;
1493 }
1494
1495
1496 /**
1497  * A session was terminated. Take note.
1498  *
1499  * @param peer identity of the peer where the session died
1500  * @param session session that is gone
1501  */
1502 void
1503 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1504                                    struct Session *session)
1505 {
1506   struct NeighbourMapEntry *n;
1507
1508   // This can happen during shutdown
1509   if (neighbours == NULL)
1510   {
1511     return;
1512   }
1513
1514 #if DEBUG_TRANSPORT
1515   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1516               session, GNUNET_i2s (peer));
1517 #endif
1518
1519   n = lookup_neighbour (peer);
1520   if (NULL == n)
1521     return;
1522   if (session != n->session)
1523     return;                     /* doesn't affect us */
1524
1525   n->session = NULL;
1526   GNUNET_free (n->addr);
1527   n->addr = NULL;
1528   n->addrlen = 0;
1529
1530   /* not connected anymore anyway, shouldn't matter */
1531   if ((!is_connected (n)) && (!is_connecting (n)))
1532     return;
1533
1534   /* We are connected, so ask ATS to switch addresses */
1535   GNUNET_SCHEDULER_cancel (n->timeout_task);
1536   n->timeout_task =
1537       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1538                                     &neighbour_timeout_task, n);
1539   /* try QUICKLY to re-establish a connection, reduce timeout! */
1540   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1541     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1542   n->ats_suggest =
1543       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1544                                     n);
1545   GNUNET_ATS_suggest_address (GST_ats, peer);
1546 }
1547
1548
1549 /**
1550  * Transmit a message to the given target using the active connection.
1551  *
1552  * @param target destination
1553  * @param msg message to send
1554  * @param msg_size number of bytes in msg
1555  * @param timeout when to fail with timeout
1556  * @param cont function to call when done
1557  * @param cont_cls closure for 'cont'
1558  */
1559 void
1560 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1561                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1562                      GST_NeighbourSendContinuation cont, void *cont_cls)
1563 {
1564   struct NeighbourMapEntry *n;
1565   struct MessageQueue *mq;
1566
1567   // This can happen during shutdown
1568   if (neighbours == NULL)
1569   {
1570     return;
1571   }
1572
1573   n = lookup_neighbour (target);
1574   if ((n == NULL) || (!is_connected (n)))
1575   {
1576     GNUNET_STATISTICS_update (GST_stats,
1577                               gettext_noop
1578                               ("# messages not sent (no such peer or not connected)"),
1579                               1, GNUNET_NO);
1580 #if DEBUG_TRANSPORT
1581     if (n == NULL)
1582       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1583                   "Could not send message to peer `%s': unknown neighbour",
1584                   GNUNET_i2s (target));
1585     else if (!is_connected (n))
1586       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1587                   "Could not send message to peer `%s': not connected\n",
1588                   GNUNET_i2s (target));
1589 #endif
1590     if (NULL != cont)
1591       cont (cont_cls, GNUNET_SYSERR);
1592     return;
1593   }
1594
1595   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
1596   {
1597     GNUNET_STATISTICS_update (GST_stats,
1598                               gettext_noop
1599                               ("# messages not sent (no such peer or not connected)"),
1600                               1, GNUNET_NO);
1601 #if DEBUG_TRANSPORT
1602     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1603                 "Could not send message to peer `%s': no address available\n",
1604                 GNUNET_i2s (target));
1605 #endif
1606
1607     if (NULL != cont)
1608       cont (cont_cls, GNUNET_SYSERR);
1609     return;
1610   }
1611
1612   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1613   GNUNET_STATISTICS_update (GST_stats,
1614                             gettext_noop
1615                             ("# bytes in message queue for other peers"),
1616                             msg_size, GNUNET_NO);
1617   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1618   mq->cont = cont;
1619   mq->cont_cls = cont_cls;
1620   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1621   memcpy (&mq[1], msg, msg_size);
1622   mq->message_buf = (const char *) &mq[1];
1623   mq->message_buf_size = msg_size;
1624   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1625   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1626
1627   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1628       (NULL == n->is_active))
1629     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1630 }
1631
1632
1633 /**
1634  * We have received a message from the given sender.  How long should
1635  * we delay before receiving more?  (Also used to keep the peer marked
1636  * as live).
1637  *
1638  * @param sender sender of the message
1639  * @param size size of the message
1640  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1641  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1642  *                   GNUNET_SYSERR if the connection is not fully up yet
1643  * @return how long to wait before reading more from this sender
1644  */
1645 struct GNUNET_TIME_Relative
1646 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1647                                         *sender, ssize_t size, int *do_forward)
1648 {
1649   struct NeighbourMapEntry *n;
1650   struct GNUNET_TIME_Relative ret;
1651
1652   // This can happen during shutdown
1653   if (neighbours == NULL)
1654   {
1655     return GNUNET_TIME_UNIT_FOREVER_REL;
1656   }
1657
1658   n = lookup_neighbour (sender);
1659   if (n == NULL)
1660   {
1661     GST_neighbours_try_connect (sender);
1662     n = lookup_neighbour (sender);
1663     if (NULL == n)
1664     {
1665       GNUNET_STATISTICS_update (GST_stats,
1666                                 gettext_noop
1667                                 ("# messages discarded due to lack of neighbour record"),
1668                                 1, GNUNET_NO);
1669       *do_forward = GNUNET_NO;
1670       return GNUNET_TIME_UNIT_ZERO;
1671     }
1672   }
1673   if (!is_connected (n))
1674   {
1675     *do_forward = GNUNET_SYSERR;
1676     return GNUNET_TIME_UNIT_ZERO;
1677   }
1678   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1679   {
1680     n->quota_violation_count++;
1681 #if DEBUG_TRANSPORT
1682     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1684                 n->in_tracker.available_bytes_per_s__,
1685                 n->quota_violation_count);
1686 #endif
1687     /* Discount 32k per violation */
1688     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1689   }
1690   else
1691   {
1692     if (n->quota_violation_count > 0)
1693     {
1694       /* try to add 32k back */
1695       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1696       n->quota_violation_count--;
1697     }
1698   }
1699   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1700   {
1701     GNUNET_STATISTICS_update (GST_stats,
1702                               gettext_noop
1703                               ("# bandwidth quota violations by other peers"),
1704                               1, GNUNET_NO);
1705     *do_forward = GNUNET_NO;
1706     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1707   }
1708   *do_forward = GNUNET_YES;
1709   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1710   if (ret.rel_value > 0)
1711   {
1712 #if DEBUG_TRANSPORT
1713     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1714                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1715                 (unsigned long long) n->in_tracker.
1716                 consumption_since_last_update__,
1717                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1718                 (unsigned long long) ret.rel_value);
1719 #endif
1720     GNUNET_STATISTICS_update (GST_stats,
1721                               gettext_noop ("# ms throttling suggested"),
1722                               (int64_t) ret.rel_value, GNUNET_NO);
1723   }
1724   return ret;
1725 }
1726
1727
1728 /**
1729  * Keep the connection to the given neighbour alive longer,
1730  * we received a KEEPALIVE (or equivalent).
1731  *
1732  * @param neighbour neighbour to keep alive
1733  */
1734 void
1735 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1736 {
1737   struct NeighbourMapEntry *n;
1738
1739   // This can happen during shutdown
1740   if (neighbours == NULL)
1741   {
1742     return;
1743   }
1744
1745   n = lookup_neighbour (neighbour);
1746   if (NULL == n)
1747   {
1748     GNUNET_STATISTICS_update (GST_stats,
1749                               gettext_noop
1750                               ("# KEEPALIVE messages discarded (not connected)"),
1751                               1, GNUNET_NO);
1752     return;
1753   }
1754   GNUNET_SCHEDULER_cancel (n->timeout_task);
1755   n->timeout_task =
1756       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1757                                     &neighbour_timeout_task, n);
1758 }
1759
1760
1761 /**
1762  * Change the incoming quota for the given peer.
1763  *
1764  * @param neighbour identity of peer to change qutoa for
1765  * @param quota new quota
1766  */
1767 void
1768 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1769                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1770 {
1771   struct NeighbourMapEntry *n;
1772
1773   // This can happen during shutdown
1774   if (neighbours == NULL)
1775   {
1776     return;
1777   }
1778
1779   n = lookup_neighbour (neighbour);
1780   if (n == NULL)
1781   {
1782     GNUNET_STATISTICS_update (GST_stats,
1783                               gettext_noop
1784                               ("# SET QUOTA messages ignored (no such peer)"),
1785                               1, GNUNET_NO);
1786     return;
1787   }
1788   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1789   if (0 != ntohl (quota.value__))
1790     return;
1791 #if DEBUG_TRANSPORT
1792   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1793               GNUNET_i2s (&n->id), "SET_QUOTA");
1794 #endif
1795   if (is_connected (n))
1796     GNUNET_STATISTICS_update (GST_stats,
1797                               gettext_noop ("# disconnects due to quota of 0"),
1798                               1, GNUNET_NO);
1799   disconnect_neighbour (n);
1800 }
1801
1802
1803 /**
1804  * Closure for the neighbours_iterate function.
1805  */
1806 struct IteratorContext
1807 {
1808   /**
1809    * Function to call on each connected neighbour.
1810    */
1811   GST_NeighbourIterator cb;
1812
1813   /**
1814    * Closure for 'cb'.
1815    */
1816   void *cb_cls;
1817 };
1818
1819
1820 /**
1821  * Call the callback from the closure for each connected neighbour.
1822  *
1823  * @param cls the 'struct IteratorContext'
1824  * @param key the hash of the public key of the neighbour
1825  * @param value the 'struct NeighbourMapEntry'
1826  * @return GNUNET_OK (continue to iterate)
1827  */
1828 static int
1829 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1830 {
1831   struct IteratorContext *ic = cls;
1832   struct NeighbourMapEntry *n = value;
1833
1834   if (!is_connected (n))
1835     return GNUNET_OK;
1836
1837   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen);
1838   return GNUNET_OK;
1839 }
1840
1841
1842 /**
1843  * Iterate over all connected neighbours.
1844  *
1845  * @param cb function to call
1846  * @param cb_cls closure for cb
1847  */
1848 void
1849 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1850 {
1851   struct IteratorContext ic;
1852
1853   // This can happen during shutdown
1854   if (neighbours == NULL)
1855   {
1856     return;
1857   }
1858
1859   ic.cb = cb;
1860   ic.cb_cls = cb_cls;
1861   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1862 }
1863
1864 /**
1865  * If we have an active connection to the given target, it must be shutdown.
1866  *
1867  * @param target peer to disconnect from
1868  */
1869 void
1870 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1871 {
1872   struct NeighbourMapEntry *n;
1873
1874   // This can happen during shutdown
1875   if (neighbours == NULL)
1876   {
1877     return;
1878   }
1879
1880   n = lookup_neighbour (target);
1881   if (NULL == n)
1882     return;                     /* not active */
1883   if (is_connected (n))
1884   {
1885     send_disconnect (&n->id, n->plugin_name, n->addr, n->addrlen, n->session);
1886
1887     n = lookup_neighbour (target);
1888     if (NULL == n)
1889       return;                   /* gone already */
1890   }
1891   disconnect_neighbour (n);
1892 }
1893
1894
1895 /**
1896  * We received a disconnect message from the given peer,
1897  * validate and process.
1898  *
1899  * @param peer sender of the message
1900  * @param msg the disconnect message
1901  */
1902 void
1903 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
1904                                           *peer,
1905                                           const struct GNUNET_MessageHeader
1906                                           *msg)
1907 {
1908   struct NeighbourMapEntry *n;
1909   const struct SessionDisconnectMessage *sdm;
1910   GNUNET_HashCode hc;
1911
1912 #if DEBUG_TRANSPORT
1913   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1914               "Received DISCONNECT message from peer `%s'\n",
1915               GNUNET_i2s (peer));
1916 #endif
1917
1918   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1919   {
1920     // GNUNET_break_op (0);
1921     GNUNET_STATISTICS_update (GST_stats,
1922                               gettext_noop
1923                               ("# disconnect messages ignored (old format)"), 1,
1924                               GNUNET_NO);
1925     return;
1926   }
1927   sdm = (const struct SessionDisconnectMessage *) msg;
1928   n = lookup_neighbour (peer);
1929   if (NULL == n)
1930     return;                     /* gone already */
1931   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1932       n->connect_ts.abs_value)
1933   {
1934     GNUNET_STATISTICS_update (GST_stats,
1935                               gettext_noop
1936                               ("# disconnect messages ignored (timestamp)"), 1,
1937                               GNUNET_NO);
1938     return;
1939   }
1940   GNUNET_CRYPTO_hash (&sdm->public_key,
1941                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1942                       &hc);
1943   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
1944   {
1945     GNUNET_break_op (0);
1946     return;
1947   }
1948   if (ntohl (sdm->purpose.size) !=
1949       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1950       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1951       sizeof (struct GNUNET_TIME_AbsoluteNBO))
1952   {
1953     GNUNET_break_op (0);
1954     return;
1955   }
1956   if (GNUNET_OK !=
1957       GNUNET_CRYPTO_rsa_verify
1958       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
1959        &sdm->signature, &sdm->public_key))
1960   {
1961     GNUNET_break_op (0);
1962     return;
1963   }
1964   GST_neighbours_force_disconnect (peer);
1965 }
1966
1967 /**
1968  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
1969  * Consider switching to it.
1970  *
1971  * @param message possibly a 'struct SessionConnectMessage' (check format)
1972  * @param peer identity of the peer to switch the address for
1973  * @param plugin_name name of transport that delivered the PONG
1974  * @param address address of the other peer, NULL if other peer
1975  *                       connected to us
1976  * @param address_len number of bytes in address
1977  * @param session session to use (or NULL)
1978  * @param ats performance data
1979  * @param ats_count number of entries in ats
1980   */
1981 void
1982 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
1983                                    const struct GNUNET_PeerIdentity *peer,
1984                                    const char *plugin_name,
1985                                    const char *sender_address,
1986                                    uint16_t sender_address_len,
1987                                    struct Session *session,
1988                                    const struct GNUNET_ATS_Information *ats,
1989                                    uint32_t ats_count)
1990 {
1991   const struct SessionConnectMessage *scm;
1992   struct QuotaSetMessage q_msg;
1993   struct GNUNET_MessageHeader msg;
1994   struct NeighbourMapEntry *n;
1995   size_t msg_len;
1996   size_t ret;
1997   int was_connected;
1998
1999 #if DEBUG_TRANSPORT
2000   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2001               "Received CONNECT_ACK message from peer `%s'\n",
2002               GNUNET_i2s (peer));
2003 #endif
2004
2005   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2006   {
2007     GNUNET_break_op (0);
2008     return;
2009   }
2010
2011   scm = (const struct SessionConnectMessage *) message;
2012   GNUNET_break_op (ntohl (scm->reserved) == 0);
2013   n = lookup_neighbour (peer);
2014   if (NULL == n)
2015     n = setup_neighbour (peer);
2016 /*
2017   if (n->state != S_CONNECT_SENT)
2018   {
2019     GNUNET_break (0);
2020     send_disconnect(&n->id, n->plugin_name, n->addr, n->addrlen, n->session);
2021     return;
2022   }
2023 */
2024   if (NULL != session)
2025     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2026                      "transport-ats",
2027                      "Giving ATS session %p of plugin %s for peer %s\n",
2028                      session, plugin_name, GNUNET_i2s (peer));
2029   GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2030                              sender_address_len, session, ats, ats_count);
2031
2032   was_connected = is_connected (n);
2033   if (!is_connected (n))
2034     change_state (n, S_CONNECTED);
2035
2036   GNUNET_ATS_address_in_use (GST_ats, peer, plugin_name, sender_address,
2037                              sender_address_len, session, GNUNET_YES);
2038
2039 #if DEBUG_TRANSPORT
2040   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2041               "Setting inbound quota of %u for peer `%s' to \n",
2042               ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id));
2043 #endif
2044   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2045
2046   /* send ACK (ACK) */
2047   msg_len = sizeof (msg);
2048   msg.size = htons (msg_len);
2049   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2050
2051   ret =
2052       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2053                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2054                         n->plugin_name, n->addr, n->addrlen, GNUNET_YES, NULL,
2055                         NULL);
2056
2057   if (ret == GNUNET_SYSERR)
2058     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2059                 "Failed to send SESSION_ACK to `%4s' using plugin `%s' address '%s' session %X\n",
2060                 GNUNET_i2s (&n->id), n->plugin_name,
2061                 (n->addrlen ==
2062                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2063                                                      n->addrlen), n->session);
2064
2065
2066   if (!was_connected)
2067   {
2068     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2069       n->keepalive_task =
2070           GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2071                                         &neighbour_keepalive_task, n);
2072
2073     neighbours_connected++;
2074     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2075                               GNUNET_NO);
2076 #if DEBUG_TRANSPORT
2077     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2078                 "Notify about connect of `%4s' using plugin `%s' address '%s' session %X LINE %u\n",
2079                 GNUNET_i2s (&n->id), n->plugin_name,
2080                 (n->addrlen ==
2081                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2082                                                      n->addrlen), n->session,
2083                 __LINE__);
2084 #endif
2085     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2086   }
2087
2088 #if DEBUG_TRANSPORT
2089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2090               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2091               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2092 #endif
2093   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2094   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2095   q_msg.quota = n->bandwidth_out;
2096   q_msg.peer = (*peer);
2097   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2098
2099 }
2100
2101 void
2102 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2103                            const struct GNUNET_PeerIdentity *peer,
2104                            const char *plugin_name, const char *sender_address,
2105                            uint16_t sender_address_len, struct Session *session,
2106                            const struct GNUNET_ATS_Information *ats,
2107                            uint32_t ats_count)
2108 {
2109   struct NeighbourMapEntry *n;
2110   struct QuotaSetMessage q_msg;
2111   int was_connected;
2112
2113 #if DEBUG_TRANSPORT
2114   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2115               GNUNET_i2s (peer));
2116 #endif
2117
2118   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2119   {
2120     GNUNET_break_op (0);
2121     return;
2122   }
2123
2124   n = lookup_neighbour (peer);
2125   if (NULL == n)
2126   {
2127     send_disconnect (peer, plugin_name, sender_address, sender_address_len,
2128                      session);
2129     GNUNET_break (0);
2130     return;
2131   }
2132
2133   if (is_connected (n))
2134     return;
2135
2136   if (NULL != session)
2137     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2138                      "transport-ats",
2139                      "Giving ATS session %p of plugin %s for peer %s\n",
2140                      session, plugin_name, GNUNET_i2s (peer));
2141   GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2142                              sender_address_len, session, ats, ats_count);
2143
2144   was_connected = is_connected (n);
2145   change_state (n, S_CONNECTED);
2146
2147   GNUNET_ATS_address_in_use (GST_ats, peer, plugin_name, sender_address,
2148                              sender_address_len, session, GNUNET_YES);
2149
2150   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2151
2152   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2153     n->keepalive_task =
2154         GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2155                                       &neighbour_keepalive_task, n);
2156
2157   if (!was_connected)
2158   {
2159     neighbours_connected++;
2160     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2161                               GNUNET_NO);
2162
2163 #if DEBUG_TRANSPORT
2164     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2165                 "Notify about connect of `%4s' using plugin `%s' address '%s' session %X LINE %u\n",
2166                 GNUNET_i2s (&n->id), n->plugin_name,
2167                 (n->addrlen ==
2168                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2169                                                      n->addrlen), n->session,
2170                 __LINE__);
2171 #endif
2172     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2173   }
2174 #if DEBUG_TRANSPORT
2175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2176               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2177               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2178 #endif
2179   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2180   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2181   q_msg.quota = n->bandwidth_out;
2182   q_msg.peer = (*peer);
2183   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2184 }
2185
2186 struct BlackListCheckContext
2187 {
2188   struct GNUNET_ATS_Information *ats;
2189
2190   uint32_t ats_count;
2191
2192   struct Session *session;
2193
2194   char *sender_address;
2195
2196   uint16_t sender_address_len;
2197
2198   char *plugin_name;
2199
2200   struct GNUNET_TIME_Absolute ts;
2201 };
2202
2203
2204 static void
2205 handle_connect_blacklist_cont (void *cls,
2206                                const struct GNUNET_PeerIdentity *peer,
2207                                int result)
2208 {
2209   struct NeighbourMapEntry *n;
2210   struct BlackListCheckContext *bcc = cls;
2211
2212 #if DEBUG_TRANSPORT
2213   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2214               "Blacklist check due to CONNECT message: `%s'\n",
2215               GNUNET_i2s (peer),
2216               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2217 #endif
2218
2219   /* not allowed */
2220   if (GNUNET_OK != result)
2221   {
2222     GNUNET_free (bcc);
2223     return;
2224   }
2225
2226   n = lookup_neighbour (peer);
2227   if (NULL == n)
2228     n = setup_neighbour (peer);
2229
2230   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2231   {
2232     if (NULL != bcc->session)
2233       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2234                        "transport-ats",
2235                        "Giving ATS session %p of plugin %s address `%s' for peer %s\n",
2236                        bcc->session, bcc->plugin_name,
2237                        GST_plugins_a2s (bcc->plugin_name, bcc->sender_address,
2238                                         bcc->sender_address_len),
2239                        GNUNET_i2s (peer));
2240     GNUNET_ATS_address_update (GST_ats, peer, bcc->plugin_name,
2241                                bcc->sender_address, bcc->sender_address_len,
2242                                bcc->session, bcc->ats, bcc->ats_count);
2243     n->connect_ts = bcc->ts;
2244   }
2245
2246   GNUNET_free (bcc);
2247
2248   if (n->state != S_CONNECT_RECV)
2249     change_state (n, S_CONNECT_RECV);
2250
2251   /* Ask ATS for an address to connect via that address */
2252   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2253     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2254   n->ats_suggest =
2255       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2256                                     n);
2257   GNUNET_ATS_suggest_address (GST_ats, peer);
2258 }
2259
2260 /**
2261  * We received a 'SESSION_CONNECT' message from the other peer.
2262  * Consider switching to it.
2263  *
2264  * @param message possibly a 'struct SessionConnectMessage' (check format)
2265  * @param peer identity of the peer to switch the address for
2266  * @param plugin_name name of transport that delivered the PONG
2267  * @param address address of the other peer, NULL if other peer
2268  *                       connected to us
2269  * @param address_len number of bytes in address
2270  * @param session session to use (or NULL)
2271  * @param ats performance data
2272  * @param ats_count number of entries in ats (excluding 0-termination)
2273   */
2274 void
2275 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2276                                const struct GNUNET_PeerIdentity *peer,
2277                                const char *plugin_name,
2278                                const char *sender_address,
2279                                uint16_t sender_address_len,
2280                                struct Session *session,
2281                                const struct GNUNET_ATS_Information *ats,
2282                                uint32_t ats_count)
2283 {
2284   const struct SessionConnectMessage *scm;
2285   struct NeighbourMapEntry *n;
2286   struct BlackListCheckContext *bcc = NULL;
2287
2288 #if DEBUG_TRANSPORT
2289   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2290               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2291 #endif
2292
2293   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2294   {
2295     GNUNET_break_op (0);
2296     return;
2297   }
2298
2299   scm = (const struct SessionConnectMessage *) message;
2300   GNUNET_break_op (ntohl (scm->reserved) == 0);
2301
2302   n = lookup_neighbour (peer);
2303   if (n != NULL)
2304   {
2305     /* connected peer switches addresses */
2306     if (is_connected (n))
2307     {
2308       GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2309                                  sender_address_len, session, ats, ats_count);
2310       return;
2311     }
2312   }
2313
2314   /* we are not connected to this peer */
2315   /* do blacklist check */
2316   bcc =
2317       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2318                      sizeof (struct GNUNET_ATS_Information) * ats_count +
2319                      sender_address_len + strlen (plugin_name) + 1);
2320
2321   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2322
2323   bcc->ats_count = ats_count;
2324   bcc->sender_address_len = sender_address_len;
2325   bcc->session = session;
2326
2327   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2328   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2329
2330   bcc->sender_address = (char *) &bcc->ats[ats_count];
2331   memcpy (bcc->sender_address, sender_address, sender_address_len);
2332
2333   bcc->plugin_name = &bcc->sender_address[sender_address_len];
2334   strcpy (bcc->plugin_name, plugin_name);
2335
2336   GST_blacklist_test_allowed (peer, plugin_name, handle_connect_blacklist_cont,
2337                               bcc);
2338 }
2339
2340
2341 /* end of file gnunet-service-transport_neighbours.c */