(no commit message)
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours?
52  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
53  * send 3 keepalives in each interval, so 3 messages would need to be
54  * lost in a row for a disconnect).
55  */
56 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
57
58
59 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
60
61
62 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
63
64
65 /**
66  * Entry in neighbours.
67  */
68 struct NeighbourMapEntry;
69
70 /**
71  * Message a peer sends to another to indicate its
72  * preference for communicating via a particular
73  * session (and the desire to establish a real
74  * connection).
75  */
76 struct SessionConnectMessage
77 {
78   /**
79    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
80    */
81   struct GNUNET_MessageHeader header;
82
83   /**
84    * Always zero.
85    */
86   uint32_t reserved GNUNET_PACKED;
87
88   /**
89    * Absolute time at the sender.  Only the most recent connect
90    * message implies which session is preferred by the sender.
91    */
92   struct GNUNET_TIME_AbsoluteNBO timestamp;
93
94 };
95
96
97 struct SessionDisconnectMessage
98 {
99   /**
100    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Always zero.
106    */
107   uint32_t reserved GNUNET_PACKED;
108
109   /**
110    * Purpose of the signature.  Extends over the timestamp.
111    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
112    */
113   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
114
115   /**
116    * Absolute time at the sender.  Only the most recent connect
117    * message implies which session is preferred by the sender.
118    */
119   struct GNUNET_TIME_AbsoluteNBO timestamp;
120
121   /**
122    * Public key of the sender.
123    */
124   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
125
126   /**
127    * Signature of the peer that sends us the disconnect.  Only
128    * valid if the timestamp is AFTER the timestamp from the
129    * corresponding 'CONNECT' message.
130    */
131   struct GNUNET_CRYPTO_RsaSignature signature;
132
133 };
134
135
136 /**
137  * For each neighbour we keep a list of messages
138  * that we still want to transmit to the neighbour.
139  */
140 struct MessageQueue
141 {
142
143   /**
144    * This is a doubly linked list.
145    */
146   struct MessageQueue *next;
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *prev;
152
153   /**
154    * Once this message is actively being transmitted, which
155    * neighbour is it associated with?
156    */
157   struct NeighbourMapEntry *n;
158
159   /**
160    * Function to call once we're done.
161    */
162   GST_NeighbourSendContinuation cont;
163
164   /**
165    * Closure for 'cont'
166    */
167   void *cont_cls;
168
169   /**
170    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
171    * stuck together in memory.  Allocated at the end of this struct.
172    */
173   const char *message_buf;
174
175   /**
176    * Size of the message buf
177    */
178   size_t message_buf_size;
179
180   /**
181    * At what time should we fail?
182    */
183   struct GNUNET_TIME_Absolute timeout;
184
185 };
186
187 enum State
188 {
189   /* fresh peer or completely disconnected */
190   S_NOT_CONNECTED = 0,
191   /* sent CONNECT message to other peer, waiting for CONNECT_ACK */
192   S_CONNECT_SENT = 1,
193   /* received CONNECT message to other peer, sending CONNECT_ACK */
194   S_CONNECT_RECV = 4,
195   /* sent CONNECT_ACK message to other peer, wait for ACK or payload */
196   S_CONNECT_RECV_ACK_SENT = 8,
197   /* received ACK or payload */
198   S_CONNECTED = 16,
199   /* Disconnect in progress */
200   S_DISCONNECT = 32
201 };
202
203 /**
204  * Entry in neighbours.
205  */
206 struct NeighbourMapEntry
207 {
208
209   /**
210    * Head of list of messages we would like to send to this peer;
211    * must contain at most one message per client.
212    */
213   struct MessageQueue *messages_head;
214
215   /**
216    * Tail of list of messages we would like to send to this peer; must
217    * contain at most one message per client.
218    */
219   struct MessageQueue *messages_tail;
220
221   /**
222    * Performance data for the peer.
223    */
224   //struct GNUNET_ATS_Information *ats;
225
226   /**
227    * Are we currently trying to send a message? If so, which one?
228    */
229   struct MessageQueue *is_active;
230
231   /**
232    * Active session for communicating with the peer.
233    */
234   struct Session *session;
235
236   /**
237    * Name of the plugin we currently use.
238    */
239   char *plugin_name;
240
241   /**
242    * Address used for communicating with the peer, NULL for inbound connections.
243    */
244   void *addr;
245
246   /**
247    * Number of bytes in 'addr'.
248    */
249   size_t addrlen;
250
251   /**
252    * Identity of this neighbour.
253    */
254   struct GNUNET_PeerIdentity id;
255
256   /**
257    * ID of task scheduled to run when this peer is about to
258    * time out (will free resources associated with the peer).
259    */
260   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
261
262   /**
263    * ID of task scheduled to send keepalives.
264    */
265   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
266
267   /**
268    * ID of task scheduled to run when we should try transmitting
269    * the head of the message queue.
270    */
271   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
272
273   /**
274    * Tracker for inbound bandwidth.
275    */
276   struct GNUNET_BANDWIDTH_Tracker in_tracker;
277
278   /**
279    * Inbound bandwidth from ATS, activated when connection is up
280    */
281   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
282
283   /**
284    * Inbound bandwidth from ATS, activated when connection is up
285    */
286   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
287
288   /**
289    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
290    */
291   struct GNUNET_TIME_Absolute connect_ts;
292
293   /**
294    * Timeout for ATS
295    * We asked ATS for a new address for this peer
296    */
297   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
298
299   /**
300    * Task the resets the peer state after due to an pending
301    * unsuccessful connection setup
302    */
303   GNUNET_SCHEDULER_TaskIdentifier state_reset;
304
305   /**
306    * How often has the other peer (recently) violated the inbound
307    * traffic limit?  Incremented by 10 per violation, decremented by 1
308    * per non-violation (for each time interval).
309    */
310   unsigned int quota_violation_count;
311
312
313   /**
314    * The current state of the peer
315    * Element of enum State
316    */
317   int state;
318
319 };
320
321
322 /**
323  * All known neighbours and their HELLOs.
324  */
325 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
326
327 /**
328  * Closure for connect_notify_cb and disconnect_notify_cb
329  */
330 static void *callback_cls;
331
332 /**
333  * Function to call when we connected to a neighbour.
334  */
335 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
336
337 /**
338  * Function to call when we disconnected from a neighbour.
339  */
340 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
341
342 /**
343  * counter for connected neighbours
344  */
345 static int neighbours_connected;
346
347 /**
348  * Lookup a neighbour entry in the neighbours hash map.
349  *
350  * @param pid identity of the peer to look up
351  * @return the entry, NULL if there is no existing record
352  */
353 static struct NeighbourMapEntry *
354 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
355 {
356   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
357 }
358
359 #define change_state(n, state, ...) change (n, state, __LINE__)
360
361 static int
362 is_connecting (struct NeighbourMapEntry *n)
363 {
364   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
365     return GNUNET_YES;
366   return GNUNET_NO;
367 }
368
369 static int
370 is_connected (struct NeighbourMapEntry *n)
371 {
372   if (n->state == S_CONNECTED)
373     return GNUNET_YES;
374   return GNUNET_NO;
375 }
376
377 static int
378 is_disconnecting (struct NeighbourMapEntry *n)
379 {
380   if (n->state == S_DISCONNECT)
381     return GNUNET_YES;
382   return GNUNET_NO;
383 }
384
385 static const char *
386 print_state (int state)
387 {
388   switch (state)
389   {
390   case S_CONNECTED:
391     return "S_CONNECTED";
392     break;
393   case S_CONNECT_RECV:
394     return "S_CONNECT_RECV";
395     break;
396   case S_CONNECT_RECV_ACK_SENT:
397     return "S_CONNECT_RECV_ACK_SENT";
398     break;
399   case S_CONNECT_SENT:
400     return "S_CONNECT_SENT";
401     break;
402   case S_DISCONNECT:
403     return "S_DISCONNECT";
404     break;
405   case S_NOT_CONNECTED:
406     return "S_NOT_CONNECTED";
407     break;
408   default:
409     GNUNET_break (0);
410     break;
411   }
412   return NULL;
413 }
414
415 static int
416 change (struct NeighbourMapEntry *n, int state, int line);
417
418 static void
419 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
420
421 static void
422 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
423 {
424   struct NeighbourMapEntry *n = cls;
425
426   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
427
428 #if DEBUG_TRANSPORT
429 #endif
430   /* This jut a temporary debug message to check if a the value
431    * SETUP_CONNECTION_TIMEOUT was choosen to small for slow machines
432    */
433   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
434               "Information for developers: Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n",
435               GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name, n->addr,
436                                                     n->addrlen),
437               print_state (n->state));
438
439   GNUNET_STATISTICS_update (GST_stats,
440                             gettext_noop
441                             ("# failed connection attempts due to timeout"), 1,
442                             GNUNET_NO);
443
444   /* resetting state */
445   n->state = S_NOT_CONNECTED;
446
447   /* destroying address */
448   GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
449                                 n->addrlen, NULL);
450
451   /* request new address */
452   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
453     GNUNET_SCHEDULER_cancel (n->ats_suggest);
454   n->ats_suggest =
455       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
456                                     n);
457   GNUNET_ATS_suggest_address (GST_ats, &n->id);
458 }
459
460 static int
461 change (struct NeighbourMapEntry *n, int state, int line)
462 {
463   char *old = strdup (print_state (n->state));
464   char *new = strdup (print_state (state));
465
466   /* allowed transitions */
467   int allowed = GNUNET_NO;
468
469   switch (n->state)
470   {
471   case S_NOT_CONNECTED:
472     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
473         (state == S_DISCONNECT))
474     {
475       allowed = GNUNET_YES;
476
477       /* Schedule reset task */
478       if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT))
479       {
480         GNUNET_assert (n->state_reset == GNUNET_SCHEDULER_NO_TASK);
481         n->state_reset =
482             GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
483                                           n);
484       }
485       break;
486     }
487     break;
488   case S_CONNECT_RECV:
489     if ((state == S_NOT_CONNECTED) || (state == S_DISCONNECT) ||
490         (state == S_CONNECTED) ||
491         /* FIXME SENT -> RECV ISSUE! */ (state == S_CONNECT_SENT))
492     {
493       if ((state == S_CONNECTED) || (state == S_DISCONNECT) ||
494           (state == S_NOT_CONNECTED))
495       {
496 #if DEBUG_TRANSPORT
497         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498                     "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
499                     GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name,
500                                                           n->addr, n->addrlen),
501                     print_state (n->state), print_state (state));
502 #endif
503         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
504         GNUNET_SCHEDULER_cancel (n->state_reset);
505         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
506       }
507
508       allowed = GNUNET_YES;
509       break;
510     }
511     break;
512   case S_CONNECT_SENT:
513     if ((state == S_NOT_CONNECTED) || (state == S_CONNECTED) ||
514         (state == S_DISCONNECT) ||
515         /* FIXME SENT -> RECV ISSUE! */ (state == S_CONNECT_RECV))
516     {
517       if ((state == S_CONNECTED) || (state == S_DISCONNECT) ||
518           (state == S_NOT_CONNECTED))
519       {
520 #if DEBUG_TRANSPORT
521         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
522                     "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
523                     GNUNET_i2s (&n->id), GST_plugins_a2s (n->plugin_name,
524                                                           n->addr, n->addrlen),
525                     print_state (n->state), print_state (state));
526 #endif
527         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
528         GNUNET_SCHEDULER_cancel (n->state_reset);
529         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
530       }
531
532       allowed = GNUNET_YES;
533       break;
534     }
535     break;
536   case S_CONNECTED:
537     if (state == S_DISCONNECT)
538     {
539       allowed = GNUNET_YES;
540       break;
541     }
542     break;
543   case S_DISCONNECT:
544     /*
545      * if (state == S_NOT_CONNECTED)
546      * {
547      * allowed = GNUNET_YES;
548      * break;
549      * } */
550     break;
551   default:
552     GNUNET_break (0);
553     break;
554
555   }
556
557   if (allowed == GNUNET_NO)
558   {
559     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
560                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
561                 new, line);
562     GNUNET_break (0);
563     GNUNET_free (old);
564     GNUNET_free (new);
565     return GNUNET_SYSERR;
566   }
567
568   n->state = state;
569 #if DEBUG_TRANSPORT
570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571               "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
572               GNUNET_i2s (&n->id), n, old, new, line);
573 #endif
574   GNUNET_free (old);
575   GNUNET_free (new);
576   return GNUNET_OK;
577 }
578
579 static ssize_t
580 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
581                   size_t msgbuf_size, uint32_t priority,
582                   struct GNUNET_TIME_Relative timeout, struct Session *session,
583                   const char *plugin_name, const void *addr, size_t addrlen,
584                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
585                   void *cont_cls)
586 {
587   struct GNUNET_TRANSPORT_PluginFunctions *papi;
588   size_t ret = GNUNET_SYSERR;
589
590   /* FIXME : ats returns an address with all values 0 */
591   if (plugin_name == NULL)
592   {
593     if (cont != NULL)
594       cont (cont_cls, target, GNUNET_SYSERR);
595     return GNUNET_SYSERR;
596   }
597
598   if ((session == NULL) && (addr == NULL) && (addrlen == 0))
599   {
600     if (cont != NULL)
601       cont (cont_cls, target, GNUNET_SYSERR);
602     return GNUNET_SYSERR;
603   }
604
605   papi = GST_plugins_find (plugin_name);
606   if (papi == NULL)
607   {
608     if (cont != NULL)
609       cont (cont_cls, target, GNUNET_SYSERR);
610     return GNUNET_SYSERR;
611   }
612
613   ret =
614       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
615                   addr, addrlen, GNUNET_YES, cont, cont_cls);
616
617   if (ret == -1)
618   {
619     if (cont != NULL)
620       cont (cont_cls, target, GNUNET_SYSERR);
621   }
622   return ret;
623 }
624
625 /**
626  * Task invoked to start a transmission to another peer.
627  *
628  * @param cls the 'struct NeighbourMapEntry'
629  * @param tc scheduler context
630  */
631 static void
632 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
633
634
635 /**
636  * We're done with our transmission attempt, continue processing.
637  *
638  * @param cls the 'struct MessageQueue' of the message
639  * @param receiver intended receiver
640  * @param success whether it worked or not
641  */
642 static void
643 transmit_send_continuation (void *cls,
644                             const struct GNUNET_PeerIdentity *receiver,
645                             int success)
646 {
647   struct MessageQueue *mq;
648   struct NeighbourMapEntry *n;
649
650   mq = cls;
651   n = mq->n;
652   if (NULL != n)
653   {
654     GNUNET_assert (n->is_active == mq);
655     n->is_active = NULL;
656     if (success == GNUNET_YES)
657     {
658       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
659       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
660     }
661   }
662 #if DEBUG_TRANSPORT
663   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
664               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
665               (success == GNUNET_OK) ? "successful" : "FAILED");
666 #endif
667   if (NULL != mq->cont)
668     mq->cont (mq->cont_cls, success);
669   GNUNET_free (mq);
670 }
671
672
673 /**
674  * Check the ready list for the given neighbour and if a plugin is
675  * ready for transmission (and if we have a message), do so!
676  *
677  * @param n target peer for which to transmit
678  */
679 static void
680 try_transmission_to_peer (struct NeighbourMapEntry *n)
681 {
682   struct MessageQueue *mq;
683   struct GNUNET_TIME_Relative timeout;
684   ssize_t ret;
685
686   if (n->is_active != NULL)
687   {
688     GNUNET_break (0);
689     return;                     /* transmission already pending */
690   }
691   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
692   {
693     GNUNET_break (0);
694     return;                     /* currently waiting for bandwidth */
695   }
696   while (NULL != (mq = n->messages_head))
697   {
698     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
699     if (timeout.rel_value > 0)
700       break;
701     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
702     n->is_active = mq;
703     mq->n = n;
704     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
705   }
706   if (NULL == mq)
707     return;                     /* no more messages */
708
709   if (GST_plugins_find (n->plugin_name) == NULL)
710   {
711     GNUNET_break (0);
712     return;
713   }
714   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
715   n->is_active = mq;
716   mq->n = n;
717
718   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
719   {
720     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
721                 GNUNET_i2s (&n->id));
722     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
723     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
724     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
725     return;
726   }
727
728   ret =
729       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
730                         timeout, n->session, n->plugin_name, n->addr,
731                         n->addrlen, GNUNET_YES, &transmit_send_continuation,
732                         mq);
733   if (ret == -1)
734   {
735     /* failure, but 'send' would not call continuation in this case,
736      * so we need to do it here! */
737     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
738   }
739
740 }
741
742
743 /**
744  * Task invoked to start a transmission to another peer.
745  *
746  * @param cls the 'struct NeighbourMapEntry'
747  * @param tc scheduler context
748  */
749 static void
750 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
751 {
752   struct NeighbourMapEntry *n = cls;
753
754   GNUNET_assert (NULL != lookup_neighbour (&n->id));
755   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
756   try_transmission_to_peer (n);
757 }
758
759
760 /**
761  * Initialize the neighbours subsystem.
762  *
763  * @param cls closure for callbacks
764  * @param connect_cb function to call if we connect to a peer
765  * @param disconnect_cb function to call if we disconnect from a peer
766  */
767 void
768 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
769                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
770 {
771   callback_cls = cls;
772   connect_notify_cb = connect_cb;
773   disconnect_notify_cb = disconnect_cb;
774   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
775 }
776
777
778 static void
779 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
780                       int result)
781 {
782 #if DEBUG_TRANSPORT
783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
784               "Sending DISCONNECT message to peer `%4s': %i\n",
785               GNUNET_i2s (target), result);
786 #endif
787 }
788
789
790 static int
791 send_disconnect (const struct GNUNET_PeerIdentity *target,
792                  const char *plugin_name, const char *sender_address,
793                  uint16_t sender_address_len, struct Session *session)
794 {
795   size_t ret;
796   struct SessionDisconnectMessage disconnect_msg;
797
798 #if DEBUG_TRANSPORT
799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800               "Sending DISCONNECT message to peer `%4s'\n",
801               GNUNET_i2s (target));
802 #endif
803
804   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
805   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
806   disconnect_msg.reserved = htonl (0);
807   disconnect_msg.purpose.size =
808       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
809              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
810              sizeof (struct GNUNET_TIME_AbsoluteNBO));
811   disconnect_msg.purpose.purpose =
812       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
813   disconnect_msg.timestamp =
814       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
815   disconnect_msg.public_key = GST_my_public_key;
816   GNUNET_assert (GNUNET_OK ==
817                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
818                                          &disconnect_msg.purpose,
819                                          &disconnect_msg.signature));
820
821   ret =
822       send_with_plugin (target, (const char *) &disconnect_msg,
823                         sizeof (disconnect_msg), UINT32_MAX,
824                         GNUNET_TIME_UNIT_FOREVER_REL, session, plugin_name,
825                         sender_address, sender_address_len, GNUNET_YES,
826                         &send_disconnect_cont, NULL);
827
828   if (ret == GNUNET_SYSERR)
829     return GNUNET_SYSERR;
830
831   GNUNET_STATISTICS_update (GST_stats,
832                             gettext_noop
833                             ("# peers disconnected due to external request"), 1,
834                             GNUNET_NO);
835   return GNUNET_OK;
836 }
837
838 /**
839  * Disconnect from the given neighbour, clean up the record.
840  *
841  * @param n neighbour to disconnect from
842  */
843 static void
844 disconnect_neighbour (struct NeighbourMapEntry *n)
845 {
846   struct MessageQueue *mq;
847   int was_connected = is_connected (n);
848
849   /* send DISCONNECT MESSAGE */
850   if (is_connected (n) || is_connecting (n))
851   {
852     if (GNUNET_OK ==
853         send_disconnect (&n->id, n->plugin_name, n->addr, n->addrlen,
854                          n->session))
855       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
856                   GNUNET_i2s (&n->id));
857     else
858       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859                   "Could not send DISCONNECT_MSG to `%s'\n",
860                   GNUNET_i2s (&n->id));
861   }
862
863   if (is_connected(n))
864   {
865      GNUNET_ATS_address_in_use (GST_ats, &n->id, n->plugin_name,
866          n->addr, n->addrlen, n->session, GNUNET_NO);
867   }
868
869
870   if (is_disconnecting (n))
871     return;
872   change_state (n, S_DISCONNECT);
873
874   if (n->plugin_name != NULL)
875   {
876     struct GNUNET_TRANSPORT_PluginFunctions *papi;
877     papi = GST_plugins_find (n->plugin_name);
878     if (papi != NULL)
879       papi->disconnect (papi->cls, &n->id);
880   }
881
882   while (NULL != (mq = n->messages_head))
883   {
884     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
885     if (NULL != mq->cont)
886       mq->cont (mq->cont_cls, GNUNET_SYSERR);
887     GNUNET_free (mq);
888   }
889   if (NULL != n->is_active)
890   {
891     n->is_active->n = NULL;
892     n->is_active = NULL;
893   }
894   if (was_connected)
895   {
896     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
897     GNUNET_SCHEDULER_cancel (n->keepalive_task);
898     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
899     GNUNET_assert (neighbours_connected > 0);
900     neighbours_connected--;
901     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
902                               GNUNET_NO);
903     disconnect_notify_cb (callback_cls, &n->id);
904   }
905   GNUNET_assert (GNUNET_YES ==
906                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
907                                                        &n->id.hashPubKey, n));
908   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
909   {
910     GNUNET_SCHEDULER_cancel (n->ats_suggest);
911     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
912   }
913   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
914   {
915     GNUNET_SCHEDULER_cancel (n->timeout_task);
916     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
917   }
918   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
919   {
920     GNUNET_SCHEDULER_cancel (n->transmission_task);
921     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
922   }
923   if (NULL != n->plugin_name)
924   {
925     GNUNET_free (n->plugin_name);
926     n->plugin_name = NULL;
927   }
928   if (NULL != n->addr)
929   {
930     GNUNET_free (n->addr);
931     n->addr = NULL;
932     n->addrlen = 0;
933   }
934   n->session = NULL;
935   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
936               GNUNET_i2s (&n->id), n);
937   GNUNET_free (n);
938 }
939
940
941 /**
942  * Peer has been idle for too long. Disconnect.
943  *
944  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
945  * @param tc scheduler context
946  */
947 static void
948 neighbour_timeout_task (void *cls,
949                         const struct GNUNET_SCHEDULER_TaskContext *tc)
950 {
951   struct NeighbourMapEntry *n = cls;
952
953   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
954
955   GNUNET_STATISTICS_update (GST_stats,
956                             gettext_noop
957                             ("# peers disconnected due to timeout"), 1,
958                             GNUNET_NO);
959   disconnect_neighbour (n);
960 }
961
962
963 /**
964  * Send another keepalive message.
965  *
966  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
967  * @param tc scheduler context
968  */
969 static void
970 neighbour_keepalive_task (void *cls,
971                           const struct GNUNET_SCHEDULER_TaskContext *tc)
972 {
973   struct NeighbourMapEntry *n = cls;
974   struct GNUNET_MessageHeader m;
975
976   n->keepalive_task =
977       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
978                                     &neighbour_keepalive_task, n);
979   GNUNET_assert (is_connected (n));
980   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
981                             GNUNET_NO);
982   m.size = htons (sizeof (struct GNUNET_MessageHeader));
983   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
984
985   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
986                     UINT32_MAX /* priority */ ,
987                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->plugin_name,
988                     n->addr, n->addrlen, GNUNET_YES, NULL, NULL);
989 }
990
991
992 /**
993  * Disconnect from the given neighbour.
994  *
995  * @param cls unused
996  * @param key hash of neighbour's public key (not used)
997  * @param value the 'struct NeighbourMapEntry' of the neighbour
998  */
999 static int
1000 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1001 {
1002   struct NeighbourMapEntry *n = value;
1003
1004 #if DEBUG_TRANSPORT
1005   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1006               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1007 #endif
1008   if (is_connected (n))
1009     GNUNET_STATISTICS_update (GST_stats,
1010                               gettext_noop
1011                               ("# peers disconnected due to global disconnect"),
1012                               1, GNUNET_NO);
1013   disconnect_neighbour (n);
1014   return GNUNET_OK;
1015 }
1016
1017
1018 static void
1019 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1020 {
1021   struct NeighbourMapEntry *n = cls;
1022
1023   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1024
1025   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1026               " ATS did not suggested address to connect to peer `%s'\n",
1027               GNUNET_i2s (&n->id));
1028
1029   disconnect_neighbour (n);
1030 }
1031
1032 /**
1033  * Cleanup the neighbours subsystem.
1034  */
1035 void
1036 GST_neighbours_stop ()
1037 {
1038   // This can happen during shutdown
1039   if (neighbours == NULL)
1040   {
1041     return;
1042   }
1043
1044   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1045                                          NULL);
1046   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1047   GNUNET_assert (neighbours_connected == 0);
1048   neighbours = NULL;
1049   callback_cls = NULL;
1050   connect_notify_cb = NULL;
1051   disconnect_notify_cb = NULL;
1052 }
1053
1054
1055 /**
1056  * We tried to send a SESSION_CONNECT message to another peer.  If this
1057  * succeeded, we change the state.  If it failed, we should tell
1058  * ATS to not use this address anymore (until it is re-validated).
1059  *
1060  * @param cls the 'struct NeighbourMapEntry'
1061  * @param success GNUNET_OK on success
1062  */
1063 static void
1064 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1065                            int success)
1066 {
1067   struct NeighbourMapEntry *n = cls;
1068
1069   GNUNET_assert (n != NULL);
1070   GNUNET_assert (!is_connected (n));
1071
1072   if (is_disconnecting (n))
1073     return;                     /* neighbour is going away */
1074
1075   if (GNUNET_YES != success)
1076   {
1077 #if DEBUG_TRANSPORT
1078     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1079                 "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1080                 GNUNET_i2s (&n->id), n->plugin_name,
1081                 (n->addrlen ==
1082                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
1083                                                      n->addrlen), n->session);
1084 #endif
1085
1086     GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1087                                   n->addrlen, NULL);
1088
1089     change_state (n, S_NOT_CONNECTED);
1090
1091     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1092       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1093     n->ats_suggest =
1094         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1095                                       n);
1096     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1097     return;
1098   }
1099
1100
1101 }
1102
1103
1104 /**
1105  * We tried to switch addresses with an peer already connected. If it failed,
1106  * we should tell ATS to not use this address anymore (until it is re-validated).
1107  *
1108  * @param cls the 'struct NeighbourMapEntry'
1109  * @param success GNUNET_OK on success
1110  */
1111 static void
1112 send_switch_address_continuation (void *cls,
1113                                   const struct GNUNET_PeerIdentity *target,
1114                                   int success)
1115 {
1116   struct NeighbourMapEntry *n = cls;
1117
1118   GNUNET_assert (n != NULL);
1119   if (is_disconnecting (n))
1120     return;                     /* neighbour is going away */
1121
1122   GNUNET_assert (n->state == S_CONNECTED);
1123   if (GNUNET_YES != success)
1124   {
1125 #if DEBUG_TRANSPORT
1126     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1127                 "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n",
1128                 GNUNET_i2s (&n->id), n->plugin_name,
1129                 (n->addrlen ==
1130                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
1131                                                      n->addrlen), n->session);
1132 #endif
1133
1134     GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1135                                   n->addrlen, NULL);
1136
1137     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1138       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1139     n->ats_suggest =
1140         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1141                                       n);
1142     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1143     return;
1144   }
1145 }
1146
1147 /**
1148  * We tried to send a SESSION_CONNECT message to another peer.  If this
1149  * succeeded, we change the state.  If it failed, we should tell
1150  * ATS to not use this address anymore (until it is re-validated).
1151  *
1152  * @param cls the 'struct NeighbourMapEntry'
1153  * @param success GNUNET_OK on success
1154  */
1155 static void
1156 send_connect_ack_continuation (void *cls,
1157                                const struct GNUNET_PeerIdentity *target,
1158                                int success)
1159 {
1160   struct NeighbourMapEntry *n = cls;
1161
1162   GNUNET_assert (n != NULL);
1163
1164   if (GNUNET_YES == success)
1165     return;                     /* sending successful */
1166
1167   /* sending failed, ask for next address  */
1168 #if DEBUG_TRANSPORT
1169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170               "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1171               GNUNET_i2s (&n->id), n->plugin_name,
1172               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name,
1173                                                                  n->addr,
1174                                                                  n->addrlen),
1175               n->session);
1176 #endif
1177   change_state (n, S_NOT_CONNECTED);
1178
1179   GNUNET_ATS_address_destroyed (GST_ats, &n->id, n->plugin_name, n->addr,
1180                                 n->addrlen, NULL);
1181
1182   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1183     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1184   n->ats_suggest =
1185       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1186                                     n);
1187   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1188 }
1189
1190 /**
1191  * For an existing neighbour record, set the active connection to
1192  * the given address.
1193  *
1194  * @param peer identity of the peer to switch the address for
1195  * @param plugin_name name of transport that delivered the PONG
1196  * @param address address of the other peer, NULL if other peer
1197  *                       connected to us
1198  * @param address_len number of bytes in address
1199  * @param session session to use (or NULL)
1200  * @param ats performance data
1201  * @param ats_count number of entries in ats
1202  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1203  *         connection is not up (yet)
1204  */
1205 int
1206 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1207                                        const char *plugin_name,
1208                                        const void *address, size_t address_len,
1209                                        struct Session *session,
1210                                        const struct GNUNET_ATS_Information *ats,
1211                                        uint32_t ats_count,
1212                                        struct GNUNET_BANDWIDTH_Value32NBO
1213                                        bandwidth_in,
1214                                        struct GNUNET_BANDWIDTH_Value32NBO
1215                                        bandwidth_out)
1216 {
1217   struct NeighbourMapEntry *n;
1218   struct SessionConnectMessage connect_msg;
1219   size_t msg_len;
1220   size_t ret;
1221   int checks_failed;
1222
1223   // This can happen during shutdown
1224   if (neighbours == NULL)
1225   {
1226     return GNUNET_NO;
1227   }
1228
1229   checks_failed = GNUNET_NO;
1230
1231   if (plugin_name == NULL)
1232   {
1233     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1234                 "ATS offered suggested us empty address: plugin NULL");
1235     GNUNET_break_op (0);
1236     checks_failed = GNUNET_YES;
1237   }
1238   if ((address == NULL) && (address_len == 0) && (session == NULL))
1239   {
1240     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1241                 "ATS offered suggested us empty address: address NULL & session NULL");
1242     GNUNET_break_op (0);
1243     checks_failed = GNUNET_YES;
1244   }
1245
1246   n = lookup_neighbour (peer);
1247   if (NULL == n)
1248     checks_failed = GNUNET_YES;
1249
1250   if (checks_failed == GNUNET_YES)
1251   {
1252     GNUNET_ATS_address_destroyed (GST_ats, peer, plugin_name, address,
1253                                   address_len, session);
1254     if (n != NULL)
1255       GNUNET_ATS_suggest_address (GST_ats, peer);
1256     return GNUNET_NO;
1257   }
1258
1259   /* checks successful and neighbour != NULL */
1260 #if DEBUG_TRANSPORT
1261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262               "ATS tells us to switch to plugin `%s' address '%s' session %X for %s peer `%s'\n",
1263               plugin_name,
1264               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
1265                                                                   address,
1266                                                                   address_len),
1267               session, (is_connected (n) ? "CONNECTED" : "NOT CONNECTED"),
1268               GNUNET_i2s (peer));
1269 #endif
1270
1271   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1272   {
1273     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1274     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1275   }
1276
1277   // do not switch addresses just update quotas
1278   if ((is_connected (n)) && (address_len == n->addrlen))
1279   {
1280     if ((0 == memcmp (address, n->addr, address_len)) &&
1281         (n->session == session))
1282     {
1283       struct QuotaSetMessage q_msg;
1284
1285 #if DEBUG_TRANSPORT
1286       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1287                   "Sending outbound quota of %u Bps and inbound quota of %u Bps for peer `%s' to all clients\n",
1288                   ntohl (n->bandwidth_out.value__),
1289                   ntohl (n->bandwidth_in.value__), GNUNET_i2s (peer));
1290 #endif
1291
1292       n->bandwidth_in = bandwidth_in;
1293       n->bandwidth_out = bandwidth_out;
1294       GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1295
1296       q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1297       q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1298       q_msg.quota = n->bandwidth_out;
1299       q_msg.peer = (*peer);
1300       GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1301       return GNUNET_NO;
1302     }
1303   }
1304
1305   GNUNET_free_non_null (n->addr);
1306   n->addr = GNUNET_malloc (address_len);
1307   memcpy (n->addr, address, address_len);
1308   n->bandwidth_in = bandwidth_in;
1309   n->bandwidth_out = bandwidth_out;
1310   n->addrlen = address_len;
1311   n->session = session;
1312   GNUNET_free_non_null (n->plugin_name);
1313   n->plugin_name = GNUNET_strdup (plugin_name);
1314   GNUNET_SCHEDULER_cancel (n->timeout_task);
1315   n->timeout_task =
1316       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1317                                     &neighbour_timeout_task, n);
1318
1319   if (n->state == S_DISCONNECT)
1320   {
1321     /* We are disconnecting, nothing to do here */
1322     return GNUNET_NO;
1323   }
1324   /* We are not connected/connecting and initiate a fresh connect */
1325   if (n->state == S_NOT_CONNECTED)
1326   {
1327     msg_len = sizeof (struct SessionConnectMessage);
1328     connect_msg.header.size = htons (msg_len);
1329     connect_msg.header.type =
1330         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1331     connect_msg.reserved = htonl (0);
1332     connect_msg.timestamp =
1333         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1334
1335     change_state (n, S_CONNECT_SENT);
1336
1337     ret =
1338         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1339                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1340                           plugin_name, address, address_len, GNUNET_YES,
1341                           &send_connect_continuation, n);
1342
1343
1344     return GNUNET_NO;
1345   }
1346   /* We received a CONNECT message and asked ATS for an address */
1347   else if (n->state == S_CONNECT_RECV)
1348   {
1349     msg_len = sizeof (struct SessionConnectMessage);
1350     connect_msg.header.size = htons (msg_len);
1351     connect_msg.header.type =
1352         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1353     connect_msg.reserved = htonl (0);
1354     connect_msg.timestamp =
1355         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1356
1357     ret =
1358         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1359                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1360                           plugin_name, address, address_len, GNUNET_YES,
1361                           &send_connect_ack_continuation, n);
1362     return GNUNET_NO;
1363   }
1364   /* connected peer is switching addresses */
1365   else if (n->state == S_CONNECTED)
1366   {
1367     msg_len = sizeof (struct SessionConnectMessage);
1368     connect_msg.header.size = htons (msg_len);
1369     connect_msg.header.type =
1370         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1371     connect_msg.reserved = htonl (0);
1372     connect_msg.timestamp =
1373         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1374
1375     ret =
1376         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1377                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1378                           plugin_name, address, address_len, GNUNET_YES,
1379                           &send_switch_address_continuation, n);
1380     if (ret == GNUNET_SYSERR)
1381     {
1382       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1383                   "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n",
1384                   GNUNET_i2s (peer), plugin_name,
1385                   (address_len ==
1386                    0) ? "<inbound>" : GST_plugins_a2s (plugin_name, address,
1387                                                        address_len), session);
1388     }
1389     return GNUNET_NO;
1390   }
1391   else if (n->state == S_CONNECT_SENT)
1392   {
1393     return GNUNET_NO;
1394   }
1395   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1396               "Invalid connection state to switch addresses %u \n", n->state);
1397   GNUNET_break_op (0);
1398   return GNUNET_NO;
1399 }
1400
1401
1402 /**
1403  * Create an entry in the neighbour map for the given peer
1404  *
1405  * @param peer peer to create an entry for
1406  * @return new neighbour map entry
1407  */
1408 static struct NeighbourMapEntry *
1409 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1410 {
1411   struct NeighbourMapEntry *n;
1412
1413 #if DEBUG_TRANSPORT
1414   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1415               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1416 #endif
1417   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1418   n->id = *peer;
1419   n->state = S_NOT_CONNECTED;
1420   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1421                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1422                                  MAX_BANDWIDTH_CARRY_S);
1423   n->timeout_task =
1424       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1425                                     &neighbour_timeout_task, n);
1426   GNUNET_assert (GNUNET_OK ==
1427                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1428                                                     &n->id.hashPubKey, n,
1429                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1430   return n;
1431 }
1432
1433
1434 /**
1435  * Try to create a connection to the given target (eventually).
1436  *
1437  * @param target peer to try to connect to
1438  */
1439 void
1440 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1441 {
1442   struct NeighbourMapEntry *n;
1443
1444   // This can happen during shutdown
1445   if (neighbours == NULL)
1446   {
1447     return;
1448   }
1449 #if DEBUG_TRANSPORT
1450   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1451               GNUNET_i2s (target));
1452 #endif
1453   if (0 ==
1454       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1455   {
1456     /* my own hello */
1457     return;
1458   }
1459   n = lookup_neighbour (target);
1460
1461   if (NULL != n)
1462   {
1463     if ((is_connected (n)) || (is_connecting (n)))
1464       return;                   /* already connecting or connected */
1465     if (is_disconnecting (n))
1466       change_state (n, S_NOT_CONNECTED);
1467   }
1468
1469
1470   if (n == NULL)
1471     n = setup_neighbour (target);
1472 #if DEBUG_TRANSPORT
1473   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1474               "Asking ATS for suggested address to connect to peer `%s'\n",
1475               GNUNET_i2s (&n->id));
1476 #endif
1477
1478   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1479 }
1480
1481 /**
1482  * Test if we're connected to the given peer.
1483  *
1484  * @param target peer to test
1485  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1486  */
1487 int
1488 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1489 {
1490   struct NeighbourMapEntry *n;
1491
1492   // This can happen during shutdown
1493   if (neighbours == NULL)
1494   {
1495     return GNUNET_NO;
1496   }
1497
1498   n = lookup_neighbour (target);
1499
1500   if ((NULL == n) || (!is_connected (n)))
1501     return GNUNET_NO;           /* not connected */
1502   return GNUNET_YES;
1503 }
1504
1505
1506 /**
1507  * A session was terminated. Take note.
1508  *
1509  * @param peer identity of the peer where the session died
1510  * @param session session that is gone
1511  */
1512 void
1513 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1514                                    struct Session *session)
1515 {
1516   struct NeighbourMapEntry *n;
1517
1518   // This can happen during shutdown
1519   if (neighbours == NULL)
1520   {
1521     return;
1522   }
1523
1524 #if DEBUG_TRANSPORT
1525   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1526               session, GNUNET_i2s (peer));
1527 #endif
1528
1529   n = lookup_neighbour (peer);
1530   if (NULL == n)
1531     return;
1532   if (session != n->session)
1533     return;                     /* doesn't affect us */
1534
1535   n->session = NULL;
1536   GNUNET_free (n->addr);
1537   n->addr = NULL;
1538   n->addrlen = 0;
1539
1540   /* not connected anymore anyway, shouldn't matter */
1541   if ((!is_connected (n)) && (!is_connecting (n)))
1542     return;
1543
1544   /* We are connected, so ask ATS to switch addresses */
1545   GNUNET_SCHEDULER_cancel (n->timeout_task);
1546   n->timeout_task =
1547       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1548                                     &neighbour_timeout_task, n);
1549   /* try QUICKLY to re-establish a connection, reduce timeout! */
1550   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1551     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1552   n->ats_suggest =
1553       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1554                                     n);
1555   GNUNET_ATS_suggest_address (GST_ats, peer);
1556 }
1557
1558
1559 /**
1560  * Transmit a message to the given target using the active connection.
1561  *
1562  * @param target destination
1563  * @param msg message to send
1564  * @param msg_size number of bytes in msg
1565  * @param timeout when to fail with timeout
1566  * @param cont function to call when done
1567  * @param cont_cls closure for 'cont'
1568  */
1569 void
1570 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1571                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1572                      GST_NeighbourSendContinuation cont, void *cont_cls)
1573 {
1574   struct NeighbourMapEntry *n;
1575   struct MessageQueue *mq;
1576
1577   // This can happen during shutdown
1578   if (neighbours == NULL)
1579   {
1580     return;
1581   }
1582
1583   n = lookup_neighbour (target);
1584   if ((n == NULL) || (!is_connected (n)))
1585   {
1586     GNUNET_STATISTICS_update (GST_stats,
1587                               gettext_noop
1588                               ("# messages not sent (no such peer or not connected)"),
1589                               1, GNUNET_NO);
1590 #if DEBUG_TRANSPORT
1591     if (n == NULL)
1592       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1593                   "Could not send message to peer `%s': unknown neighbour",
1594                   GNUNET_i2s (target));
1595     else if (!is_connected (n))
1596       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1597                   "Could not send message to peer `%s': not connected\n",
1598                   GNUNET_i2s (target));
1599 #endif
1600     if (NULL != cont)
1601       cont (cont_cls, GNUNET_SYSERR);
1602     return;
1603   }
1604
1605   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
1606   {
1607     GNUNET_STATISTICS_update (GST_stats,
1608                               gettext_noop
1609                               ("# messages not sent (no such peer or not connected)"),
1610                               1, GNUNET_NO);
1611 #if DEBUG_TRANSPORT
1612     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1613                 "Could not send message to peer `%s': no address available\n",
1614                 GNUNET_i2s (target));
1615 #endif
1616
1617     if (NULL != cont)
1618       cont (cont_cls, GNUNET_SYSERR);
1619     return;
1620   }
1621
1622   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1623   GNUNET_STATISTICS_update (GST_stats,
1624                             gettext_noop
1625                             ("# bytes in message queue for other peers"),
1626                             msg_size, GNUNET_NO);
1627   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1628   mq->cont = cont;
1629   mq->cont_cls = cont_cls;
1630   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1631   memcpy (&mq[1], msg, msg_size);
1632   mq->message_buf = (const char *) &mq[1];
1633   mq->message_buf_size = msg_size;
1634   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1635   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1636
1637   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1638       (NULL == n->is_active))
1639     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1640 }
1641
1642
1643 /**
1644  * We have received a message from the given sender.  How long should
1645  * we delay before receiving more?  (Also used to keep the peer marked
1646  * as live).
1647  *
1648  * @param sender sender of the message
1649  * @param size size of the message
1650  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1651  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1652  *                   GNUNET_SYSERR if the connection is not fully up yet
1653  * @return how long to wait before reading more from this sender
1654  */
1655 struct GNUNET_TIME_Relative
1656 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1657                                         *sender, ssize_t size, int *do_forward)
1658 {
1659   struct NeighbourMapEntry *n;
1660   struct GNUNET_TIME_Relative ret;
1661
1662   // This can happen during shutdown
1663   if (neighbours == NULL)
1664   {
1665     return GNUNET_TIME_UNIT_FOREVER_REL;
1666   }
1667
1668   n = lookup_neighbour (sender);
1669   if (n == NULL)
1670   {
1671     GST_neighbours_try_connect (sender);
1672     n = lookup_neighbour (sender);
1673     if (NULL == n)
1674     {
1675       GNUNET_STATISTICS_update (GST_stats,
1676                                 gettext_noop
1677                                 ("# messages discarded due to lack of neighbour record"),
1678                                 1, GNUNET_NO);
1679       *do_forward = GNUNET_NO;
1680       return GNUNET_TIME_UNIT_ZERO;
1681     }
1682   }
1683   if (!is_connected (n))
1684   {
1685     *do_forward = GNUNET_SYSERR;
1686     return GNUNET_TIME_UNIT_ZERO;
1687   }
1688   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1689   {
1690     n->quota_violation_count++;
1691 #if DEBUG_TRANSPORT
1692     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1693                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1694                 n->in_tracker.available_bytes_per_s__,
1695                 n->quota_violation_count);
1696 #endif
1697     /* Discount 32k per violation */
1698     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1699   }
1700   else
1701   {
1702     if (n->quota_violation_count > 0)
1703     {
1704       /* try to add 32k back */
1705       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1706       n->quota_violation_count--;
1707     }
1708   }
1709   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1710   {
1711     GNUNET_STATISTICS_update (GST_stats,
1712                               gettext_noop
1713                               ("# bandwidth quota violations by other peers"),
1714                               1, GNUNET_NO);
1715     *do_forward = GNUNET_NO;
1716     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1717   }
1718   *do_forward = GNUNET_YES;
1719   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1720   if (ret.rel_value > 0)
1721   {
1722 #if DEBUG_TRANSPORT
1723     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1724                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1725                 (unsigned long long) n->in_tracker.
1726                 consumption_since_last_update__,
1727                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1728                 (unsigned long long) ret.rel_value);
1729 #endif
1730     GNUNET_STATISTICS_update (GST_stats,
1731                               gettext_noop ("# ms throttling suggested"),
1732                               (int64_t) ret.rel_value, GNUNET_NO);
1733   }
1734   return ret;
1735 }
1736
1737
1738 /**
1739  * Keep the connection to the given neighbour alive longer,
1740  * we received a KEEPALIVE (or equivalent).
1741  *
1742  * @param neighbour neighbour to keep alive
1743  */
1744 void
1745 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1746 {
1747   struct NeighbourMapEntry *n;
1748
1749   // This can happen during shutdown
1750   if (neighbours == NULL)
1751   {
1752     return;
1753   }
1754
1755   n = lookup_neighbour (neighbour);
1756   if (NULL == n)
1757   {
1758     GNUNET_STATISTICS_update (GST_stats,
1759                               gettext_noop
1760                               ("# KEEPALIVE messages discarded (not connected)"),
1761                               1, GNUNET_NO);
1762     return;
1763   }
1764   GNUNET_SCHEDULER_cancel (n->timeout_task);
1765   n->timeout_task =
1766       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1767                                     &neighbour_timeout_task, n);
1768 }
1769
1770
1771 /**
1772  * Change the incoming quota for the given peer.
1773  *
1774  * @param neighbour identity of peer to change qutoa for
1775  * @param quota new quota
1776  */
1777 void
1778 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1779                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1780 {
1781   struct NeighbourMapEntry *n;
1782
1783   // This can happen during shutdown
1784   if (neighbours == NULL)
1785   {
1786     return;
1787   }
1788
1789   n = lookup_neighbour (neighbour);
1790   if (n == NULL)
1791   {
1792     GNUNET_STATISTICS_update (GST_stats,
1793                               gettext_noop
1794                               ("# SET QUOTA messages ignored (no such peer)"),
1795                               1, GNUNET_NO);
1796     return;
1797   }
1798   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1799   if (0 != ntohl (quota.value__))
1800     return;
1801 #if DEBUG_TRANSPORT
1802   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1803               GNUNET_i2s (&n->id), "SET_QUOTA");
1804 #endif
1805   if (is_connected (n))
1806     GNUNET_STATISTICS_update (GST_stats,
1807                               gettext_noop ("# disconnects due to quota of 0"),
1808                               1, GNUNET_NO);
1809   disconnect_neighbour (n);
1810 }
1811
1812
1813 /**
1814  * Closure for the neighbours_iterate function.
1815  */
1816 struct IteratorContext
1817 {
1818   /**
1819    * Function to call on each connected neighbour.
1820    */
1821   GST_NeighbourIterator cb;
1822
1823   /**
1824    * Closure for 'cb'.
1825    */
1826   void *cb_cls;
1827 };
1828
1829
1830 /**
1831  * Call the callback from the closure for each connected neighbour.
1832  *
1833  * @param cls the 'struct IteratorContext'
1834  * @param key the hash of the public key of the neighbour
1835  * @param value the 'struct NeighbourMapEntry'
1836  * @return GNUNET_OK (continue to iterate)
1837  */
1838 static int
1839 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1840 {
1841   struct IteratorContext *ic = cls;
1842   struct NeighbourMapEntry *n = value;
1843
1844   if (!is_connected (n))
1845     return GNUNET_OK;
1846
1847   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen);
1848   return GNUNET_OK;
1849 }
1850
1851
1852 /**
1853  * Iterate over all connected neighbours.
1854  *
1855  * @param cb function to call
1856  * @param cb_cls closure for cb
1857  */
1858 void
1859 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1860 {
1861   struct IteratorContext ic;
1862
1863   // This can happen during shutdown
1864   if (neighbours == NULL)
1865   {
1866     return;
1867   }
1868
1869   ic.cb = cb;
1870   ic.cb_cls = cb_cls;
1871   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1872 }
1873
1874 /**
1875  * If we have an active connection to the given target, it must be shutdown.
1876  *
1877  * @param target peer to disconnect from
1878  */
1879 void
1880 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1881 {
1882   struct NeighbourMapEntry *n;
1883
1884   // This can happen during shutdown
1885   if (neighbours == NULL)
1886   {
1887     return;
1888   }
1889
1890   n = lookup_neighbour (target);
1891   if (NULL == n)
1892     return;                     /* not active */
1893   if (is_connected (n))
1894   {
1895     send_disconnect (&n->id, n->plugin_name, n->addr, n->addrlen, n->session);
1896
1897     n = lookup_neighbour (target);
1898     if (NULL == n)
1899       return;                   /* gone already */
1900   }
1901   disconnect_neighbour (n);
1902 }
1903
1904
1905 /**
1906  * We received a disconnect message from the given peer,
1907  * validate and process.
1908  *
1909  * @param peer sender of the message
1910  * @param msg the disconnect message
1911  */
1912 void
1913 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
1914                                           *peer,
1915                                           const struct GNUNET_MessageHeader
1916                                           *msg)
1917 {
1918   struct NeighbourMapEntry *n;
1919   const struct SessionDisconnectMessage *sdm;
1920   GNUNET_HashCode hc;
1921
1922 #if DEBUG_TRANSPORT
1923   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1924               "Received DISCONNECT message from peer `%s'\n",
1925               GNUNET_i2s (peer));
1926 #endif
1927
1928   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1929   {
1930     // GNUNET_break_op (0);
1931     GNUNET_STATISTICS_update (GST_stats,
1932                               gettext_noop
1933                               ("# disconnect messages ignored (old format)"), 1,
1934                               GNUNET_NO);
1935     return;
1936   }
1937   sdm = (const struct SessionDisconnectMessage *) msg;
1938   n = lookup_neighbour (peer);
1939   if (NULL == n)
1940     return;                     /* gone already */
1941   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1942       n->connect_ts.abs_value)
1943   {
1944     GNUNET_STATISTICS_update (GST_stats,
1945                               gettext_noop
1946                               ("# disconnect messages ignored (timestamp)"), 1,
1947                               GNUNET_NO);
1948     return;
1949   }
1950   GNUNET_CRYPTO_hash (&sdm->public_key,
1951                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1952                       &hc);
1953   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
1954   {
1955     GNUNET_break_op (0);
1956     return;
1957   }
1958   if (ntohl (sdm->purpose.size) !=
1959       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1960       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1961       sizeof (struct GNUNET_TIME_AbsoluteNBO))
1962   {
1963     GNUNET_break_op (0);
1964     return;
1965   }
1966   if (GNUNET_OK !=
1967       GNUNET_CRYPTO_rsa_verify
1968       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
1969        &sdm->signature, &sdm->public_key))
1970   {
1971     GNUNET_break_op (0);
1972     return;
1973   }
1974   GST_neighbours_force_disconnect (peer);
1975 }
1976
1977 /**
1978  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
1979  * Consider switching to it.
1980  *
1981  * @param message possibly a 'struct SessionConnectMessage' (check format)
1982  * @param peer identity of the peer to switch the address for
1983  * @param plugin_name name of transport that delivered the PONG
1984  * @param address address of the other peer, NULL if other peer
1985  *                       connected to us
1986  * @param address_len number of bytes in address
1987  * @param session session to use (or NULL)
1988  * @param ats performance data
1989  * @param ats_count number of entries in ats
1990   */
1991 void
1992 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
1993                                    const struct GNUNET_PeerIdentity *peer,
1994                                    const char *plugin_name,
1995                                    const char *sender_address,
1996                                    uint16_t sender_address_len,
1997                                    struct Session *session,
1998                                    const struct GNUNET_ATS_Information *ats,
1999                                    uint32_t ats_count)
2000 {
2001   const struct SessionConnectMessage *scm;
2002   struct QuotaSetMessage q_msg;
2003   struct GNUNET_MessageHeader msg;
2004   struct NeighbourMapEntry *n;
2005   size_t msg_len;
2006   size_t ret;
2007   int was_connected;
2008
2009 #if DEBUG_TRANSPORT
2010   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2011               "Received CONNECT_ACK message from peer `%s'\n",
2012               GNUNET_i2s (peer));
2013 #endif
2014
2015   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2016   {
2017     GNUNET_break_op (0);
2018     return;
2019   }
2020
2021   scm = (const struct SessionConnectMessage *) message;
2022   GNUNET_break_op (ntohl (scm->reserved) == 0);
2023   n = lookup_neighbour (peer);
2024   if (NULL == n)
2025     n = setup_neighbour (peer);
2026
2027   if (!is_connecting(n))
2028   {
2029     GNUNET_STATISTICS_update (GST_stats,
2030         gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
2031         GNUNET_NO);
2032     return;
2033   }
2034
2035   if (NULL != session)
2036     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2037                      "transport-ats",
2038                      "Giving ATS session %p of plugin %s for peer %s\n",
2039                      session, plugin_name, GNUNET_i2s (peer));
2040   GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2041                              sender_address_len, session, ats, ats_count);
2042
2043   was_connected = is_connected (n);
2044   if (!is_connected (n))
2045     change_state (n, S_CONNECTED);
2046
2047   GNUNET_ATS_address_in_use (GST_ats, &n->id, n->plugin_name, n->addr,
2048                              n->addrlen, n->addr, GNUNET_YES);
2049
2050 #if DEBUG_TRANSPORT
2051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052               "Setting inbound quota of %u for peer `%s' to \n",
2053               ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id));
2054 #endif
2055   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2056
2057   /* send ACK (ACK) */
2058   msg_len = sizeof (msg);
2059   msg.size = htons (msg_len);
2060   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2061
2062   ret =
2063       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2064                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2065                         n->plugin_name, n->addr, n->addrlen, GNUNET_YES, NULL,
2066                         NULL);
2067
2068   if (ret == GNUNET_SYSERR)
2069     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2070                 "Failed to send SESSION_ACK to `%4s' using plugin `%s' address '%s' session %X\n",
2071                 GNUNET_i2s (&n->id), n->plugin_name,
2072                 (n->addrlen ==
2073                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2074                                                      n->addrlen), n->session);
2075
2076
2077   if (!was_connected)
2078   {
2079     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2080       n->keepalive_task =
2081           GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2082                                         &neighbour_keepalive_task, n);
2083
2084     neighbours_connected++;
2085     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2086                               GNUNET_NO);
2087 #if DEBUG_TRANSPORT
2088     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2089                 "Notify about connect of `%4s' using plugin `%s' address '%s' session %X LINE %u\n",
2090                 GNUNET_i2s (&n->id), n->plugin_name,
2091                 (n->addrlen ==
2092                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2093                                                      n->addrlen), n->session,
2094                 __LINE__);
2095 #endif
2096     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2097   }
2098
2099 #if DEBUG_TRANSPORT
2100   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2101               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2102               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2103 #endif
2104   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2105   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2106   q_msg.quota = n->bandwidth_out;
2107   q_msg.peer = (*peer);
2108   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2109
2110 }
2111
2112 void
2113 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2114                            const struct GNUNET_PeerIdentity *peer,
2115                            const char *plugin_name, const char *sender_address,
2116                            uint16_t sender_address_len, struct Session *session,
2117                            const struct GNUNET_ATS_Information *ats,
2118                            uint32_t ats_count)
2119 {
2120   struct NeighbourMapEntry *n;
2121   struct QuotaSetMessage q_msg;
2122   int was_connected;
2123
2124 #if DEBUG_TRANSPORT
2125   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2126               GNUNET_i2s (peer));
2127 #endif
2128
2129   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2130   {
2131     GNUNET_break_op (0);
2132     return;
2133   }
2134
2135   n = lookup_neighbour (peer);
2136   if (NULL == n)
2137   {
2138     send_disconnect (peer, plugin_name, sender_address, sender_address_len,
2139                      session);
2140     GNUNET_break (0);
2141     return;
2142   }
2143
2144   if (is_connected (n))
2145     return;
2146
2147   if (!is_connecting(n))
2148   {
2149     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2150                               GNUNET_NO);
2151     return;
2152   }
2153
2154   if (NULL != session)
2155     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2156                      "transport-ats",
2157                      "Giving ATS session %p of plugin %s for peer %s\n",
2158                      session, plugin_name, GNUNET_i2s (peer));
2159   GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2160                              sender_address_len, session, ats, ats_count);
2161
2162   was_connected = is_connected (n);
2163   change_state (n, S_CONNECTED);
2164
2165   GNUNET_ATS_address_in_use (GST_ats, &n->id, n->plugin_name, n->addr,
2166                              n->addrlen, n->addr, GNUNET_YES);
2167
2168   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2169
2170   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2171     n->keepalive_task =
2172         GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2173                                       &neighbour_keepalive_task, n);
2174
2175   if (!was_connected)
2176   {
2177     neighbours_connected++;
2178     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2179                               GNUNET_NO);
2180
2181 #if DEBUG_TRANSPORT
2182     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2183                 "Notify about connect of `%4s' using plugin `%s' address '%s' session %X LINE %u\n",
2184                 GNUNET_i2s (&n->id), n->plugin_name,
2185                 (n->addrlen ==
2186                  0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name, n->addr,
2187                                                      n->addrlen), n->session,
2188                 __LINE__);
2189 #endif
2190     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2191   }
2192 #if DEBUG_TRANSPORT
2193   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2194               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2195               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2196 #endif
2197   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2198   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2199   q_msg.quota = n->bandwidth_out;
2200   q_msg.peer = (*peer);
2201   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2202 }
2203
2204 struct BlackListCheckContext
2205 {
2206   struct GNUNET_ATS_Information *ats;
2207
2208   uint32_t ats_count;
2209
2210   struct Session *session;
2211
2212   char *sender_address;
2213
2214   uint16_t sender_address_len;
2215
2216   char *plugin_name;
2217
2218   struct GNUNET_TIME_Absolute ts;
2219 };
2220
2221
2222 static void
2223 handle_connect_blacklist_cont (void *cls,
2224                                const struct GNUNET_PeerIdentity *peer,
2225                                int result)
2226 {
2227   struct NeighbourMapEntry *n;
2228   struct BlackListCheckContext *bcc = cls;
2229
2230 #if DEBUG_TRANSPORT
2231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2232               "Blacklist check due to CONNECT message: `%s'\n",
2233               GNUNET_i2s (peer),
2234               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2235 #endif
2236
2237   /* not allowed */
2238   if (GNUNET_OK != result)
2239   {
2240     GNUNET_free (bcc);
2241     return;
2242   }
2243
2244   n = lookup_neighbour (peer);
2245   if (NULL == n)
2246     n = setup_neighbour (peer);
2247
2248   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2249   {
2250     if (NULL != bcc->session)
2251       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2252                        "transport-ats",
2253                        "Giving ATS session %p of plugin %s address `%s' for peer %s\n",
2254                        bcc->session, bcc->plugin_name,
2255                        GST_plugins_a2s (bcc->plugin_name, bcc->sender_address,
2256                                         bcc->sender_address_len),
2257                        GNUNET_i2s (peer));
2258     GNUNET_ATS_address_update (GST_ats, peer, bcc->plugin_name,
2259                                bcc->sender_address, bcc->sender_address_len,
2260                                bcc->session, bcc->ats, bcc->ats_count);
2261     n->connect_ts = bcc->ts;
2262   }
2263
2264   GNUNET_free (bcc);
2265
2266   if (n->state != S_CONNECT_RECV)
2267     change_state (n, S_CONNECT_RECV);
2268
2269   /* Ask ATS for an address to connect via that address */
2270   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2271     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2272   n->ats_suggest =
2273       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2274                                     n);
2275   GNUNET_ATS_suggest_address (GST_ats, peer);
2276 }
2277
2278 /**
2279  * We received a 'SESSION_CONNECT' message from the other peer.
2280  * Consider switching to it.
2281  *
2282  * @param message possibly a 'struct SessionConnectMessage' (check format)
2283  * @param peer identity of the peer to switch the address for
2284  * @param plugin_name name of transport that delivered the PONG
2285  * @param address address of the other peer, NULL if other peer
2286  *                       connected to us
2287  * @param address_len number of bytes in address
2288  * @param session session to use (or NULL)
2289  * @param ats performance data
2290  * @param ats_count number of entries in ats (excluding 0-termination)
2291   */
2292 void
2293 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2294                                const struct GNUNET_PeerIdentity *peer,
2295                                const char *plugin_name,
2296                                const char *sender_address,
2297                                uint16_t sender_address_len,
2298                                struct Session *session,
2299                                const struct GNUNET_ATS_Information *ats,
2300                                uint32_t ats_count)
2301 {
2302   const struct SessionConnectMessage *scm;
2303   struct NeighbourMapEntry *n;
2304   struct BlackListCheckContext *bcc = NULL;
2305
2306 #if DEBUG_TRANSPORT
2307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2308               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2309 #endif
2310
2311   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2312   {
2313     GNUNET_break_op (0);
2314     return;
2315   }
2316
2317   scm = (const struct SessionConnectMessage *) message;
2318   GNUNET_break_op (ntohl (scm->reserved) == 0);
2319
2320   n = lookup_neighbour (peer);
2321   if (n != NULL)
2322   {
2323     /* connected peer switches addresses */
2324     if (is_connected (n))
2325     {
2326       GNUNET_ATS_address_update (GST_ats, peer, plugin_name, sender_address,
2327                                  sender_address_len, session, ats, ats_count);
2328       return;
2329     }
2330   }
2331
2332   /* we are not connected to this peer */
2333   /* do blacklist check */
2334   bcc =
2335       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2336                      sizeof (struct GNUNET_ATS_Information) * ats_count +
2337                      sender_address_len + strlen (plugin_name) + 1);
2338
2339   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2340
2341   bcc->ats_count = ats_count;
2342   bcc->sender_address_len = sender_address_len;
2343   bcc->session = session;
2344
2345   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2346   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2347
2348   bcc->sender_address = (char *) &bcc->ats[ats_count];
2349   memcpy (bcc->sender_address, sender_address, sender_address_len);
2350
2351   bcc->plugin_name = &bcc->sender_address[sender_address_len];
2352   strcpy (bcc->plugin_name, plugin_name);
2353
2354   GST_blacklist_test_allowed (peer, plugin_name, handle_connect_blacklist_cont,
2355                               bcc);
2356 }
2357
2358
2359 /* end of file gnunet-service-transport_neighbours.c */