coverity 10048
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours and measure
52  * the latency with this neighbour?
53  * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we
54  * send 10 keepalives in each interval, so 10 messages would need to be
55  * lost in a row for a disconnect).
56  */
57 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
58
59
60 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
61
62 #define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
63
64 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
65
66
67 /**
68  * Entry in neighbours.
69  */
70 struct NeighbourMapEntry;
71
72 GNUNET_NETWORK_STRUCT_BEGIN
73
74 /**
75  * Message a peer sends to another to indicate its
76  * preference for communicating via a particular
77  * session (and the desire to establish a real
78  * connection).
79  */
80 struct SessionConnectMessage
81 {
82   /**
83    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
84    */
85   struct GNUNET_MessageHeader header;
86
87   /**
88    * Always zero.
89    */
90   uint32_t reserved GNUNET_PACKED;
91
92   /**
93    * Absolute time at the sender.  Only the most recent connect
94    * message implies which session is preferred by the sender.
95    */
96   struct GNUNET_TIME_AbsoluteNBO timestamp;
97
98 };
99
100
101 struct SessionDisconnectMessage
102 {
103   /**
104    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
105    */
106   struct GNUNET_MessageHeader header;
107
108   /**
109    * Always zero.
110    */
111   uint32_t reserved GNUNET_PACKED;
112
113   /**
114    * Purpose of the signature.  Extends over the timestamp.
115    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
116    */
117   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
118
119   /**
120    * Absolute time at the sender.  Only the most recent connect
121    * message implies which session is preferred by the sender.
122    */
123   struct GNUNET_TIME_AbsoluteNBO timestamp;
124
125   /**
126    * Public key of the sender.
127    */
128   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
129
130   /**
131    * Signature of the peer that sends us the disconnect.  Only
132    * valid if the timestamp is AFTER the timestamp from the
133    * corresponding 'CONNECT' message.
134    */
135   struct GNUNET_CRYPTO_RsaSignature signature;
136
137 };
138 GNUNET_NETWORK_STRUCT_END
139
140 /**
141  * For each neighbour we keep a list of messages
142  * that we still want to transmit to the neighbour.
143  */
144 struct MessageQueue
145 {
146
147   /**
148    * This is a doubly linked list.
149    */
150   struct MessageQueue *next;
151
152   /**
153    * This is a doubly linked list.
154    */
155   struct MessageQueue *prev;
156
157   /**
158    * Once this message is actively being transmitted, which
159    * neighbour is it associated with?
160    */
161   struct NeighbourMapEntry *n;
162
163   /**
164    * Function to call once we're done.
165    */
166   GST_NeighbourSendContinuation cont;
167
168   /**
169    * Closure for 'cont'
170    */
171   void *cont_cls;
172
173   /**
174    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
175    * stuck together in memory.  Allocated at the end of this struct.
176    */
177   const char *message_buf;
178
179   /**
180    * Size of the message buf
181    */
182   size_t message_buf_size;
183
184   /**
185    * At what time should we fail?
186    */
187   struct GNUNET_TIME_Absolute timeout;
188
189 };
190
191
192 enum State
193 {
194   /**
195    * fresh peer or completely disconnected
196    */
197   S_NOT_CONNECTED,
198
199   /**
200    * sent CONNECT message to other peer, waiting for CONNECT_ACK
201    */
202   S_CONNECT_SENT,
203
204   /**
205    * received CONNECT message to other peer, sending CONNECT_ACK
206    */
207   S_CONNECT_RECV,
208
209   /**
210    * received ACK or payload
211    */
212   S_CONNECTED,
213
214   /**
215    * connection ended, fast reconnect
216    */
217   S_FAST_RECONNECT,
218
219   /**
220    * Disconnect in progress
221    */
222   S_DISCONNECT
223 };
224
225 enum Address_State
226 {
227   USED,
228   UNUSED,
229   FRESH,
230 };
231
232
233 /**
234  * Entry in neighbours.
235  */
236 struct NeighbourMapEntry
237 {
238
239   /**
240    * Head of list of messages we would like to send to this peer;
241    * must contain at most one message per client.
242    */
243   struct MessageQueue *messages_head;
244
245   /**
246    * Tail of list of messages we would like to send to this peer; must
247    * contain at most one message per client.
248    */
249   struct MessageQueue *messages_tail;
250
251   /**
252    * Are we currently trying to send a message? If so, which one?
253    */
254   struct MessageQueue *is_active;
255
256   /**
257    * Active session for communicating with the peer.
258    */
259   struct Session *session;
260
261   /**
262    * Address we currently use.
263    */
264   struct GNUNET_HELLO_Address *address;
265
266   /**
267    * Identity of this neighbour.
268    */
269   struct GNUNET_PeerIdentity id;
270
271   /**
272    * ID of task scheduled to run when this peer is about to
273    * time out (will free resources associated with the peer).
274    */
275   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
276
277   /**
278    * ID of task scheduled to send keepalives.
279    */
280   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
281
282   /**
283    * ID of task scheduled to run when we should try transmitting
284    * the head of the message queue.
285    */
286   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
287
288   /**
289    * Tracker for inbound bandwidth.
290    */
291   struct GNUNET_BANDWIDTH_Tracker in_tracker;
292
293   /**
294    * Inbound bandwidth from ATS, activated when connection is up
295    */
296   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
297
298   /**
299    * Inbound bandwidth from ATS, activated when connection is up
300    */
301   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
302
303   /**
304    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
305    */
306   struct GNUNET_TIME_Absolute connect_ts;
307
308   /**
309    * When did we sent the last keep-alive message?
310    */
311   struct GNUNET_TIME_Absolute keep_alive_sent;
312
313   /**
314    * Latest calculated latency value
315    */
316   struct GNUNET_TIME_Relative latency;
317
318   /**
319    * Timeout for ATS
320    * We asked ATS for a new address for this peer
321    */
322   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
323
324   /**
325    * Task the resets the peer state after due to an pending
326    * unsuccessful connection setup
327    */
328   GNUNET_SCHEDULER_TaskIdentifier state_reset;
329
330
331   /**
332    * How often has the other peer (recently) violated the inbound
333    * traffic limit?  Incremented by 10 per violation, decremented by 1
334    * per non-violation (for each time interval).
335    */
336   unsigned int quota_violation_count;
337
338
339   /**
340    * The current state of the peer
341    * Element of enum State
342    */
343   int state;
344
345   /**
346    * Did we sent an KEEP_ALIVE message and are we expecting a response?
347    */
348   int expect_latency_response;
349   int address_state;
350 };
351
352
353 /**
354  * All known neighbours and their HELLOs.
355  */
356 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
357
358 /**
359  * Closure for connect_notify_cb and disconnect_notify_cb
360  */
361 static void *callback_cls;
362
363 /**
364  * Function to call when we connected to a neighbour.
365  */
366 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
367
368 /**
369  * Function to call when we disconnected from a neighbour.
370  */
371 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
372
373 /**
374  * counter for connected neighbours
375  */
376 static int neighbours_connected;
377
378 /**
379  * Lookup a neighbour entry in the neighbours hash map.
380  *
381  * @param pid identity of the peer to look up
382  * @return the entry, NULL if there is no existing record
383  */
384 static struct NeighbourMapEntry *
385 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
386 {
387   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
388 }
389
390 #define change_state(n, state, ...) change (n, state, __LINE__)
391
392 static int
393 is_connecting (struct NeighbourMapEntry *n)
394 {
395   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
396     return GNUNET_YES;
397   return GNUNET_NO;
398 }
399
400 static int
401 is_connected (struct NeighbourMapEntry *n)
402 {
403   if (n->state == S_CONNECTED)
404     return GNUNET_YES;
405   return GNUNET_NO;
406 }
407
408 static int
409 is_disconnecting (struct NeighbourMapEntry *n)
410 {
411   if (n->state == S_DISCONNECT)
412     return GNUNET_YES;
413   return GNUNET_NO;
414 }
415
416 static const char *
417 print_state (int state)
418 {
419   switch (state)
420   {
421   case S_CONNECTED:
422     return "S_CONNECTED";
423     break;
424   case S_CONNECT_RECV:
425     return "S_CONNECT_RECV";
426     break;
427   case S_CONNECT_SENT:
428     return "S_CONNECT_SENT";
429     break;
430   case S_DISCONNECT:
431     return "S_DISCONNECT";
432     break;
433   case S_NOT_CONNECTED:
434     return "S_NOT_CONNECTED";
435     break;
436   case S_FAST_RECONNECT:
437     return "S_FAST_RECONNECT";
438     break;
439   default:
440     GNUNET_break (0);
441     break;
442   }
443   return NULL;
444 }
445
446 static int
447 change (struct NeighbourMapEntry *n, int state, int line);
448
449 static void
450 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
451
452
453 static void
454 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
455 {
456   struct NeighbourMapEntry *n = cls;
457
458   if (n == NULL)
459     return;
460
461   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
462   if (n->state == S_CONNECTED)
463     return;
464
465 #if DEBUG_TRANSPORT
466   GNUNET_STATISTICS_update (GST_stats,
467                             gettext_noop
468                             ("# failed connection attempts due to timeout"), 1,
469                             GNUNET_NO);
470 #endif
471
472   /* resetting state */
473   n->state = S_NOT_CONNECTED;
474
475   /* destroying address */
476   if (n->address != NULL)
477   {
478     GNUNET_assert (strlen (n->address->transport_name) > 0);
479     GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session);
480   }
481
482   /* request new address */
483   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
484     GNUNET_SCHEDULER_cancel (n->ats_suggest);
485   n->ats_suggest =
486       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
487                                     n);
488   GNUNET_ATS_suggest_address (GST_ats, &n->id);
489 }
490
491 static int
492 change (struct NeighbourMapEntry *n, int state, int line)
493 {
494   /* allowed transitions */
495   int allowed = GNUNET_NO;
496
497   switch (n->state)
498   {
499   case S_NOT_CONNECTED:
500     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
501         (state == S_DISCONNECT))
502       allowed = GNUNET_YES;
503     break;
504   case S_CONNECT_RECV:
505     allowed = GNUNET_YES;
506     break;
507   case S_CONNECT_SENT:
508     allowed = GNUNET_YES;
509     break;
510   case S_CONNECTED:
511     if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT))
512       allowed = GNUNET_YES;
513     break;
514   case S_DISCONNECT:
515     break;
516   case S_FAST_RECONNECT:
517     if ((state == S_CONNECTED) || (state == S_DISCONNECT))
518       allowed = GNUNET_YES;
519     break;
520   default:
521     GNUNET_break (0);
522     break;
523   }
524   if (allowed == GNUNET_NO)
525   {
526     char *old = GNUNET_strdup (print_state (n->state));
527     char *new = GNUNET_strdup (print_state (state));
528
529     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
530                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
531                 new, line);
532     GNUNET_break (0);
533     GNUNET_free (old);
534     GNUNET_free (new);
535     return GNUNET_SYSERR;
536   }
537 #if DEBUG_TRANSPORT
538   {
539     char *old = GNUNET_strdup (print_state (n->state));
540     char *new = GNUNET_strdup (print_state (state));
541
542     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
544                 GNUNET_i2s (&n->id), n, old, new, line);
545     GNUNET_free (old);
546     GNUNET_free (new);
547   }
548 #endif
549   n->state = state;
550
551   switch (n->state)
552   {
553   case S_FAST_RECONNECT:
554   case S_CONNECT_RECV:
555   case S_CONNECT_SENT:
556     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
557       GNUNET_SCHEDULER_cancel (n->state_reset);
558     n->state_reset =
559         GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
560     break;
561   case S_CONNECTED:
562   case S_NOT_CONNECTED:
563   case S_DISCONNECT:
564     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
565     {
566 #if DEBUG_TRANSPORT
567       char *old = GNUNET_strdup (print_state (n->state));
568       char *new = GNUNET_strdup (print_state (state));
569
570       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
571                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
572                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new);
573       GNUNET_free (old);
574       GNUNET_free (new);
575 #endif
576       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
577       GNUNET_SCHEDULER_cancel (n->state_reset);
578       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
579     }
580     break;
581
582   default:
583     GNUNET_assert (0);
584   }
585
586
587
588   return GNUNET_OK;
589 }
590
591 static ssize_t
592 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
593                   size_t msgbuf_size, uint32_t priority,
594                   struct GNUNET_TIME_Relative timeout, struct Session *session,
595                   const struct GNUNET_HELLO_Address *address, int force_address,
596                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
597 {
598   struct GNUNET_TRANSPORT_PluginFunctions *papi;
599   size_t ret = GNUNET_SYSERR;
600
601   /* FIXME : ats returns an address with all values 0 */
602   if (address == NULL)
603   {
604     if (cont != NULL)
605       cont (cont_cls, target, GNUNET_SYSERR);
606     return GNUNET_SYSERR;
607   }
608
609   if ((session == NULL) && (address->address_length == 0))
610   {
611     if (cont != NULL)
612       cont (cont_cls, target, GNUNET_SYSERR);
613     return GNUNET_SYSERR;
614   }
615
616   papi = GST_plugins_find (address->transport_name);
617   if (papi == NULL)
618   {
619     if (cont != NULL)
620       cont (cont_cls, target, GNUNET_SYSERR);
621     return GNUNET_SYSERR;
622   }
623
624   ret =
625       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
626                   address->address, address->address_length, GNUNET_YES, cont,
627                   cont_cls);
628
629   if (ret == -1)
630   {
631     if (cont != NULL)
632       cont (cont_cls, target, GNUNET_SYSERR);
633   }
634   return ret;
635 }
636
637
638 /**
639  * Task invoked to start a transmission to another peer.
640  *
641  * @param cls the 'struct NeighbourMapEntry'
642  * @param tc scheduler context
643  */
644 static void
645 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
646
647
648 /**
649  * We're done with our transmission attempt, continue processing.
650  *
651  * @param cls the 'struct MessageQueue' of the message
652  * @param receiver intended receiver
653  * @param success whether it worked or not
654  */
655 static void
656 transmit_send_continuation (void *cls,
657                             const struct GNUNET_PeerIdentity *receiver,
658                             int success)
659 {
660   struct MessageQueue *mq = cls;
661   struct NeighbourMapEntry *n;
662   struct NeighbourMapEntry *tmp;
663
664   tmp = lookup_neighbour (receiver);
665   n = mq->n;
666   if ((NULL != n) && (tmp != NULL) && (tmp == n))
667   {
668     GNUNET_assert (n->is_active == mq);
669     n->is_active = NULL;
670     if (success == GNUNET_YES)
671     {
672       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
673       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
674     }
675   }
676 #if DEBUG_TRANSPORT
677   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
678               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
679               (success == GNUNET_OK) ? "successful" : "FAILED");
680 #endif
681   if (NULL != mq->cont)
682     mq->cont (mq->cont_cls, success);
683   GNUNET_free (mq);
684 }
685
686
687 /**
688  * Check the ready list for the given neighbour and if a plugin is
689  * ready for transmission (and if we have a message), do so!
690  *
691  * @param n target peer for which to transmit
692  */
693 static void
694 try_transmission_to_peer (struct NeighbourMapEntry *n)
695 {
696   struct MessageQueue *mq;
697   struct GNUNET_TIME_Relative timeout;
698   ssize_t ret;
699
700   if (n->is_active != NULL)
701   {
702     GNUNET_break (0);
703     return;                     /* transmission already pending */
704   }
705   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
706   {
707     GNUNET_break (0);
708     return;                     /* currently waiting for bandwidth */
709   }
710   while (NULL != (mq = n->messages_head))
711   {
712     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
713     if (timeout.rel_value > 0)
714       break;
715     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
716     n->is_active = mq;
717     mq->n = n;
718     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
719   }
720   if (NULL == mq)
721     return;                     /* no more messages */
722
723   if (n->address == NULL)
724   {
725 #if DEBUG_TRANSPORT
726     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
727                 GNUNET_i2s (&n->id));
728 #endif
729     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
730     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
731     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
732     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
733     return;
734   }
735
736   if (GST_plugins_find (n->address->transport_name) == NULL)
737   {
738     GNUNET_break (0);
739     return;
740   }
741   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
742   n->is_active = mq;
743   mq->n = n;
744
745   if ((n->address->address_length == 0) && (n->session == NULL))
746   {
747     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
748                 GNUNET_i2s (&n->id));
749     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
750     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
751     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
752     return;
753   }
754
755   ret =
756       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
757                         timeout, n->session, n->address, GNUNET_YES,
758                         &transmit_send_continuation, mq);
759   if (ret == -1)
760   {
761     /* failure, but 'send' would not call continuation in this case,
762      * so we need to do it here! */
763     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
764   }
765
766 }
767
768
769 /**
770  * Task invoked to start a transmission to another peer.
771  *
772  * @param cls the 'struct NeighbourMapEntry'
773  * @param tc scheduler context
774  */
775 static void
776 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
777 {
778   struct NeighbourMapEntry *n = cls;
779
780   GNUNET_assert (NULL != lookup_neighbour (&n->id));
781   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
782   try_transmission_to_peer (n);
783 }
784
785
786 /**
787  * Initialize the neighbours subsystem.
788  *
789  * @param cls closure for callbacks
790  * @param connect_cb function to call if we connect to a peer
791  * @param disconnect_cb function to call if we disconnect from a peer
792  */
793 void
794 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
795                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
796 {
797   callback_cls = cls;
798   connect_notify_cb = connect_cb;
799   disconnect_notify_cb = disconnect_cb;
800   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
801 }
802
803
804 static void
805 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
806                       int result)
807 {
808 #if DEBUG_TRANSPORT
809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810               "Sending DISCONNECT message to peer `%4s': %i\n",
811               GNUNET_i2s (target), result);
812 #endif
813 }
814
815
816 static int
817 send_disconnect (const struct GNUNET_PeerIdentity *target,
818                  const struct GNUNET_HELLO_Address *address,
819                  struct Session *session)
820 {
821   size_t ret;
822   struct SessionDisconnectMessage disconnect_msg;
823
824 #if DEBUG_TRANSPORT
825   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
826               "Sending DISCONNECT message to peer `%4s'\n",
827               GNUNET_i2s (target));
828 #endif
829
830   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
831   disconnect_msg.header.type =
832       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
833   disconnect_msg.reserved = htonl (0);
834   disconnect_msg.purpose.size =
835       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
836              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
837              sizeof (struct GNUNET_TIME_AbsoluteNBO));
838   disconnect_msg.purpose.purpose =
839       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
840   disconnect_msg.timestamp =
841       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
842   disconnect_msg.public_key = GST_my_public_key;
843   GNUNET_assert (GNUNET_OK ==
844                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
845                                          &disconnect_msg.purpose,
846                                          &disconnect_msg.signature));
847
848   ret =
849       send_with_plugin (target, (const char *) &disconnect_msg,
850                         sizeof (disconnect_msg), UINT32_MAX,
851                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
852                         GNUNET_YES, &send_disconnect_cont, NULL);
853
854   if (ret == GNUNET_SYSERR)
855     return GNUNET_SYSERR;
856
857   GNUNET_STATISTICS_update (GST_stats,
858                             gettext_noop
859                             ("# peers disconnected due to external request"), 1,
860                             GNUNET_NO);
861   return GNUNET_OK;
862 }
863
864
865 /**
866  * Disconnect from the given neighbour, clean up the record.
867  *
868  * @param n neighbour to disconnect from
869  */
870 static void
871 disconnect_neighbour (struct NeighbourMapEntry *n)
872 {
873   struct MessageQueue *mq;
874   int previous_state;
875
876   previous_state = n->state;
877
878   if (is_disconnecting (n))
879     return;
880
881   /* send DISCONNECT MESSAGE */
882   if (previous_state == S_CONNECTED)
883   {
884     if (GNUNET_OK == send_disconnect (&n->id, n->address, n->session))
885       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
886                   GNUNET_i2s (&n->id));
887     else
888       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
889                   "Could not send DISCONNECT_MSG to `%s'\n",
890                   GNUNET_i2s (&n->id));
891   }
892
893   change_state (n, S_DISCONNECT);
894
895   if (previous_state == S_CONNECTED)
896   {
897     GNUNET_assert (NULL != n->address);
898     if (n->address_state == USED)
899     {
900       GST_validation_set_address_use (n->address, n->session, GNUNET_NO);
901       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
902       n->address_state = UNUSED;
903     }
904   }
905
906   if (n->address != NULL)
907   {
908     struct GNUNET_TRANSPORT_PluginFunctions *papi;
909
910     papi = GST_plugins_find (n->address->transport_name);
911     if (papi != NULL)
912       papi->disconnect (papi->cls, &n->id);
913   }
914   while (NULL != (mq = n->messages_head))
915   {
916     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
917     if (NULL != mq->cont)
918       mq->cont (mq->cont_cls, GNUNET_SYSERR);
919     GNUNET_free (mq);
920   }
921   if (NULL != n->is_active)
922   {
923     n->is_active->n = NULL;
924     n->is_active = NULL;
925   }
926
927   switch (previous_state)
928   {
929   case S_CONNECTED:
930     GNUNET_assert (neighbours_connected > 0);
931     neighbours_connected--;
932     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
933     GNUNET_SCHEDULER_cancel (n->keepalive_task);
934     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
935     n->expect_latency_response = GNUNET_NO;
936     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
937                               GNUNET_NO);
938     disconnect_notify_cb (callback_cls, &n->id);
939     break;
940   case S_FAST_RECONNECT:
941     GNUNET_STATISTICS_update (GST_stats,
942                               gettext_noop ("# fast reconnects failed"), 1,
943                               GNUNET_NO);
944     disconnect_notify_cb (callback_cls, &n->id);
945     break;
946   default:
947     break;
948   }
949
950   GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id);
951
952   GNUNET_assert (GNUNET_YES ==
953                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
954                                                        &n->id.hashPubKey, n));
955   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
956   {
957     GNUNET_SCHEDULER_cancel (n->ats_suggest);
958     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
959   }
960   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
961   {
962     GNUNET_SCHEDULER_cancel (n->timeout_task);
963     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
964   }
965   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
966   {
967     GNUNET_SCHEDULER_cancel (n->transmission_task);
968     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
969   }
970   if (NULL != n->address)
971   {
972     GNUNET_HELLO_address_free (n->address);
973     n->address = NULL;
974   }
975   n->session = NULL;
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
977               GNUNET_i2s (&n->id), n);
978   GNUNET_free (n);
979 }
980
981
982 /**
983  * Peer has been idle for too long. Disconnect.
984  *
985  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
986  * @param tc scheduler context
987  */
988 static void
989 neighbour_timeout_task (void *cls,
990                         const struct GNUNET_SCHEDULER_TaskContext *tc)
991 {
992   struct NeighbourMapEntry *n = cls;
993
994   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
995
996   GNUNET_STATISTICS_update (GST_stats,
997                             gettext_noop
998                             ("# peers disconnected due to timeout"), 1,
999                             GNUNET_NO);
1000   disconnect_neighbour (n);
1001 }
1002
1003
1004 /**
1005  * Send another keepalive message.
1006  *
1007  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
1008  * @param tc scheduler context
1009  */
1010 static void
1011 neighbour_keepalive_task (void *cls,
1012                           const struct GNUNET_SCHEDULER_TaskContext *tc)
1013 {
1014   struct NeighbourMapEntry *n = cls;
1015   struct GNUNET_MessageHeader m;
1016   int ret;
1017
1018   GNUNET_assert (S_CONNECTED == n->state);
1019   n->keepalive_task =
1020       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1021                                     &neighbour_keepalive_task, n);
1022
1023   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
1024                             GNUNET_NO);
1025   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1026   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
1027
1028
1029   ret =
1030       send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1031                         UINT32_MAX /* priority */ ,
1032                         GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1033                         GNUNET_YES, NULL, NULL);
1034
1035   n->expect_latency_response = GNUNET_NO;
1036   n->keep_alive_sent = GNUNET_TIME_absolute_get_zero ();
1037   if (ret != GNUNET_SYSERR)
1038   {
1039     n->expect_latency_response = GNUNET_YES;
1040     n->keep_alive_sent = GNUNET_TIME_absolute_get ();
1041   }
1042
1043 }
1044
1045
1046 /**
1047  * Disconnect from the given neighbour.
1048  *
1049  * @param cls unused
1050  * @param key hash of neighbour's public key (not used)
1051  * @param value the 'struct NeighbourMapEntry' of the neighbour
1052  */
1053 static int
1054 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
1055 {
1056   struct NeighbourMapEntry *n = value;
1057
1058 #if DEBUG_TRANSPORT
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
1060               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
1061 #endif
1062   if (S_CONNECTED == n->state)
1063     GNUNET_STATISTICS_update (GST_stats,
1064                               gettext_noop
1065                               ("# peers disconnected due to global disconnect"),
1066                               1, GNUNET_NO);
1067   disconnect_neighbour (n);
1068   return GNUNET_OK;
1069 }
1070
1071
1072 static void
1073 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1074 {
1075   struct NeighbourMapEntry *n = cls;
1076
1077   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1078
1079   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1080               "ATS did not suggested address to connect to peer `%s'\n",
1081               GNUNET_i2s (&n->id));
1082
1083   disconnect_neighbour (n);
1084 }
1085
1086 /**
1087  * Cleanup the neighbours subsystem.
1088  */
1089 void
1090 GST_neighbours_stop ()
1091 {
1092   // This can happen during shutdown
1093   if (neighbours == NULL)
1094   {
1095     return;
1096   }
1097
1098   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1099                                          NULL);
1100   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1101 //  GNUNET_assert (neighbours_connected == 0);
1102   neighbours = NULL;
1103   callback_cls = NULL;
1104   connect_notify_cb = NULL;
1105   disconnect_notify_cb = NULL;
1106 }
1107
1108 struct ContinutionContext
1109 {
1110   struct GNUNET_HELLO_Address *address;
1111
1112   struct Session *session;
1113 };
1114
1115 static void
1116 send_outbound_quota (const struct GNUNET_PeerIdentity *target,
1117                      struct GNUNET_BANDWIDTH_Value32NBO quota)
1118 {
1119   struct QuotaSetMessage q_msg;
1120
1121 #if DEBUG_TRANSPORT
1122   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1124               ntohl (quota.value__), GNUNET_i2s (target));
1125 #endif
1126   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1127   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1128   q_msg.quota = quota;
1129   q_msg.peer = (*target);
1130   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1131 }
1132
1133 /**
1134  * We tried to send a SESSION_CONNECT message to another peer.  If this
1135  * succeeded, we change the state.  If it failed, we should tell
1136  * ATS to not use this address anymore (until it is re-validated).
1137  *
1138  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1139  * @param target peer to send the message to
1140  * @param success GNUNET_OK on success
1141  */
1142 static void
1143 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1144                            int success)
1145 {
1146   struct ContinutionContext *cc = cls;
1147   struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer);
1148
1149   if (GNUNET_YES != success)
1150   {
1151     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1152     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1153   }
1154   if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT))
1155   {
1156     GNUNET_HELLO_address_free (cc->address);
1157     GNUNET_free (cc);
1158     return;
1159   }
1160
1161   if ((GNUNET_YES == success) &&
1162       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1163   {
1164     change_state (n, S_CONNECT_SENT);
1165     GNUNET_HELLO_address_free (cc->address);
1166     GNUNET_free (cc);
1167     return;
1168   }
1169
1170   if ((GNUNET_NO == success) &&
1171       ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT)))
1172   {
1173 #if DEBUG_TRANSPORT
1174     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175                 "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1176                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1177 #endif
1178     change_state (n, S_NOT_CONNECTED);
1179     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1180       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1181     n->ats_suggest =
1182         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1183                                       n);
1184     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1185   }
1186   GNUNET_HELLO_address_free (cc->address);
1187   GNUNET_free (cc);
1188 }
1189
1190
1191 /**
1192  * We tried to switch addresses with an peer already connected. If it failed,
1193  * we should tell ATS to not use this address anymore (until it is re-validated).
1194  *
1195  * @param cls the 'struct NeighbourMapEntry'
1196  * @param target peer to send the message to
1197  * @param success GNUNET_OK on success
1198  */
1199 static void
1200 send_switch_address_continuation (void *cls,
1201                                   const struct GNUNET_PeerIdentity *target,
1202                                   int success)
1203 {
1204   struct ContinutionContext *cc = cls;
1205   struct NeighbourMapEntry *n;
1206
1207   if (neighbours == NULL)
1208   {
1209     GNUNET_HELLO_address_free (cc->address);
1210     GNUNET_free (cc);
1211     return;                     /* neighbour is going away */
1212   }
1213
1214   n = lookup_neighbour (&cc->address->peer);
1215   if ((n == NULL) || (is_disconnecting (n)))
1216   {
1217     GNUNET_HELLO_address_free (cc->address);
1218     GNUNET_free (cc);
1219     return;                     /* neighbour is going away */
1220   }
1221
1222   GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT));
1223   if (GNUNET_YES != success)
1224   {
1225 #if DEBUG_TRANSPORT
1226     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1228                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1229 #endif
1230     GNUNET_assert (strlen (cc->address->transport_name) > 0);
1231     GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1232
1233     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1234       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1235     n->ats_suggest =
1236         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1237                                       n);
1238     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1239     GNUNET_HELLO_address_free (cc->address);
1240     GNUNET_free (cc);
1241     return;
1242   }
1243   /* Tell ATS that switching addresses was successful */
1244   switch (n->state)
1245   {
1246   case S_CONNECTED:
1247     if (n->address_state == FRESH)
1248     {
1249       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES);
1250       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1251       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1252       n->address_state = USED;
1253     }
1254     break;
1255   case S_FAST_RECONNECT:
1256 #if DEBUG_TRANSPORT
1257     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258                 "Successful fast reconnect to peer `%s'\n",
1259                 GNUNET_i2s (&n->id));
1260 #endif
1261     change_state (n, S_CONNECTED);
1262     neighbours_connected++;
1263     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1264                               GNUNET_NO);
1265
1266     if (n->address_state == FRESH)
1267     {
1268       GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES);
1269       GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0);
1270       GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES);
1271       n->address_state = USED;
1272     }
1273
1274     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1275       n->keepalive_task =
1276           GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
1277
1278     /* Updating quotas */
1279     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1280     send_outbound_quota (target, n->bandwidth_out);
1281
1282   default:
1283     break;
1284   }
1285   GNUNET_HELLO_address_free (cc->address);
1286   GNUNET_free (cc);
1287 }
1288
1289
1290 /**
1291  * We tried to send a SESSION_CONNECT message to another peer.  If this
1292  * succeeded, we change the state.  If it failed, we should tell
1293  * ATS to not use this address anymore (until it is re-validated).
1294  *
1295  * @param cls the 'struct NeighbourMapEntry'
1296  * @param target peer to send the message to
1297  * @param success GNUNET_OK on success
1298  */
1299 static void
1300 send_connect_ack_continuation (void *cls,
1301                                const struct GNUNET_PeerIdentity *target,
1302                                int success)
1303 {
1304   struct ContinutionContext *cc = cls;
1305   struct NeighbourMapEntry *n;
1306
1307   if (neighbours == NULL)
1308   {
1309     GNUNET_HELLO_address_free (cc->address);
1310     GNUNET_free (cc);
1311     return;                     /* neighbour is going away */
1312   }
1313
1314   n = lookup_neighbour (&cc->address->peer);
1315   if ((n == NULL) || (is_disconnecting (n)))
1316   {
1317     GNUNET_HELLO_address_free (cc->address);
1318     GNUNET_free (cc);
1319     return;                     /* neighbour is going away */
1320   }
1321
1322   if (GNUNET_YES == success)
1323   {
1324     GNUNET_HELLO_address_free (cc->address);
1325     GNUNET_free (cc);
1326     return;                     /* sending successful */
1327   }
1328
1329   /* sending failed, ask for next address  */
1330 #if DEBUG_TRANSPORT
1331   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1333               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
1334 #endif
1335   change_state (n, S_NOT_CONNECTED);
1336   GNUNET_assert (strlen (cc->address->transport_name) > 0);
1337   GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session);
1338
1339   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1340     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1341   n->ats_suggest =
1342       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1343                                     n);
1344   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1345   GNUNET_HELLO_address_free (cc->address);
1346   GNUNET_free (cc);
1347 }
1348
1349
1350 /**
1351  * For an existing neighbour record, set the active connection to
1352  * the given address.
1353  *
1354  * @param peer identity of the peer to switch the address for
1355  * @param address address of the other peer, NULL if other peer
1356  *                       connected to us
1357  * @param session session to use (or NULL)
1358  * @param ats performance data
1359  * @param ats_count number of entries in ats
1360  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1361  *         connection is not up (yet)
1362  */
1363 int
1364 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1365                                        const struct GNUNET_HELLO_Address
1366                                        *address, struct Session *session,
1367                                        const struct GNUNET_ATS_Information *ats,
1368                                        uint32_t ats_count,
1369                                        struct GNUNET_BANDWIDTH_Value32NBO
1370                                        bandwidth_in,
1371                                        struct GNUNET_BANDWIDTH_Value32NBO
1372                                        bandwidth_out)
1373 {
1374   struct NeighbourMapEntry *n;
1375   struct SessionConnectMessage connect_msg;
1376   struct ContinutionContext *cc;
1377   size_t msg_len;
1378   size_t ret;
1379
1380   if (neighbours == NULL)
1381   {
1382     /* This can happen during shutdown */
1383     return GNUNET_NO;
1384   }
1385   n = lookup_neighbour (peer);
1386   if (NULL == n)
1387     return GNUNET_NO;
1388   if (n->state == S_DISCONNECT)
1389   {
1390     /* We are disconnecting, nothing to do here */
1391     return GNUNET_NO;
1392   }
1393   GNUNET_assert (address->transport_name != NULL);
1394   if ((session == NULL) && (0 == address->address_length))
1395   {
1396     GNUNET_break_op (0);
1397     /* FIXME: is this actually possible? When does this happen? */
1398     if (strlen (address->transport_name) > 0)
1399       GNUNET_ATS_address_destroyed (GST_ats, address, session);
1400     GNUNET_ATS_suggest_address (GST_ats, peer);
1401     return GNUNET_NO;
1402   }
1403
1404   /* checks successful and neighbour != NULL */
1405 #if DEBUG_TRANSPORT
1406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407               "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n",
1408               GST_plugins_a2s (address), session, GNUNET_i2s (peer),
1409               print_state (n->state));
1410 #endif
1411   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1412   {
1413     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1414     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1415   }
1416   /* do not switch addresses just update quotas */
1417   if ((n->state == S_CONNECTED) && (NULL != n->address) &&
1418       (0 == GNUNET_HELLO_address_cmp (address, n->address)) &&
1419       (n->session == session))
1420   {
1421     n->bandwidth_in = bandwidth_in;
1422     n->bandwidth_out = bandwidth_out;
1423     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1424     send_outbound_quota (peer, n->bandwidth_out);
1425     return GNUNET_NO;
1426   }
1427   if (n->state == S_CONNECTED)
1428   {
1429     /* mark old address as no longer used */
1430     GNUNET_assert (NULL != n->address);
1431     if (n->address_state == USED)
1432     {
1433       GST_validation_set_address_use (n->address, n->session, GNUNET_NO);
1434       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1435       n->address_state = UNUSED;
1436     }
1437
1438   }
1439
1440   /* set new address */
1441   if (NULL != n->address)
1442     GNUNET_HELLO_address_free (n->address);
1443   n->address = GNUNET_HELLO_address_copy (address);
1444   n->address_state = FRESH;
1445   n->session = session;
1446   n->bandwidth_in = bandwidth_in;
1447   n->bandwidth_out = bandwidth_out;
1448   GNUNET_SCHEDULER_cancel (n->timeout_task);
1449   n->timeout_task =
1450       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1451                                     &neighbour_timeout_task, n);
1452   switch (n->state)
1453   {
1454   case S_NOT_CONNECTED:
1455   case S_CONNECT_SENT:
1456     msg_len = sizeof (struct SessionConnectMessage);
1457     connect_msg.header.size = htons (msg_len);
1458     connect_msg.header.type =
1459         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1460     connect_msg.reserved = htonl (0);
1461     connect_msg.timestamp =
1462         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1463
1464     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1465     cc->session = session;
1466     cc->address = GNUNET_HELLO_address_copy (address);
1467     ret =
1468         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1469                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1470                           address, GNUNET_YES, &send_connect_continuation, cc);
1471     return GNUNET_NO;
1472   case S_CONNECT_RECV:
1473     /* We received a CONNECT message and asked ATS for an address */
1474     msg_len = sizeof (struct SessionConnectMessage);
1475     connect_msg.header.size = htons (msg_len);
1476     connect_msg.header.type =
1477         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1478     connect_msg.reserved = htonl (0);
1479     connect_msg.timestamp =
1480         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1481     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1482     cc->session = session;
1483     cc->address = GNUNET_HELLO_address_copy (address);
1484     ret =
1485         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1486                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1487                           address, GNUNET_YES, &send_connect_ack_continuation,
1488                           cc);
1489     return GNUNET_NO;
1490   case S_CONNECTED:
1491   case S_FAST_RECONNECT:
1492     /* connected peer is switching addresses or tries fast reconnect */
1493     msg_len = sizeof (struct SessionConnectMessage);
1494     connect_msg.header.size = htons (msg_len);
1495     connect_msg.header.type =
1496         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1497     connect_msg.reserved = htonl (0);
1498     connect_msg.timestamp =
1499         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1500     cc = GNUNET_malloc (sizeof (struct ContinutionContext));
1501     cc->session = session;
1502     cc->address = GNUNET_HELLO_address_copy (address);
1503     ret =
1504         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1505                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1506                           address, GNUNET_YES,
1507                           &send_switch_address_continuation, cc);
1508     if (ret == GNUNET_SYSERR)
1509     {
1510       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1511                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1512                   GNUNET_i2s (peer), GST_plugins_a2s (address), session);
1513     }
1514     return GNUNET_NO;
1515   default:
1516     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1517                 "Invalid connection state to switch addresses %u \n", n->state);
1518     GNUNET_break_op (0);
1519     return GNUNET_NO;
1520   }
1521 }
1522
1523
1524 /**
1525  * Obtain current latency information for the given neighbour.
1526  *
1527  * @param peer
1528  * @return observed latency of the address, FOREVER if the address was
1529  *         never successfully validated
1530  */
1531 struct GNUNET_TIME_Relative
1532 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1533 {
1534   struct NeighbourMapEntry *n;
1535
1536   n = lookup_neighbour (peer);
1537   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1538     return GNUNET_TIME_UNIT_FOREVER_REL;
1539
1540   return n->latency;
1541 }
1542
1543 /**
1544  * Obtain current address information for the given neighbour.
1545  *
1546  * @param peer
1547  * @return address currently used
1548  */
1549 struct GNUNET_HELLO_Address *
1550 GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer)
1551 {
1552   struct NeighbourMapEntry *n;
1553
1554   n = lookup_neighbour (peer);
1555   if ((NULL == n) || ((n->address == NULL) && (n->session == NULL)))
1556     return NULL;
1557
1558   return n->address;
1559 }
1560
1561
1562
1563 /**
1564  * Create an entry in the neighbour map for the given peer
1565  *
1566  * @param peer peer to create an entry for
1567  * @return new neighbour map entry
1568  */
1569 static struct NeighbourMapEntry *
1570 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1571 {
1572   struct NeighbourMapEntry *n;
1573
1574 #if DEBUG_TRANSPORT
1575   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1576               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1577 #endif
1578   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1579   n->id = *peer;
1580   n->state = S_NOT_CONNECTED;
1581   n->latency = GNUNET_TIME_relative_get_forever ();
1582   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1583                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1584                                  MAX_BANDWIDTH_CARRY_S);
1585   n->timeout_task =
1586       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1587                                     &neighbour_timeout_task, n);
1588   GNUNET_assert (GNUNET_OK ==
1589                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1590                                                     &n->id.hashPubKey, n,
1591                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1592   return n;
1593 }
1594
1595
1596 /**
1597  * Try to create a connection to the given target (eventually).
1598  *
1599  * @param target peer to try to connect to
1600  */
1601 void
1602 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1603 {
1604   struct NeighbourMapEntry *n;
1605
1606   // This can happen during shutdown
1607   if (neighbours == NULL)
1608   {
1609     return;
1610   }
1611 #if DEBUG_TRANSPORT
1612   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1613               GNUNET_i2s (target));
1614 #endif
1615   if (0 ==
1616       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1617   {
1618     /* my own hello */
1619     return;
1620   }
1621   n = lookup_neighbour (target);
1622
1623   if (NULL != n)
1624   {
1625     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1626       return;                   /* already connecting or connected */
1627     if (is_disconnecting (n))
1628       change_state (n, S_NOT_CONNECTED);
1629   }
1630
1631
1632   if (n == NULL)
1633     n = setup_neighbour (target);
1634 #if DEBUG_TRANSPORT
1635   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636               "Asking ATS for suggested address to connect to peer `%s'\n",
1637               GNUNET_i2s (&n->id));
1638 #endif
1639
1640   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1641 }
1642
1643 /**
1644  * Test if we're connected to the given peer.
1645  *
1646  * @param target peer to test
1647  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1648  */
1649 int
1650 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1651 {
1652   struct NeighbourMapEntry *n;
1653
1654   // This can happen during shutdown
1655   if (neighbours == NULL)
1656   {
1657     return GNUNET_NO;
1658   }
1659
1660   n = lookup_neighbour (target);
1661
1662   if ((NULL == n) || (S_CONNECTED != n->state))
1663     return GNUNET_NO;           /* not connected */
1664   return GNUNET_YES;
1665 }
1666
1667 /**
1668  * A session was terminated. Take note.
1669  *
1670  * @param peer identity of the peer where the session died
1671  * @param session session that is gone
1672  */
1673 void
1674 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1675                                    struct Session *session)
1676 {
1677   struct NeighbourMapEntry *n;
1678
1679   if (neighbours == NULL)
1680   {
1681     /* This can happen during shutdown */
1682     return;
1683   }
1684
1685 #if DEBUG_TRANSPORT
1686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1687               session, GNUNET_i2s (peer));
1688 #endif
1689
1690   n = lookup_neighbour (peer);
1691   if (NULL == n)
1692     return;
1693   if (session != n->session)
1694     return;                     /* doesn't affect us */
1695   if (n->state == S_CONNECTED)
1696   {
1697     if (n->address_state == USED)
1698     {
1699       GST_validation_set_address_use (n->address, n->session, GNUNET_NO);
1700       GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1701       n->address_state = UNUSED;
1702     }
1703   }
1704
1705   if (NULL != n->address)
1706   {
1707     GNUNET_HELLO_address_free (n->address);
1708     n->address = NULL;
1709   }
1710   n->session = NULL;
1711
1712   /* not connected anymore anyway, shouldn't matter */
1713   if (S_CONNECTED != n->state)
1714     return;
1715
1716   if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK)
1717   {
1718     GNUNET_SCHEDULER_cancel (n->keepalive_task);
1719     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
1720     n->expect_latency_response = GNUNET_NO;
1721   }
1722
1723   /* connected, try fast reconnect */
1724   /* statistics "transport" : "# peers connected" -= 1
1725    * neighbours_connected -= 1
1726    * BUT: no disconnect_cb to notify clients about disconnect
1727    */
1728 #if DEBUG_TRANSPORT
1729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n",
1730               GNUNET_i2s (peer));
1731 #endif
1732   GNUNET_assert (neighbours_connected > 0);
1733   change_state (n, S_FAST_RECONNECT);
1734   neighbours_connected--;
1735   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
1736                             GNUNET_NO);
1737
1738
1739   /* We are connected, so ask ATS to switch addresses */
1740   GNUNET_SCHEDULER_cancel (n->timeout_task);
1741   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1742                                     &neighbour_timeout_task, n);
1743   /* try QUICKLY to re-establish a connection, reduce timeout! */
1744   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1745     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1746   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1747                                     n);
1748   GNUNET_ATS_suggest_address (GST_ats, peer);
1749 }
1750
1751
1752 /**
1753  * Transmit a message to the given target using the active connection.
1754  *
1755  * @param target destination
1756  * @param msg message to send
1757  * @param msg_size number of bytes in msg
1758  * @param timeout when to fail with timeout
1759  * @param cont function to call when done
1760  * @param cont_cls closure for 'cont'
1761  */
1762 void
1763 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1764                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1765                      GST_NeighbourSendContinuation cont, void *cont_cls)
1766 {
1767   struct NeighbourMapEntry *n;
1768   struct MessageQueue *mq;
1769
1770   // This can happen during shutdown
1771   if (neighbours == NULL)
1772   {
1773     return;
1774   }
1775
1776   n = lookup_neighbour (target);
1777   if ((n == NULL) || (!is_connected (n)))
1778   {
1779     GNUNET_STATISTICS_update (GST_stats,
1780                               gettext_noop
1781                               ("# messages not sent (no such peer or not connected)"),
1782                               1, GNUNET_NO);
1783 #if DEBUG_TRANSPORT
1784     if (n == NULL)
1785       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1786                   "Could not send message to peer `%s': unknown neighbour",
1787                   GNUNET_i2s (target));
1788     else if (!is_connected (n))
1789       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1790                   "Could not send message to peer `%s': not connected\n",
1791                   GNUNET_i2s (target));
1792 #endif
1793     if (NULL != cont)
1794       cont (cont_cls, GNUNET_SYSERR);
1795     return;
1796   }
1797
1798   if ((n->session == NULL) && (n->address == NULL))
1799   {
1800     GNUNET_STATISTICS_update (GST_stats,
1801                               gettext_noop
1802                               ("# messages not sent (no such peer or not connected)"),
1803                               1, GNUNET_NO);
1804 #if DEBUG_TRANSPORT
1805     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1806                 "Could not send message to peer `%s': no address available\n",
1807                 GNUNET_i2s (target));
1808 #endif
1809
1810     if (NULL != cont)
1811       cont (cont_cls, GNUNET_SYSERR);
1812     return;
1813   }
1814
1815   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1816   GNUNET_STATISTICS_update (GST_stats,
1817                             gettext_noop
1818                             ("# bytes in message queue for other peers"),
1819                             msg_size, GNUNET_NO);
1820   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1821   mq->cont = cont;
1822   mq->cont_cls = cont_cls;
1823   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1824   memcpy (&mq[1], msg, msg_size);
1825   mq->message_buf = (const char *) &mq[1];
1826   mq->message_buf_size = msg_size;
1827   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1828   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1829
1830   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1831       (NULL == n->is_active))
1832     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1833 }
1834
1835
1836 /**
1837  * We have received a message from the given sender.  How long should
1838  * we delay before receiving more?  (Also used to keep the peer marked
1839  * as live).
1840  *
1841  * @param sender sender of the message
1842  * @param size size of the message
1843  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1844  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1845  *                   GNUNET_SYSERR if the connection is not fully up yet
1846  * @return how long to wait before reading more from this sender
1847  */
1848 struct GNUNET_TIME_Relative
1849 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1850                                         *sender, ssize_t size, int *do_forward)
1851 {
1852   struct NeighbourMapEntry *n;
1853   struct GNUNET_TIME_Relative ret;
1854
1855   // This can happen during shutdown
1856   if (neighbours == NULL)
1857   {
1858     return GNUNET_TIME_UNIT_FOREVER_REL;
1859   }
1860
1861   n = lookup_neighbour (sender);
1862   if (n == NULL)
1863   {
1864     GST_neighbours_try_connect (sender);
1865     n = lookup_neighbour (sender);
1866     if (NULL == n)
1867     {
1868       GNUNET_STATISTICS_update (GST_stats,
1869                                 gettext_noop
1870                                 ("# messages discarded due to lack of neighbour record"),
1871                                 1, GNUNET_NO);
1872       *do_forward = GNUNET_NO;
1873       return GNUNET_TIME_UNIT_ZERO;
1874     }
1875   }
1876   if (!is_connected (n))
1877   {
1878     *do_forward = GNUNET_SYSERR;
1879     return GNUNET_TIME_UNIT_ZERO;
1880   }
1881   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1882   {
1883     n->quota_violation_count++;
1884 #if DEBUG_TRANSPORT
1885     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1886                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1887                 n->in_tracker.available_bytes_per_s__,
1888                 n->quota_violation_count);
1889 #endif
1890     /* Discount 32k per violation */
1891     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1892   }
1893   else
1894   {
1895     if (n->quota_violation_count > 0)
1896     {
1897       /* try to add 32k back */
1898       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1899       n->quota_violation_count--;
1900     }
1901   }
1902   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1903   {
1904     GNUNET_STATISTICS_update (GST_stats,
1905                               gettext_noop
1906                               ("# bandwidth quota violations by other peers"),
1907                               1, GNUNET_NO);
1908     *do_forward = GNUNET_NO;
1909     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1910   }
1911   *do_forward = GNUNET_YES;
1912   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1913   if (ret.rel_value > 0)
1914   {
1915 #if DEBUG_TRANSPORT
1916     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1918                 (unsigned long long) n->in_tracker.
1919                 consumption_since_last_update__,
1920                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1921                 (unsigned long long) ret.rel_value);
1922 #endif
1923     GNUNET_STATISTICS_update (GST_stats,
1924                               gettext_noop ("# ms throttling suggested"),
1925                               (int64_t) ret.rel_value, GNUNET_NO);
1926   }
1927   return ret;
1928 }
1929
1930
1931 /**
1932  * Keep the connection to the given neighbour alive longer,
1933  * we received a KEEPALIVE (or equivalent).
1934  *
1935  * @param neighbour neighbour to keep alive
1936  */
1937 void
1938 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1939 {
1940   struct NeighbourMapEntry *n;
1941
1942   // This can happen during shutdown
1943   if (neighbours == NULL)
1944   {
1945     return;
1946   }
1947
1948   n = lookup_neighbour (neighbour);
1949   if (NULL == n)
1950   {
1951     GNUNET_STATISTICS_update (GST_stats,
1952                               gettext_noop
1953                               ("# KEEPALIVE messages discarded (not connected)"),
1954                               1, GNUNET_NO);
1955     return;
1956   }
1957   GNUNET_SCHEDULER_cancel (n->timeout_task);
1958   n->timeout_task =
1959       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1960                                     &neighbour_timeout_task, n);
1961
1962   /* send reply to measure latency */
1963   if (S_CONNECTED != n->state)
1964     return;
1965
1966   struct GNUNET_MessageHeader m;
1967
1968   m.size = htons (sizeof (struct GNUNET_MessageHeader));
1969   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE);
1970
1971   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
1972                     UINT32_MAX /* priority */ ,
1973                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
1974                     GNUNET_YES, NULL, NULL);
1975 }
1976
1977 /**
1978  * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency
1979  * to this peer
1980  *
1981  * @param neighbour neighbour to keep alive
1982  * @param ats performance data
1983  * @param ats_count number of entries in ats
1984  */
1985 void
1986 GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour,
1987                                    const struct GNUNET_ATS_Information *ats,
1988                                    uint32_t ats_count)
1989 {
1990   struct NeighbourMapEntry *n;
1991   struct GNUNET_ATS_Information *ats_new;
1992   uint32_t latency;
1993
1994   if (neighbours == NULL)
1995   {
1996     // This can happen during shutdown
1997     return;
1998   }
1999
2000   n = lookup_neighbour (neighbour);
2001   if ((NULL == n) || (n->state != S_CONNECTED))
2002   {
2003     GNUNET_STATISTICS_update (GST_stats,
2004                               gettext_noop
2005                               ("# KEEPALIVE_RESPONSE messages discarded (not connected)"),
2006                               1, GNUNET_NO);
2007     return;
2008   }
2009   if (n->expect_latency_response != GNUNET_YES)
2010   {
2011     GNUNET_STATISTICS_update (GST_stats,
2012                               gettext_noop
2013                               ("# KEEPALIVE_RESPONSE messages discarded (not expected)"),
2014                               1, GNUNET_NO);
2015     return;
2016   }
2017   n->expect_latency_response = GNUNET_NO;
2018
2019   GNUNET_assert (n->keep_alive_sent.abs_value !=
2020                  GNUNET_TIME_absolute_get_zero ().abs_value);
2021   n->latency =
2022       GNUNET_TIME_absolute_get_difference (n->keep_alive_sent,
2023                                            GNUNET_TIME_absolute_get ());
2024 #if DEBUG_TRANSPORT
2025   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n",
2026               GNUNET_i2s (&n->id), n->latency.rel_value);
2027 #endif
2028
2029
2030   if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value)
2031   {
2032     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count);
2033   }
2034   else
2035   {
2036     ats_new =
2037         GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) *
2038                        (ats_count + 1));
2039     memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2040
2041     /* add latency */
2042     ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2043     if (n->latency.rel_value > UINT32_MAX)
2044       latency = UINT32_MAX;
2045     else
2046       latency = n->latency.rel_value;
2047     ats_new[ats_count].value = htonl (latency);
2048
2049     GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new,
2050                                ats_count + 1);
2051     GNUNET_free (ats_new);
2052   }
2053 }
2054
2055
2056 /**
2057  * Change the incoming quota for the given peer.
2058  *
2059  * @param neighbour identity of peer to change qutoa for
2060  * @param quota new quota
2061  */
2062 void
2063 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
2064                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
2065 {
2066   struct NeighbourMapEntry *n;
2067
2068   // This can happen during shutdown
2069   if (neighbours == NULL)
2070   {
2071     return;
2072   }
2073
2074   n = lookup_neighbour (neighbour);
2075   if (n == NULL)
2076   {
2077     GNUNET_STATISTICS_update (GST_stats,
2078                               gettext_noop
2079                               ("# SET QUOTA messages ignored (no such peer)"),
2080                               1, GNUNET_NO);
2081     return;
2082   }
2083 #if DEBUG_TRANSPORT
2084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2085               "Setting inbound quota of %u Bps for peer `%s' to all clients\n",
2086               ntohl (quota.value__), GNUNET_i2s (&n->id));
2087 #endif
2088   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
2089   if (0 != ntohl (quota.value__))
2090     return;
2091 #if DEBUG_TRANSPORT
2092   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
2093               GNUNET_i2s (&n->id), "SET_QUOTA");
2094 #endif
2095   if (is_connected (n))
2096     GNUNET_STATISTICS_update (GST_stats,
2097                               gettext_noop ("# disconnects due to quota of 0"),
2098                               1, GNUNET_NO);
2099   disconnect_neighbour (n);
2100 }
2101
2102
2103 /**
2104  * Closure for the neighbours_iterate function.
2105  */
2106 struct IteratorContext
2107 {
2108   /**
2109    * Function to call on each connected neighbour.
2110    */
2111   GST_NeighbourIterator cb;
2112
2113   /**
2114    * Closure for 'cb'.
2115    */
2116   void *cb_cls;
2117 };
2118
2119
2120 /**
2121  * Call the callback from the closure for each connected neighbour.
2122  *
2123  * @param cls the 'struct IteratorContext'
2124  * @param key the hash of the public key of the neighbour
2125  * @param value the 'struct NeighbourMapEntry'
2126  * @return GNUNET_OK (continue to iterate)
2127  */
2128 static int
2129 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
2130 {
2131   struct IteratorContext *ic = cls;
2132   struct NeighbourMapEntry *n = value;
2133
2134   if (!is_connected (n))
2135     return GNUNET_OK;
2136
2137   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
2138   return GNUNET_OK;
2139 }
2140
2141
2142 /**
2143  * Iterate over all connected neighbours.
2144  *
2145  * @param cb function to call
2146  * @param cb_cls closure for cb
2147  */
2148 void
2149 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
2150 {
2151   struct IteratorContext ic;
2152
2153   // This can happen during shutdown
2154   if (neighbours == NULL)
2155   {
2156     return;
2157   }
2158
2159   ic.cb = cb;
2160   ic.cb_cls = cb_cls;
2161   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
2162 }
2163
2164 /**
2165  * If we have an active connection to the given target, it must be shutdown.
2166  *
2167  * @param target peer to disconnect from
2168  */
2169 void
2170 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
2171 {
2172   struct NeighbourMapEntry *n;
2173
2174   // This can happen during shutdown
2175   if (neighbours == NULL)
2176   {
2177     return;
2178   }
2179
2180   n = lookup_neighbour (target);
2181   if (NULL == n)
2182     return;                     /* not active */
2183   disconnect_neighbour (n);
2184 }
2185
2186
2187 /**
2188  * We received a disconnect message from the given peer,
2189  * validate and process.
2190  *
2191  * @param peer sender of the message
2192  * @param msg the disconnect message
2193  */
2194 void
2195 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
2196                                           *peer,
2197                                           const struct GNUNET_MessageHeader
2198                                           *msg)
2199 {
2200   struct NeighbourMapEntry *n;
2201   const struct SessionDisconnectMessage *sdm;
2202   GNUNET_HashCode hc;
2203
2204 #if DEBUG_TRANSPORT
2205   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2206               "Received DISCONNECT message from peer `%s'\n",
2207               GNUNET_i2s (peer));
2208 #endif
2209
2210   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
2211   {
2212     // GNUNET_break_op (0);
2213     GNUNET_STATISTICS_update (GST_stats,
2214                               gettext_noop
2215                               ("# disconnect messages ignored (old format)"), 1,
2216                               GNUNET_NO);
2217     return;
2218   }
2219   sdm = (const struct SessionDisconnectMessage *) msg;
2220   n = lookup_neighbour (peer);
2221   if (NULL == n)
2222     return;                     /* gone already */
2223   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
2224       n->connect_ts.abs_value)
2225   {
2226     GNUNET_STATISTICS_update (GST_stats,
2227                               gettext_noop
2228                               ("# disconnect messages ignored (timestamp)"), 1,
2229                               GNUNET_NO);
2230     return;
2231   }
2232   GNUNET_CRYPTO_hash (&sdm->public_key,
2233                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2234                       &hc);
2235   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
2236   {
2237     GNUNET_break_op (0);
2238     return;
2239   }
2240   if (ntohl (sdm->purpose.size) !=
2241       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
2242       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
2243       sizeof (struct GNUNET_TIME_AbsoluteNBO))
2244   {
2245     GNUNET_break_op (0);
2246     return;
2247   }
2248   if (GNUNET_OK !=
2249       GNUNET_CRYPTO_rsa_verify
2250       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
2251        &sdm->signature, &sdm->public_key))
2252   {
2253     GNUNET_break_op (0);
2254     return;
2255   }
2256   GST_neighbours_force_disconnect (peer);
2257 }
2258
2259
2260 /**
2261  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
2262  * Consider switching to it.
2263  *
2264  * @param message possibly a 'struct SessionConnectMessage' (check format)
2265  * @param peer identity of the peer to switch the address for
2266  * @param address address of the other peer, NULL if other peer
2267  *                       connected to us
2268  * @param session session to use (or NULL)
2269  * @param ats performance data
2270  * @param ats_count number of entries in ats
2271  */
2272 void
2273 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
2274                                    const struct GNUNET_PeerIdentity *peer,
2275                                    const struct GNUNET_HELLO_Address *address,
2276                                    struct Session *session,
2277                                    const struct GNUNET_ATS_Information *ats,
2278                                    uint32_t ats_count)
2279 {
2280   const struct SessionConnectMessage *scm;
2281   struct GNUNET_MessageHeader msg;
2282   struct NeighbourMapEntry *n;
2283   size_t msg_len;
2284   size_t ret;
2285
2286 #if DEBUG_TRANSPORT
2287   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2288               "Received CONNECT_ACK message from peer `%s'\n",
2289               GNUNET_i2s (peer));
2290 #endif
2291
2292   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2293   {
2294     GNUNET_break_op (0);
2295     return;
2296   }
2297   scm = (const struct SessionConnectMessage *) message;
2298   GNUNET_break_op (ntohl (scm->reserved) == 0);
2299   n = lookup_neighbour (peer);
2300   if (NULL == n)
2301   {
2302     /* we did not send 'CONNECT' -- at least not recently */
2303     GNUNET_STATISTICS_update (GST_stats,
2304                               gettext_noop
2305                               ("# unexpected CONNECT_ACK messages (no peer)"),
2306                               1, GNUNET_NO);
2307     return;
2308   }
2309
2310   /* Additional check
2311    *
2312    * ((n->state != S_CONNECT_RECV) && (n->address != NULL)):
2313    *
2314    * We also received an CONNECT message, switched from SENDT to RECV and
2315    * ATS already suggested us an address after a successful blacklist check
2316    */
2317   if ((n->state != S_CONNECT_SENT) &&
2318       ((n->state != S_CONNECT_RECV) && (n->address != NULL)))
2319   {
2320     GNUNET_STATISTICS_update (GST_stats,
2321                               gettext_noop
2322                               ("# unexpected CONNECT_ACK messages"), 1,
2323                               GNUNET_NO);
2324     return;
2325   }
2326
2327   change_state (n, S_CONNECTED);
2328
2329   if (NULL != session)
2330   {
2331     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2332                      "transport-ats",
2333                      "Giving ATS session %p of plugin %s for peer %s\n",
2334                      session, address->transport_name, GNUNET_i2s (peer));
2335   }
2336   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2337   GNUNET_assert (NULL != n->address);
2338   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2339   {
2340     if (n->session == NULL)
2341       n->session = session;
2342     GST_validation_set_address_use (n->address, n->session, GNUNET_YES);
2343     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2344     n->address_state = USED;
2345
2346   }
2347
2348   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2349
2350   /* send ACK (ACK) */
2351   msg_len = sizeof (msg);
2352   msg.size = htons (msg_len);
2353   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2354
2355   ret =
2356       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2357                         GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address,
2358                         GNUNET_YES, NULL, NULL);
2359
2360   if (ret == GNUNET_SYSERR)
2361     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2362                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2363                 GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session);
2364
2365
2366   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2367     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2368
2369   neighbours_connected++;
2370   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2371                             GNUNET_NO);
2372 #if DEBUG_TRANSPORT
2373   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2374               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2375               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2376               __LINE__);
2377 #endif
2378   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2379   send_outbound_quota (peer, n->bandwidth_out);
2380
2381 }
2382
2383
2384 void
2385 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2386                            const struct GNUNET_PeerIdentity *peer,
2387                            const struct GNUNET_HELLO_Address *address,
2388                            struct Session *session,
2389                            const struct GNUNET_ATS_Information *ats,
2390                            uint32_t ats_count)
2391 {
2392   struct NeighbourMapEntry *n;
2393
2394 #if DEBUG_TRANSPORT
2395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n",
2396               GNUNET_i2s (peer));
2397 #endif
2398
2399   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2400   {
2401     GNUNET_break_op (0);
2402     return;
2403   }
2404   n = lookup_neighbour (peer);
2405   if (NULL == n)
2406   {
2407     send_disconnect (peer, address, session);
2408     GNUNET_break (0);
2409     return;
2410   }
2411   if (S_CONNECTED == n->state)
2412     return;
2413   if (!is_connecting (n))
2414   {
2415     GNUNET_STATISTICS_update (GST_stats,
2416                               gettext_noop ("# unexpected ACK messages"), 1,
2417                               GNUNET_NO);
2418     return;
2419   }
2420   change_state (n, S_CONNECTED);
2421   if (NULL != session)
2422     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2423                      "transport-ats",
2424                      "Giving ATS session %p of plugin %s for peer %s\n",
2425                      session, address->transport_name, GNUNET_i2s (peer));
2426   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2427   GNUNET_assert (n->address != NULL);
2428   if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address)))
2429   {
2430     if (n->session == NULL)
2431       n->session = session;
2432     GST_validation_set_address_use (n->address, n->session, GNUNET_YES);
2433     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2434     n->address_state = USED;
2435   }
2436
2437   neighbours_connected++;
2438   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2439                             GNUNET_NO);
2440
2441   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2442   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2443     n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n);
2444 #if DEBUG_TRANSPORT
2445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2447               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session,
2448               __LINE__);
2449 #endif
2450   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2451   send_outbound_quota (peer, n->bandwidth_out);
2452 }
2453
2454 struct BlackListCheckContext
2455 {
2456   struct GNUNET_ATS_Information *ats;
2457
2458   uint32_t ats_count;
2459
2460   struct Session *session;
2461
2462   struct GNUNET_HELLO_Address *address;
2463
2464   struct GNUNET_TIME_Absolute ts;
2465 };
2466
2467
2468 static void
2469 handle_connect_blacklist_cont (void *cls,
2470                                const struct GNUNET_PeerIdentity *peer,
2471                                int result)
2472 {
2473   struct NeighbourMapEntry *n;
2474   struct BlackListCheckContext *bcc = cls;
2475
2476 #if DEBUG_TRANSPORT
2477   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2478               "Blacklist check due to CONNECT message: `%s'\n",
2479               GNUNET_i2s (peer),
2480               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2481 #endif
2482
2483   /* not allowed */
2484   if (GNUNET_OK != result)
2485   {
2486     GNUNET_HELLO_address_free (bcc->address);
2487     GNUNET_free (bcc);
2488     return;
2489   }
2490
2491   n = lookup_neighbour (peer);
2492   if (NULL == n)
2493     n = setup_neighbour (peer);
2494
2495   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2496   {
2497     if (NULL != bcc->session)
2498       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2499                        "transport-ats",
2500                        "Giving ATS session %p of address `%s' for peer %s\n",
2501                        bcc->session, GST_plugins_a2s (bcc->address),
2502                        GNUNET_i2s (peer));
2503     /* Tell ATS about the session, so ATS can suggest it if it likes it. */
2504
2505     GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats,
2506                                bcc->ats_count);
2507     n->connect_ts = bcc->ts;
2508   }
2509
2510   GNUNET_HELLO_address_free (bcc->address);
2511   GNUNET_free (bcc);
2512
2513   if (n->state != S_CONNECT_RECV)
2514     change_state (n, S_CONNECT_RECV);
2515
2516
2517   /* Ask ATS for an address to connect via that address */
2518   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2519     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2520   n->ats_suggest =
2521       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2522                                     n);
2523   GNUNET_ATS_suggest_address (GST_ats, peer);
2524 }
2525
2526 /**
2527  * We received a 'SESSION_CONNECT' message from the other peer.
2528  * Consider switching to it.
2529  *
2530  * @param message possibly a 'struct SessionConnectMessage' (check format)
2531  * @param peer identity of the peer to switch the address for
2532  * @param address address of the other peer, NULL if other peer
2533  *                       connected to us
2534  * @param session session to use (or NULL)
2535  * @param ats performance data
2536  * @param ats_count number of entries in ats (excluding 0-termination)
2537  */
2538 void
2539 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2540                                const struct GNUNET_PeerIdentity *peer,
2541                                const struct GNUNET_HELLO_Address *address,
2542                                struct Session *session,
2543                                const struct GNUNET_ATS_Information *ats,
2544                                uint32_t ats_count)
2545 {
2546   const struct SessionConnectMessage *scm;
2547   struct BlackListCheckContext *bcc = NULL;
2548   struct NeighbourMapEntry *n;
2549
2550 #if DEBUG_TRANSPORT
2551   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2552               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2553 #endif
2554
2555   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2556   {
2557     GNUNET_break_op (0);
2558     return;
2559   }
2560
2561   scm = (const struct SessionConnectMessage *) message;
2562   GNUNET_break_op (ntohl (scm->reserved) == 0);
2563
2564   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2565
2566   n = lookup_neighbour (peer);
2567   if ((n != NULL) && (S_CONNECTED == n->state))
2568   {
2569     /* connected peer switches addresses */
2570     return;
2571   }
2572
2573
2574   /* we are not connected to this peer */
2575   /* do blacklist check */
2576   bcc =
2577       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2578                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2579   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2580   bcc->ats_count = ats_count + 1;
2581   bcc->address = GNUNET_HELLO_address_copy (address);
2582   bcc->session = session;
2583   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2584   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2585   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2586   bcc->ats[ats_count].value =
2587       htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2588   GST_blacklist_test_allowed (peer, address->transport_name,
2589                               handle_connect_blacklist_cont, bcc);
2590 }
2591
2592
2593 /* end of file gnunet-service-transport_neighbours.c */