2e3523a8ea72c479918361bcd78c03243c1329d0
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours?
52  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
53  * send 3 keepalives in each interval, so 3 messages would need to be
54  * lost in a row for a disconnect).
55  */
56 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
57
58
59 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
60
61
62 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
63
64
65 /**
66  * Entry in neighbours.
67  */
68 struct NeighbourMapEntry;
69
70 /**
71  * Message a peer sends to another to indicate its
72  * preference for communicating via a particular
73  * session (and the desire to establish a real
74  * connection).
75  */
76 struct SessionConnectMessage
77 {
78   /**
79    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
80    */
81   struct GNUNET_MessageHeader header;
82
83   /**
84    * Always zero.
85    */
86   uint32_t reserved GNUNET_PACKED;
87
88   /**
89    * Absolute time at the sender.  Only the most recent connect
90    * message implies which session is preferred by the sender.
91    */
92   struct GNUNET_TIME_AbsoluteNBO timestamp;
93
94 };
95
96
97 struct SessionDisconnectMessage
98 {
99   /**
100    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Always zero.
106    */
107   uint32_t reserved GNUNET_PACKED;
108
109   /**
110    * Purpose of the signature.  Extends over the timestamp.
111    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
112    */
113   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
114
115   /**
116    * Absolute time at the sender.  Only the most recent connect
117    * message implies which session is preferred by the sender.
118    */
119   struct GNUNET_TIME_AbsoluteNBO timestamp;
120
121   /**
122    * Public key of the sender.
123    */
124   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
125
126   /**
127    * Signature of the peer that sends us the disconnect.  Only
128    * valid if the timestamp is AFTER the timestamp from the
129    * corresponding 'CONNECT' message.
130    */
131   struct GNUNET_CRYPTO_RsaSignature signature;
132
133 };
134
135
136 /**
137  * For each neighbour we keep a list of messages
138  * that we still want to transmit to the neighbour.
139  */
140 struct MessageQueue
141 {
142
143   /**
144    * This is a doubly linked list.
145    */
146   struct MessageQueue *next;
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *prev;
152
153   /**
154    * Once this message is actively being transmitted, which
155    * neighbour is it associated with?
156    */
157   struct NeighbourMapEntry *n;
158
159   /**
160    * Function to call once we're done.
161    */
162   GST_NeighbourSendContinuation cont;
163
164   /**
165    * Closure for 'cont'
166    */
167   void *cont_cls;
168
169   /**
170    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
171    * stuck together in memory.  Allocated at the end of this struct.
172    */
173   const char *message_buf;
174
175   /**
176    * Size of the message buf
177    */
178   size_t message_buf_size;
179
180   /**
181    * At what time should we fail?
182    */
183   struct GNUNET_TIME_Absolute timeout;
184
185 };
186
187
188 enum State
189 {
190   /**
191    * fresh peer or completely disconnected 
192    */
193   S_NOT_CONNECTED,
194
195   /**
196    * sent CONNECT message to other peer, waiting for CONNECT_ACK 
197    */
198   S_CONNECT_SENT,
199
200   /**
201    * received CONNECT message to other peer, sending CONNECT_ACK 
202    */
203   S_CONNECT_RECV,
204
205   /**
206    * received ACK or payload 
207    */
208   S_CONNECTED,
209
210   /**
211    * Disconnect in progress 
212    */
213   S_DISCONNECT
214 };
215
216
217 /**
218  * Entry in neighbours.
219  */
220 struct NeighbourMapEntry
221 {
222
223   /**
224    * Head of list of messages we would like to send to this peer;
225    * must contain at most one message per client.
226    */
227   struct MessageQueue *messages_head;
228
229   /**
230    * Tail of list of messages we would like to send to this peer; must
231    * contain at most one message per client.
232    */
233   struct MessageQueue *messages_tail;
234
235   /**
236    * Performance data for the peer.
237    */
238   //struct GNUNET_ATS_Information *ats;
239
240   /**
241    * Are we currently trying to send a message? If so, which one?
242    */
243   struct MessageQueue *is_active;
244
245   /**
246    * Active session for communicating with the peer.
247    */
248   struct Session *session;
249
250   /**
251    * Address we currently use.
252    */
253   struct GNUNET_HELLO_Address *address;
254
255   /**
256    * Identity of this neighbour.
257    */
258   struct GNUNET_PeerIdentity id;
259
260   /**
261    * ID of task scheduled to run when this peer is about to
262    * time out (will free resources associated with the peer).
263    */
264   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
265
266   /**
267    * ID of task scheduled to send keepalives.
268    */
269   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
270
271   /**
272    * ID of task scheduled to run when we should try transmitting
273    * the head of the message queue.
274    */
275   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
276
277   /**
278    * Tracker for inbound bandwidth.
279    */
280   struct GNUNET_BANDWIDTH_Tracker in_tracker;
281
282   /**
283    * Inbound bandwidth from ATS, activated when connection is up
284    */
285   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
286
287   /**
288    * Inbound bandwidth from ATS, activated when connection is up
289    */
290   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
291
292   /**
293    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
294    */
295   struct GNUNET_TIME_Absolute connect_ts;
296
297   /**
298    * Timeout for ATS
299    * We asked ATS for a new address for this peer
300    */
301   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
302
303   /**
304    * Task the resets the peer state after due to an pending
305    * unsuccessful connection setup
306    */
307   GNUNET_SCHEDULER_TaskIdentifier state_reset;
308
309   /**
310    * How often has the other peer (recently) violated the inbound
311    * traffic limit?  Incremented by 10 per violation, decremented by 1
312    * per non-violation (for each time interval).
313    */
314   unsigned int quota_violation_count;
315
316
317   /**
318    * The current state of the peer
319    * Element of enum State
320    */
321   int state;
322
323 };
324
325
326 /**
327  * All known neighbours and their HELLOs.
328  */
329 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
330
331 /**
332  * Closure for connect_notify_cb and disconnect_notify_cb
333  */
334 static void *callback_cls;
335
336 /**
337  * Function to call when we connected to a neighbour.
338  */
339 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
340
341 /**
342  * Function to call when we disconnected from a neighbour.
343  */
344 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
345
346 /**
347  * counter for connected neighbours
348  */
349 static int neighbours_connected;
350
351 /**
352  * Lookup a neighbour entry in the neighbours hash map.
353  *
354  * @param pid identity of the peer to look up
355  * @return the entry, NULL if there is no existing record
356  */
357 static struct NeighbourMapEntry *
358 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
359 {
360   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
361 }
362
363 #define change_state(n, state, ...) change (n, state, __LINE__)
364
365 static int
366 is_connecting (struct NeighbourMapEntry *n)
367 {
368   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
369     return GNUNET_YES;
370   return GNUNET_NO;
371 }
372
373 static int
374 is_connected (struct NeighbourMapEntry *n)
375 {
376   if (n->state == S_CONNECTED)
377     return GNUNET_YES;
378   return GNUNET_NO;
379 }
380
381 static int
382 is_disconnecting (struct NeighbourMapEntry *n)
383 {
384   if (n->state == S_DISCONNECT)
385     return GNUNET_YES;
386   return GNUNET_NO;
387 }
388
389 static const char *
390 print_state (int state)
391 {
392   switch (state)
393   {
394   case S_CONNECTED:
395     return "S_CONNECTED";
396     break;
397   case S_CONNECT_RECV:
398     return "S_CONNECT_RECV";
399     break;
400   case S_CONNECT_SENT:
401     return "S_CONNECT_SENT";
402     break;
403   case S_DISCONNECT:
404     return "S_DISCONNECT";
405     break;
406   case S_NOT_CONNECTED:
407     return "S_NOT_CONNECTED";
408     break;
409   default:
410     GNUNET_break (0);
411     break;
412   }
413   return NULL;
414 }
415
416 static int
417 change (struct NeighbourMapEntry *n, int state, int line);
418
419 static void
420 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
421
422 static void
423 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
424 {
425   struct NeighbourMapEntry *n = cls;
426
427   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
428
429 #if DEBUG_TRANSPORT
430 #endif
431   /* This jut a temporary debug message to check if a the value
432    * SETUP_CONNECTION_TIMEOUT was choosen to small for slow machines
433    */
434   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
435               "Information for developers: Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n",
436               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
437               print_state (n->state));
438
439   GNUNET_STATISTICS_update (GST_stats,
440                             gettext_noop
441                             ("# failed connection attempts due to timeout"), 1,
442                             GNUNET_NO);
443
444   /* resetting state */
445   n->state = S_NOT_CONNECTED;
446
447   /* destroying address */
448   GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
449
450   /* request new address */
451   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
452     GNUNET_SCHEDULER_cancel (n->ats_suggest);
453   n->ats_suggest =
454       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
455                                     n);
456   GNUNET_ATS_suggest_address (GST_ats, &n->id);
457 }
458
459 static int
460 change (struct NeighbourMapEntry *n, int state, int line)
461 {
462   /* allowed transitions */
463   int allowed = GNUNET_NO;
464
465   switch (n->state)
466   {
467   case S_NOT_CONNECTED:
468     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
469         (state == S_DISCONNECT))
470       allowed = GNUNET_YES;
471     break;
472   case S_CONNECT_RECV:
473     allowed = GNUNET_YES;
474     break;
475   case S_CONNECT_SENT:
476     allowed = GNUNET_YES;
477     break;
478   case S_CONNECTED:
479     if (state == S_DISCONNECT)
480       allowed = GNUNET_YES;
481     break;
482   case S_DISCONNECT:
483     break;
484   default:
485     GNUNET_break (0);
486     break;
487   }
488   if (allowed == GNUNET_NO)
489   {
490     char *old = GNUNET_strdup (print_state (n->state));
491     char *new = GNUNET_strdup (print_state (state));
492     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
493                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
494                 new, line);
495     GNUNET_break (0);
496     GNUNET_free (old);
497     GNUNET_free (new);
498     return GNUNET_SYSERR;
499   }
500
501   n->state = state;
502 #if DEBUG_TRANSPORT
503   {
504     char *old = GNUNET_strdup (print_state (n->state));
505     char *new = GNUNET_strdup (print_state (state));
506     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
508                 GNUNET_i2s (&n->id), n, old, new, line);
509     GNUNET_free (old);
510     GNUNET_free (new);
511   }
512 #endif
513   switch (n->state)
514   {
515   case S_CONNECT_RECV:
516   case S_CONNECT_SENT:
517     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
518       GNUNET_SCHEDULER_cancel (n->state_reset);
519     n->state_reset =
520       GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
521                                     n);   
522     break;
523   case S_CONNECTED:
524   case S_NOT_CONNECTED:
525   case S_DISCONNECT:
526     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
527     {
528 #if DEBUG_TRANSPORT
529       char *old = GNUNET_strdup (print_state (n->state));
530       char *new = GNUNET_strdup (print_state (state));
531       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
533                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
534                   old, new);
535       GNUNET_free (old);
536       GNUNET_free (new);
537 #endif
538       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
539       GNUNET_SCHEDULER_cancel (n->state_reset);
540       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
541     }
542     break;
543   default:
544     GNUNET_assert (0);
545   }
546   
547
548
549   return GNUNET_OK;
550 }
551
552 static ssize_t
553 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
554                   size_t msgbuf_size, uint32_t priority,
555                   struct GNUNET_TIME_Relative timeout, struct Session *session,
556                   const struct GNUNET_HELLO_Address *address,
557                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
558                   void *cont_cls)
559 {
560   struct GNUNET_TRANSPORT_PluginFunctions *papi;
561   size_t ret = GNUNET_SYSERR;
562
563   /* FIXME : ats returns an address with all values 0 */
564   if (address == NULL)
565   {
566     if (cont != NULL)
567       cont (cont_cls, target, GNUNET_SYSERR);
568     return GNUNET_SYSERR;
569   }
570
571   if ((session == NULL) && (address == NULL))
572   {
573     if (cont != NULL)
574       cont (cont_cls, target, GNUNET_SYSERR);
575     return GNUNET_SYSERR;
576   }
577
578   papi = GST_plugins_find (address->transport_name);
579   if (papi == NULL)
580   {
581     if (cont != NULL)
582       cont (cont_cls, target, GNUNET_SYSERR);
583     return GNUNET_SYSERR;
584   }
585
586   ret =
587       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
588                   address->address, 
589                   address->address_length, GNUNET_YES, cont, cont_cls);
590
591   if (ret == -1)
592   {
593     if (cont != NULL)
594       cont (cont_cls, target, GNUNET_SYSERR);
595   }
596   return ret;
597 }
598
599 /**
600  * Task invoked to start a transmission to another peer.
601  *
602  * @param cls the 'struct NeighbourMapEntry'
603  * @param tc scheduler context
604  */
605 static void
606 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
607
608
609 /**
610  * We're done with our transmission attempt, continue processing.
611  *
612  * @param cls the 'struct MessageQueue' of the message
613  * @param receiver intended receiver
614  * @param success whether it worked or not
615  */
616 static void
617 transmit_send_continuation (void *cls,
618                             const struct GNUNET_PeerIdentity *receiver,
619                             int success)
620 {
621   struct MessageQueue *mq;
622   struct NeighbourMapEntry *n;
623
624   mq = cls;
625   n = mq->n;
626   if (NULL != n)
627   {
628     GNUNET_assert (n->is_active == mq);
629     n->is_active = NULL;
630     if (success == GNUNET_YES)
631     {
632       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
633       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
634     }
635   }
636 #if DEBUG_TRANSPORT
637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
638               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
639               (success == GNUNET_OK) ? "successful" : "FAILED");
640 #endif
641   if (NULL != mq->cont)
642     mq->cont (mq->cont_cls, success);
643   GNUNET_free (mq);
644 }
645
646
647 /**
648  * Check the ready list for the given neighbour and if a plugin is
649  * ready for transmission (and if we have a message), do so!
650  *
651  * @param n target peer for which to transmit
652  */
653 static void
654 try_transmission_to_peer (struct NeighbourMapEntry *n)
655 {
656   struct MessageQueue *mq;
657   struct GNUNET_TIME_Relative timeout;
658   ssize_t ret;
659
660   if (n->is_active != NULL)
661   {
662     GNUNET_break (0);
663     return;                     /* transmission already pending */
664   }
665   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
666   {
667     GNUNET_break (0);
668     return;                     /* currently waiting for bandwidth */
669   }
670   while (NULL != (mq = n->messages_head))
671   {
672     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
673     if (timeout.rel_value > 0)
674       break;
675     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
676     n->is_active = mq;
677     mq->n = n;
678     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
679   }
680   if (NULL == mq)
681     return;                     /* no more messages */
682
683   if (GST_plugins_find (n->address->transport_name) == NULL)
684   {
685     GNUNET_break (0);
686     return;
687   }
688   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
689   n->is_active = mq;
690   mq->n = n;
691
692   if ( (n->session == NULL) && (NULL == n->address) )
693   {
694     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
695                 "No address for peer `%s'\n",
696                 GNUNET_i2s (&n->id));
697     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
698     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
699     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
700     return;
701   }
702
703   ret =
704       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
705                         timeout, n->session, n->address, 
706                         GNUNET_YES, &transmit_send_continuation,
707                         mq);
708   if (ret == -1)
709   {
710     /* failure, but 'send' would not call continuation in this case,
711      * so we need to do it here! */
712     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
713   }
714
715 }
716
717
718 /**
719  * Task invoked to start a transmission to another peer.
720  *
721  * @param cls the 'struct NeighbourMapEntry'
722  * @param tc scheduler context
723  */
724 static void
725 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
726 {
727   struct NeighbourMapEntry *n = cls;
728
729   GNUNET_assert (NULL != lookup_neighbour (&n->id));
730   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
731   try_transmission_to_peer (n);
732 }
733
734
735 /**
736  * Initialize the neighbours subsystem.
737  *
738  * @param cls closure for callbacks
739  * @param connect_cb function to call if we connect to a peer
740  * @param disconnect_cb function to call if we disconnect from a peer
741  */
742 void
743 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
744                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
745 {
746   callback_cls = cls;
747   connect_notify_cb = connect_cb;
748   disconnect_notify_cb = disconnect_cb;
749   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
750 }
751
752
753 static void
754 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
755                       int result)
756 {
757 #if DEBUG_TRANSPORT
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759               "Sending DISCONNECT message to peer `%4s': %i\n",
760               GNUNET_i2s (target), result);
761 #endif
762 }
763
764
765 static int
766 send_disconnect (const struct GNUNET_PeerIdentity *target,
767                  const struct GNUNET_HELLO_Address *address,
768                  struct Session *session)
769 {
770   size_t ret;
771   struct SessionDisconnectMessage disconnect_msg;
772
773 #if DEBUG_TRANSPORT
774   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775               "Sending DISCONNECT message to peer `%4s'\n",
776               GNUNET_i2s (target));
777 #endif
778
779   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
780   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
781   disconnect_msg.reserved = htonl (0);
782   disconnect_msg.purpose.size =
783       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
784              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
785              sizeof (struct GNUNET_TIME_AbsoluteNBO));
786   disconnect_msg.purpose.purpose =
787       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
788   disconnect_msg.timestamp =
789       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
790   disconnect_msg.public_key = GST_my_public_key;
791   GNUNET_assert (GNUNET_OK ==
792                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
793                                          &disconnect_msg.purpose,
794                                          &disconnect_msg.signature));
795
796   ret =
797       send_with_plugin (target, (const char *) &disconnect_msg,
798                         sizeof (disconnect_msg), UINT32_MAX,
799                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
800                         GNUNET_YES,
801                         &send_disconnect_cont, NULL);
802
803   if (ret == GNUNET_SYSERR)
804     return GNUNET_SYSERR;
805
806   GNUNET_STATISTICS_update (GST_stats,
807                             gettext_noop
808                             ("# peers disconnected due to external request"), 1,
809                             GNUNET_NO);
810   return GNUNET_OK;
811 }
812
813 /**
814  * Disconnect from the given neighbour, clean up the record.
815  *
816  * @param n neighbour to disconnect from
817  */
818 static void
819 disconnect_neighbour (struct NeighbourMapEntry *n)
820 {
821   struct MessageQueue *mq;
822   int is_connected;
823
824   is_connected = (n->state == S_CONNECTED);
825
826   /* send DISCONNECT MESSAGE */
827   if (is_connected || is_connecting (n))
828   {
829     if (GNUNET_OK ==
830         send_disconnect (&n->id, n->address,
831                          n->session))
832       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
833                   GNUNET_i2s (&n->id));
834     else
835       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836                   "Could not send DISCONNECT_MSG to `%s'\n",
837                   GNUNET_i2s (&n->id));
838   }
839
840   if (is_connected)
841   {
842      GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
843   }
844
845
846   if (is_disconnecting (n))
847     return;
848   change_state (n, S_DISCONNECT);
849   GST_validation_set_address_use (&n->id,
850                                   n->address,
851                                   n->session,
852                                   GNUNET_NO);
853
854   if (n->address != NULL)
855   {
856     struct GNUNET_TRANSPORT_PluginFunctions *papi;
857     papi = GST_plugins_find (n->address->transport_name);
858     if (papi != NULL)
859       papi->disconnect (papi->cls, &n->id);
860   }
861
862   while (NULL != (mq = n->messages_head))
863   {
864     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
865     if (NULL != mq->cont)
866       mq->cont (mq->cont_cls, GNUNET_SYSERR);
867     GNUNET_free (mq);
868   }
869   if (NULL != n->is_active)
870   {
871     n->is_active->n = NULL;
872     n->is_active = NULL;
873   }
874   if (is_connected)
875   {
876     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
877     GNUNET_SCHEDULER_cancel (n->keepalive_task);
878     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
879     GNUNET_assert (neighbours_connected > 0);
880     neighbours_connected--;
881     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
882                               GNUNET_NO);
883     disconnect_notify_cb (callback_cls, &n->id);
884   }
885   GNUNET_assert (GNUNET_YES ==
886                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
887                                                        &n->id.hashPubKey, n));
888   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
889   {
890     GNUNET_SCHEDULER_cancel (n->ats_suggest);
891     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
892   }
893   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
894   {
895     GNUNET_SCHEDULER_cancel (n->timeout_task);
896     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
897   }
898   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
899   {
900     GNUNET_SCHEDULER_cancel (n->transmission_task);
901     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
902   }
903   if (NULL != n->address)
904   {
905     GNUNET_HELLO_address_free (n->address);
906     n->address = NULL;
907   }
908   n->session = NULL;
909   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
910               GNUNET_i2s (&n->id), n);
911   GNUNET_free (n);
912 }
913
914
915 /**
916  * Peer has been idle for too long. Disconnect.
917  *
918  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
919  * @param tc scheduler context
920  */
921 static void
922 neighbour_timeout_task (void *cls,
923                         const struct GNUNET_SCHEDULER_TaskContext *tc)
924 {
925   struct NeighbourMapEntry *n = cls;
926
927   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
928
929   GNUNET_STATISTICS_update (GST_stats,
930                             gettext_noop
931                             ("# peers disconnected due to timeout"), 1,
932                             GNUNET_NO);
933   disconnect_neighbour (n);
934 }
935
936
937 /**
938  * Send another keepalive message.
939  *
940  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
941  * @param tc scheduler context
942  */
943 static void
944 neighbour_keepalive_task (void *cls,
945                           const struct GNUNET_SCHEDULER_TaskContext *tc)
946 {
947   struct NeighbourMapEntry *n = cls;
948   struct GNUNET_MessageHeader m;
949
950   n->keepalive_task =
951       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
952                                     &neighbour_keepalive_task, n);
953   GNUNET_assert (S_CONNECTED == n->state);
954   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
955                             GNUNET_NO);
956   m.size = htons (sizeof (struct GNUNET_MessageHeader));
957   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
958
959   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
960                     UINT32_MAX /* priority */ ,
961                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address, 
962                     GNUNET_YES, NULL, NULL);
963 }
964
965
966 /**
967  * Disconnect from the given neighbour.
968  *
969  * @param cls unused
970  * @param key hash of neighbour's public key (not used)
971  * @param value the 'struct NeighbourMapEntry' of the neighbour
972  */
973 static int
974 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
975 {
976   struct NeighbourMapEntry *n = value;
977
978 #if DEBUG_TRANSPORT
979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
980               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
981 #endif
982   if (S_CONNECTED == n->state)
983     GNUNET_STATISTICS_update (GST_stats,
984                               gettext_noop
985                               ("# peers disconnected due to global disconnect"),
986                               1, GNUNET_NO);
987   disconnect_neighbour (n);
988   return GNUNET_OK;
989 }
990
991
992 static void
993 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
994 {
995   struct NeighbourMapEntry *n = cls;
996
997   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
998
999   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000               " ATS did not suggested address to connect to peer `%s'\n",
1001               GNUNET_i2s (&n->id));
1002
1003   disconnect_neighbour (n);
1004 }
1005
1006 /**
1007  * Cleanup the neighbours subsystem.
1008  */
1009 void
1010 GST_neighbours_stop ()
1011 {
1012   // This can happen during shutdown
1013   if (neighbours == NULL)
1014   {
1015     return;
1016   }
1017
1018   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1019                                          NULL);
1020   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1021   GNUNET_assert (neighbours_connected == 0);
1022   neighbours = NULL;
1023   callback_cls = NULL;
1024   connect_notify_cb = NULL;
1025   disconnect_notify_cb = NULL;
1026 }
1027
1028
1029 /**
1030  * We tried to send a SESSION_CONNECT message to another peer.  If this
1031  * succeeded, we change the state.  If it failed, we should tell
1032  * ATS to not use this address anymore (until it is re-validated).
1033  *
1034  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1035  * @param success GNUNET_OK on success
1036  */
1037 static void
1038 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1039                            int success)
1040 {
1041   struct GNUNET_HELLO_Address *address = cls;
1042   struct NeighbourMapEntry *n;
1043   
1044   if (GNUNET_YES != success)
1045     GNUNET_ATS_address_destroyed (GST_ats, address, NULL);  
1046   if ( (NULL == neighbours) ||
1047        (NULL == (n = lookup_neighbour (&address->peer))) ||
1048        (n->state == S_DISCONNECT) ||
1049        (GNUNET_YES == success) )
1050   {
1051     GNUNET_HELLO_address_free (address);
1052     return;
1053   }
1054 #if DEBUG_TRANSPORT
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1057               GNUNET_i2s (&n->id), 
1058               GST_plugins_a2s (n->address), 
1059               n->session);
1060 #endif
1061   change_state (n, S_NOT_CONNECTED);
1062   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1063     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1064   n->ats_suggest =
1065     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1066                                   n);
1067   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1068   GNUNET_HELLO_address_free (address);
1069 }
1070
1071
1072 /**
1073  * We tried to switch addresses with an peer already connected. If it failed,
1074  * we should tell ATS to not use this address anymore (until it is re-validated).
1075  *
1076  * @param cls the 'struct NeighbourMapEntry'
1077  * @param success GNUNET_OK on success
1078  */
1079 static void
1080 send_switch_address_continuation (void *cls,
1081                                   const struct GNUNET_PeerIdentity *target,
1082                                   int success)
1083 {
1084   struct NeighbourMapEntry *n = cls;
1085
1086   GNUNET_assert (n != NULL);
1087   if (is_disconnecting (n))
1088     return;                     /* neighbour is going away */
1089
1090   GNUNET_assert (n->state == S_CONNECTED);
1091   if (GNUNET_YES != success)
1092   {
1093 #if DEBUG_TRANSPORT
1094     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095                 "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n",
1096                 GNUNET_i2s (&n->id), n->address->transport_name,
1097                 (n->addrlen ==
1098                  0) ? "<inbound>" : GST_plugins_a2s (n->address), n->session);
1099 #endif
1100
1101     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1102
1103     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1104       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1105     n->ats_suggest =
1106         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1107                                       n);
1108     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1109     return;
1110   }
1111   /* Tell ATS that switching addresses was successful */
1112   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
1113 }
1114
1115 /**
1116  * We tried to send a SESSION_CONNECT message to another peer.  If this
1117  * succeeded, we change the state.  If it failed, we should tell
1118  * ATS to not use this address anymore (until it is re-validated).
1119  *
1120  * @param cls the 'struct NeighbourMapEntry'
1121  * @param success GNUNET_OK on success
1122  */
1123 static void
1124 send_connect_ack_continuation (void *cls,
1125                                const struct GNUNET_PeerIdentity *target,
1126                                int success)
1127 {
1128   struct NeighbourMapEntry *n = cls;
1129
1130   GNUNET_assert (n != NULL);
1131
1132   if (GNUNET_YES == success)
1133     return;                     /* sending successful */
1134
1135   /* sending failed, ask for next address  */
1136 #if DEBUG_TRANSPORT
1137   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138               "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1139               GNUNET_i2s (&n->id), n->address->transport_name,
1140               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->address),
1141               n->session);
1142 #endif
1143   change_state (n, S_NOT_CONNECTED);
1144
1145   GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1146
1147   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1148     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1149   n->ats_suggest =
1150       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1151                                     n);
1152   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1153 }
1154
1155
1156 /**
1157  * For an existing neighbour record, set the active connection to
1158  * the given address.
1159  *
1160  * @param peer identity of the peer to switch the address for
1161  * @param address address of the other peer, NULL if other peer
1162  *                       connected to us
1163  * @param session session to use (or NULL)
1164  * @param ats performance data
1165  * @param ats_count number of entries in ats
1166  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1167  *         connection is not up (yet)
1168  */
1169 int
1170 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1171                                        const struct GNUNET_HELLO_Address *address,
1172                                        struct Session *session,
1173                                        const struct GNUNET_ATS_Information *ats,
1174                                        uint32_t ats_count,
1175                                        struct GNUNET_BANDWIDTH_Value32NBO
1176                                        bandwidth_in,
1177                                        struct GNUNET_BANDWIDTH_Value32NBO
1178                                        bandwidth_out)
1179 {
1180   struct NeighbourMapEntry *n;
1181   struct SessionConnectMessage connect_msg;
1182   size_t msg_len;
1183   size_t ret;
1184
1185   if (neighbours == NULL)
1186   {
1187     /* This can happen during shutdown */
1188     return GNUNET_NO;
1189   }
1190   n = lookup_neighbour (peer);
1191   if (NULL == n)
1192     return GNUNET_NO;
1193   if (n->state == S_DISCONNECT)
1194   {
1195     /* We are disconnecting, nothing to do here */
1196     return GNUNET_NO;
1197   }
1198   GNUNET_assert (address->transport_name != NULL);
1199   if ( (session == NULL) && (0 == address->address_length) )
1200   {
1201     GNUNET_break_op (0);
1202     GNUNET_ATS_address_destroyed (GST_ats, address, session);
1203     GNUNET_ATS_suggest_address (GST_ats, peer);
1204     return GNUNET_NO;
1205   }
1206
1207   /* checks successful and neighbour != NULL */
1208 #if DEBUG_TRANSPORT
1209   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1210               "ATS tells us to switch to address '%s' session %p for %s peer `%s'\n",
1211               GST_plugins_a2s (address),
1212               session, (S_CONNECTED == n->state) ? "CONNECTED" : "NOT CONNECTED",
1213               GNUNET_i2s (peer));
1214 #endif
1215   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1216   {
1217     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1218     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1219   }
1220   /* do not switch addresses just update quotas */
1221   if ( (n->state == S_CONNECTED) && 
1222        (NULL != n->address) &&
1223        (0 == GNUNET_HELLO_address_cmp (address,
1224                                        n->address)) &&
1225        (n->session == session) )
1226   {
1227     struct QuotaSetMessage q_msg;
1228     
1229 #if DEBUG_TRANSPORT
1230     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1231                 "Sending outbound quota of %u Bps and inbound quota of %u Bps for peer `%s' to all clients\n",
1232                 ntohl (n->bandwidth_out.value__),
1233                 ntohl (n->bandwidth_in.value__), GNUNET_i2s (peer));
1234 #endif
1235     
1236     n->bandwidth_in = bandwidth_in;
1237     n->bandwidth_out = bandwidth_out;
1238     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1239     
1240     q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1241     q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1242     q_msg.quota = n->bandwidth_out;
1243     q_msg.peer = (*peer);
1244     GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1245     return GNUNET_NO;    
1246   }
1247   if (n->state == S_CONNECTED) 
1248   {
1249     /* mark old address as no longer used */
1250     GST_validation_set_address_use (&n->id,
1251                                     n->address,
1252                                     n->session,
1253                                     GNUNET_NO);
1254     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1255   }
1256
1257   /* set new address */
1258   if (NULL != n->address)
1259     GNUNET_HELLO_address_free (n->address);
1260   n->address = GNUNET_HELLO_address_copy (address);
1261   n->session = session;
1262   n->bandwidth_in = bandwidth_in;
1263   n->bandwidth_out = bandwidth_out;
1264   GNUNET_SCHEDULER_cancel (n->timeout_task);
1265   n->timeout_task =
1266       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1267                                     &neighbour_timeout_task, n);
1268   switch (n->state)
1269   {
1270   case S_NOT_CONNECTED:  
1271   case S_CONNECT_SENT:
1272     msg_len = sizeof (struct SessionConnectMessage);
1273     connect_msg.header.size = htons (msg_len);
1274     connect_msg.header.type =
1275         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1276     connect_msg.reserved = htonl (0);
1277     connect_msg.timestamp =
1278         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1279     change_state (n, S_CONNECT_SENT);
1280     ret =
1281         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1282                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1283                           address, GNUNET_YES,
1284                           &send_connect_continuation, 
1285                           GNUNET_HELLO_address_copy (address));
1286     return GNUNET_NO;
1287   case S_CONNECT_RECV:
1288     /* We received a CONNECT message and asked ATS for an address */
1289     msg_len = sizeof (struct SessionConnectMessage);
1290     connect_msg.header.size = htons (msg_len);
1291     connect_msg.header.type =
1292         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1293     connect_msg.reserved = htonl (0);
1294     connect_msg.timestamp =
1295         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1296     ret =
1297         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1298                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1299                           address, GNUNET_YES,
1300                           &send_connect_ack_continuation, n);
1301     return GNUNET_NO;
1302   case S_CONNECTED:
1303     /* connected peer is switching addresses */
1304     GST_validation_set_address_use (&n->id,
1305                                     n->address,
1306                                     n->session,
1307                                     GNUNET_YES);
1308     msg_len = sizeof (struct SessionConnectMessage);
1309     connect_msg.header.size = htons (msg_len);
1310     connect_msg.header.type =
1311         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1312     connect_msg.reserved = htonl (0);
1313     connect_msg.timestamp =
1314         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1315     ret =
1316         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1317                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1318                           address, GNUNET_YES,
1319                           &send_switch_address_continuation, n);
1320     if (ret == GNUNET_SYSERR)
1321     {
1322       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1323                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1324                   GNUNET_i2s (peer), 
1325                   GST_plugins_a2s (address), session);
1326     }
1327     return GNUNET_NO;
1328   default:
1329     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1330                 "Invalid connection state to switch addresses %u \n", n->state);
1331     GNUNET_break_op (0);
1332     return GNUNET_NO;
1333   }
1334 }
1335
1336
1337 /**
1338  * Obtain current latency information for the given neighbour.
1339  *
1340  * @param peer 
1341  * @return observed latency of the address, FOREVER if the address was
1342  *         never successfully validated
1343  */
1344 struct GNUNET_TIME_Relative
1345 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1346 {
1347   struct NeighbourMapEntry *n;
1348
1349   n = lookup_neighbour (peer);
1350   if ( (NULL == n) ||
1351        ( (n->address == NULL) && (n->session == NULL) ) )
1352     return GNUNET_TIME_UNIT_FOREVER_REL;
1353   return GST_validation_get_address_latency (peer,
1354                                              n->address,
1355                                              n->session);
1356 }
1357
1358
1359 /**
1360  * Create an entry in the neighbour map for the given peer
1361  *
1362  * @param peer peer to create an entry for
1363  * @return new neighbour map entry
1364  */
1365 static struct NeighbourMapEntry *
1366 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1367 {
1368   struct NeighbourMapEntry *n;
1369
1370 #if DEBUG_TRANSPORT
1371   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1372               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1373 #endif
1374   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1375   n->id = *peer;
1376   n->state = S_NOT_CONNECTED;
1377   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1378                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1379                                  MAX_BANDWIDTH_CARRY_S);
1380   n->timeout_task =
1381       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1382                                     &neighbour_timeout_task, n);
1383   GNUNET_assert (GNUNET_OK ==
1384                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1385                                                     &n->id.hashPubKey, n,
1386                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1387   return n;
1388 }
1389
1390
1391 /**
1392  * Try to create a connection to the given target (eventually).
1393  *
1394  * @param target peer to try to connect to
1395  */
1396 void
1397 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1398 {
1399   struct NeighbourMapEntry *n;
1400
1401   // This can happen during shutdown
1402   if (neighbours == NULL)
1403   {
1404     return;
1405   }
1406 #if DEBUG_TRANSPORT
1407   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1408               GNUNET_i2s (target));
1409 #endif
1410   if (0 ==
1411       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1412   {
1413     /* my own hello */
1414     return;
1415   }
1416   n = lookup_neighbour (target);
1417
1418   if (NULL != n)
1419   {
1420     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1421       return;                   /* already connecting or connected */
1422     if (is_disconnecting (n))
1423       change_state (n, S_NOT_CONNECTED);
1424   }
1425
1426
1427   if (n == NULL)
1428     n = setup_neighbour (target);
1429 #if DEBUG_TRANSPORT
1430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1431               "Asking ATS for suggested address to connect to peer `%s'\n",
1432               GNUNET_i2s (&n->id));
1433 #endif
1434
1435   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1436 }
1437
1438 /**
1439  * Test if we're connected to the given peer.
1440  *
1441  * @param target peer to test
1442  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1443  */
1444 int
1445 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1446 {
1447   struct NeighbourMapEntry *n;
1448
1449   // This can happen during shutdown
1450   if (neighbours == NULL)
1451   {
1452     return GNUNET_NO;
1453   }
1454
1455   n = lookup_neighbour (target);
1456
1457   if ((NULL == n) || (S_CONNECTED != n->state))
1458     return GNUNET_NO;           /* not connected */
1459   return GNUNET_YES;
1460 }
1461
1462
1463 /**
1464  * A session was terminated. Take note.
1465  *
1466  * @param peer identity of the peer where the session died
1467  * @param session session that is gone
1468  */
1469 void
1470 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1471                                    struct Session *session)
1472 {
1473   struct NeighbourMapEntry *n;
1474
1475   // This can happen during shutdown
1476   if (neighbours == NULL)
1477   {
1478     return;
1479   }
1480
1481 #if DEBUG_TRANSPORT
1482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1483               session, GNUNET_i2s (peer));
1484 #endif
1485
1486   n = lookup_neighbour (peer);
1487   if (NULL == n)
1488     return;
1489   if (session != n->session)
1490     return;                     /* doesn't affect us */
1491
1492   n->session = NULL;
1493   if (NULL != n->address)
1494   {
1495     GNUNET_HELLO_address_free (n->address);
1496     n->address = NULL;
1497   }
1498
1499   /* not connected anymore anyway, shouldn't matter */
1500   if ((S_CONNECTED != n->state) && (!is_connecting (n)))
1501     return;
1502
1503   /* We are connected, so ask ATS to switch addresses */
1504   GNUNET_SCHEDULER_cancel (n->timeout_task);
1505   n->timeout_task =
1506       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1507                                     &neighbour_timeout_task, n);
1508   /* try QUICKLY to re-establish a connection, reduce timeout! */
1509   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1510     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1511   n->ats_suggest =
1512       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1513                                     n);
1514   GNUNET_ATS_suggest_address (GST_ats, peer);
1515 }
1516
1517
1518 /**
1519  * Transmit a message to the given target using the active connection.
1520  *
1521  * @param target destination
1522  * @param msg message to send
1523  * @param msg_size number of bytes in msg
1524  * @param timeout when to fail with timeout
1525  * @param cont function to call when done
1526  * @param cont_cls closure for 'cont'
1527  */
1528 void
1529 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1530                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1531                      GST_NeighbourSendContinuation cont, void *cont_cls)
1532 {
1533   struct NeighbourMapEntry *n;
1534   struct MessageQueue *mq;
1535
1536   // This can happen during shutdown
1537   if (neighbours == NULL)
1538   {
1539     return;
1540   }
1541
1542   n = lookup_neighbour (target);
1543   if ((n == NULL) || (!is_connected (n)))
1544   {
1545     GNUNET_STATISTICS_update (GST_stats,
1546                               gettext_noop
1547                               ("# messages not sent (no such peer or not connected)"),
1548                               1, GNUNET_NO);
1549 #if DEBUG_TRANSPORT
1550     if (n == NULL)
1551       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1552                   "Could not send message to peer `%s': unknown neighbour",
1553                   GNUNET_i2s (target));
1554     else if (!is_connected (n))
1555       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1556                   "Could not send message to peer `%s': not connected\n",
1557                   GNUNET_i2s (target));
1558 #endif
1559     if (NULL != cont)
1560       cont (cont_cls, GNUNET_SYSERR);
1561     return;
1562   }
1563
1564   if ((n->session == NULL) && (n->address == NULL) )
1565   {
1566     GNUNET_STATISTICS_update (GST_stats,
1567                               gettext_noop
1568                               ("# messages not sent (no such peer or not connected)"),
1569                               1, GNUNET_NO);
1570 #if DEBUG_TRANSPORT
1571     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1572                 "Could not send message to peer `%s': no address available\n",
1573                 GNUNET_i2s (target));
1574 #endif
1575
1576     if (NULL != cont)
1577       cont (cont_cls, GNUNET_SYSERR);
1578     return;
1579   }
1580
1581   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1582   GNUNET_STATISTICS_update (GST_stats,
1583                             gettext_noop
1584                             ("# bytes in message queue for other peers"),
1585                             msg_size, GNUNET_NO);
1586   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1587   mq->cont = cont;
1588   mq->cont_cls = cont_cls;
1589   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1590   memcpy (&mq[1], msg, msg_size);
1591   mq->message_buf = (const char *) &mq[1];
1592   mq->message_buf_size = msg_size;
1593   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1594   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1595
1596   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1597       (NULL == n->is_active))
1598     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1599 }
1600
1601
1602 /**
1603  * We have received a message from the given sender.  How long should
1604  * we delay before receiving more?  (Also used to keep the peer marked
1605  * as live).
1606  *
1607  * @param sender sender of the message
1608  * @param size size of the message
1609  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1610  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1611  *                   GNUNET_SYSERR if the connection is not fully up yet
1612  * @return how long to wait before reading more from this sender
1613  */
1614 struct GNUNET_TIME_Relative
1615 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1616                                         *sender, ssize_t size, int *do_forward)
1617 {
1618   struct NeighbourMapEntry *n;
1619   struct GNUNET_TIME_Relative ret;
1620
1621   // This can happen during shutdown
1622   if (neighbours == NULL)
1623   {
1624     return GNUNET_TIME_UNIT_FOREVER_REL;
1625   }
1626
1627   n = lookup_neighbour (sender);
1628   if (n == NULL)
1629   {
1630     GST_neighbours_try_connect (sender);
1631     n = lookup_neighbour (sender);
1632     if (NULL == n)
1633     {
1634       GNUNET_STATISTICS_update (GST_stats,
1635                                 gettext_noop
1636                                 ("# messages discarded due to lack of neighbour record"),
1637                                 1, GNUNET_NO);
1638       *do_forward = GNUNET_NO;
1639       return GNUNET_TIME_UNIT_ZERO;
1640     }
1641   }
1642   if (!is_connected (n))
1643   {
1644     *do_forward = GNUNET_SYSERR;
1645     return GNUNET_TIME_UNIT_ZERO;
1646   }
1647   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1648   {
1649     n->quota_violation_count++;
1650 #if DEBUG_TRANSPORT
1651     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1652                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1653                 n->in_tracker.available_bytes_per_s__,
1654                 n->quota_violation_count);
1655 #endif
1656     /* Discount 32k per violation */
1657     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1658   }
1659   else
1660   {
1661     if (n->quota_violation_count > 0)
1662     {
1663       /* try to add 32k back */
1664       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1665       n->quota_violation_count--;
1666     }
1667   }
1668   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1669   {
1670     GNUNET_STATISTICS_update (GST_stats,
1671                               gettext_noop
1672                               ("# bandwidth quota violations by other peers"),
1673                               1, GNUNET_NO);
1674     *do_forward = GNUNET_NO;
1675     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1676   }
1677   *do_forward = GNUNET_YES;
1678   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1679   if (ret.rel_value > 0)
1680   {
1681 #if DEBUG_TRANSPORT
1682     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1683                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1684                 (unsigned long long) n->in_tracker.
1685                 consumption_since_last_update__,
1686                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1687                 (unsigned long long) ret.rel_value);
1688 #endif
1689     GNUNET_STATISTICS_update (GST_stats,
1690                               gettext_noop ("# ms throttling suggested"),
1691                               (int64_t) ret.rel_value, GNUNET_NO);
1692   }
1693   return ret;
1694 }
1695
1696
1697 /**
1698  * Keep the connection to the given neighbour alive longer,
1699  * we received a KEEPALIVE (or equivalent).
1700  *
1701  * @param neighbour neighbour to keep alive
1702  */
1703 void
1704 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1705 {
1706   struct NeighbourMapEntry *n;
1707
1708   // This can happen during shutdown
1709   if (neighbours == NULL)
1710   {
1711     return;
1712   }
1713
1714   n = lookup_neighbour (neighbour);
1715   if (NULL == n)
1716   {
1717     GNUNET_STATISTICS_update (GST_stats,
1718                               gettext_noop
1719                               ("# KEEPALIVE messages discarded (not connected)"),
1720                               1, GNUNET_NO);
1721     return;
1722   }
1723   GNUNET_SCHEDULER_cancel (n->timeout_task);
1724   n->timeout_task =
1725       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1726                                     &neighbour_timeout_task, n);
1727 }
1728
1729
1730 /**
1731  * Change the incoming quota for the given peer.
1732  *
1733  * @param neighbour identity of peer to change qutoa for
1734  * @param quota new quota
1735  */
1736 void
1737 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1738                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1739 {
1740   struct NeighbourMapEntry *n;
1741
1742   // This can happen during shutdown
1743   if (neighbours == NULL)
1744   {
1745     return;
1746   }
1747
1748   n = lookup_neighbour (neighbour);
1749   if (n == NULL)
1750   {
1751     GNUNET_STATISTICS_update (GST_stats,
1752                               gettext_noop
1753                               ("# SET QUOTA messages ignored (no such peer)"),
1754                               1, GNUNET_NO);
1755     return;
1756   }
1757   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1758   if (0 != ntohl (quota.value__))
1759     return;
1760 #if DEBUG_TRANSPORT
1761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1762               GNUNET_i2s (&n->id), "SET_QUOTA");
1763 #endif
1764   if (is_connected (n))
1765     GNUNET_STATISTICS_update (GST_stats,
1766                               gettext_noop ("# disconnects due to quota of 0"),
1767                               1, GNUNET_NO);
1768   disconnect_neighbour (n);
1769 }
1770
1771
1772 /**
1773  * Closure for the neighbours_iterate function.
1774  */
1775 struct IteratorContext
1776 {
1777   /**
1778    * Function to call on each connected neighbour.
1779    */
1780   GST_NeighbourIterator cb;
1781
1782   /**
1783    * Closure for 'cb'.
1784    */
1785   void *cb_cls;
1786 };
1787
1788
1789 /**
1790  * Call the callback from the closure for each connected neighbour.
1791  *
1792  * @param cls the 'struct IteratorContext'
1793  * @param key the hash of the public key of the neighbour
1794  * @param value the 'struct NeighbourMapEntry'
1795  * @return GNUNET_OK (continue to iterate)
1796  */
1797 static int
1798 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1799 {
1800   struct IteratorContext *ic = cls;
1801   struct NeighbourMapEntry *n = value;
1802
1803   if (!is_connected (n))
1804     return GNUNET_OK;
1805
1806   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
1807   return GNUNET_OK;
1808 }
1809
1810
1811 /**
1812  * Iterate over all connected neighbours.
1813  *
1814  * @param cb function to call
1815  * @param cb_cls closure for cb
1816  */
1817 void
1818 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1819 {
1820   struct IteratorContext ic;
1821
1822   // This can happen during shutdown
1823   if (neighbours == NULL)
1824   {
1825     return;
1826   }
1827
1828   ic.cb = cb;
1829   ic.cb_cls = cb_cls;
1830   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1831 }
1832
1833 /**
1834  * If we have an active connection to the given target, it must be shutdown.
1835  *
1836  * @param target peer to disconnect from
1837  */
1838 void
1839 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1840 {
1841   struct NeighbourMapEntry *n;
1842
1843   // This can happen during shutdown
1844   if (neighbours == NULL)
1845   {
1846     return;
1847   }
1848
1849   n = lookup_neighbour (target);
1850   if (NULL == n)
1851     return;                     /* not active */
1852   if (is_connected (n))
1853   {
1854     send_disconnect (&n->id, n->address, n->session);
1855
1856     n = lookup_neighbour (target);
1857     if (NULL == n)
1858       return;                   /* gone already */
1859   }
1860   disconnect_neighbour (n);
1861 }
1862
1863
1864 /**
1865  * We received a disconnect message from the given peer,
1866  * validate and process.
1867  *
1868  * @param peer sender of the message
1869  * @param msg the disconnect message
1870  */
1871 void
1872 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
1873                                           *peer,
1874                                           const struct GNUNET_MessageHeader
1875                                           *msg)
1876 {
1877   struct NeighbourMapEntry *n;
1878   const struct SessionDisconnectMessage *sdm;
1879   GNUNET_HashCode hc;
1880
1881 #if DEBUG_TRANSPORT
1882   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1883               "Received DISCONNECT message from peer `%s'\n",
1884               GNUNET_i2s (peer));
1885 #endif
1886
1887   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1888   {
1889     // GNUNET_break_op (0);
1890     GNUNET_STATISTICS_update (GST_stats,
1891                               gettext_noop
1892                               ("# disconnect messages ignored (old format)"), 1,
1893                               GNUNET_NO);
1894     return;
1895   }
1896   sdm = (const struct SessionDisconnectMessage *) msg;
1897   n = lookup_neighbour (peer);
1898   if (NULL == n)
1899     return;                     /* gone already */
1900   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1901       n->connect_ts.abs_value)
1902   {
1903     GNUNET_STATISTICS_update (GST_stats,
1904                               gettext_noop
1905                               ("# disconnect messages ignored (timestamp)"), 1,
1906                               GNUNET_NO);
1907     return;
1908   }
1909   GNUNET_CRYPTO_hash (&sdm->public_key,
1910                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1911                       &hc);
1912   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
1913   {
1914     GNUNET_break_op (0);
1915     return;
1916   }
1917   if (ntohl (sdm->purpose.size) !=
1918       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1919       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1920       sizeof (struct GNUNET_TIME_AbsoluteNBO))
1921   {
1922     GNUNET_break_op (0);
1923     return;
1924   }
1925   if (GNUNET_OK !=
1926       GNUNET_CRYPTO_rsa_verify
1927       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
1928        &sdm->signature, &sdm->public_key))
1929   {
1930     GNUNET_break_op (0);
1931     return;
1932   }
1933   GST_neighbours_force_disconnect (peer);
1934 }
1935
1936
1937 /**
1938  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
1939  * Consider switching to it.
1940  *
1941  * @param message possibly a 'struct SessionConnectMessage' (check format)
1942  * @param peer identity of the peer to switch the address for
1943  * @param address address of the other peer, NULL if other peer
1944  *                       connected to us
1945  * @param session session to use (or NULL)
1946  * @param ats performance data
1947  * @param ats_count number of entries in ats
1948  */
1949 void
1950 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
1951                                    const struct GNUNET_PeerIdentity *peer,
1952                                    const struct GNUNET_HELLO_Address *address,
1953                                    struct Session *session,
1954                                    const struct GNUNET_ATS_Information *ats,
1955                                    uint32_t ats_count)
1956 {
1957   const struct SessionConnectMessage *scm;
1958   struct QuotaSetMessage q_msg;
1959   struct GNUNET_MessageHeader msg;
1960   struct NeighbourMapEntry *n;
1961   size_t msg_len;
1962   size_t ret;
1963
1964 #if DEBUG_TRANSPORT
1965   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1966               "Received CONNECT_ACK message from peer `%s'\n",
1967               GNUNET_i2s (peer));
1968 #endif
1969   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
1970   {
1971     GNUNET_break_op (0);
1972     return;
1973   }
1974   scm = (const struct SessionConnectMessage *) message;
1975   GNUNET_break_op (ntohl (scm->reserved) == 0);
1976   n = lookup_neighbour (peer);
1977   if (NULL == n)
1978   {
1979     /* we did not send 'CONNECT' (how could we? no record for this peer!) */
1980     GNUNET_break_op (0);
1981     return;
1982   }  
1983   if (n->state != S_CONNECT_SENT)
1984   {
1985     GNUNET_STATISTICS_update (GST_stats,
1986                               gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
1987                               GNUNET_NO);
1988     return;
1989   }
1990   if (NULL != session)
1991     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1992                      "transport-ats",
1993                      "Giving ATS session %p of plugin %s for peer %s\n",
1994                      session, address->transport_name, GNUNET_i2s (peer));
1995   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
1996   GNUNET_assert (NULL != n->address);
1997   change_state (n, S_CONNECTED);
1998   GST_validation_set_address_use (&n->id,
1999                                   n->address,
2000                                   n->session,
2001                                   GNUNET_YES);
2002   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2003 #if DEBUG_TRANSPORT
2004   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2005               "Setting inbound quota of %u for peer `%s' to \n",
2006               ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id));
2007 #endif
2008   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2009
2010   /* send ACK (ACK) */
2011   msg_len = sizeof (msg);
2012   msg.size = htons (msg_len);
2013   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2014
2015   ret =
2016       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2017                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2018                         n->address, GNUNET_YES, NULL,
2019                         NULL);
2020
2021   if (ret == GNUNET_SYSERR)
2022     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2023                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2024                 GNUNET_i2s (&n->id), 
2025                 GST_plugins_a2s (n->address), n->session);
2026
2027
2028   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2029     n->keepalive_task =
2030       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2031                                     &neighbour_keepalive_task, n);
2032   
2033   neighbours_connected++;
2034   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2035                             GNUNET_NO);
2036 #if DEBUG_TRANSPORT
2037   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2038               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2039               GNUNET_i2s (&n->id),
2040               GST_plugins_a2s (n->address), n->session,
2041               __LINE__);
2042 #endif
2043   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2044
2045 #if DEBUG_TRANSPORT
2046   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2047               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2048               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2049 #endif
2050   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2051   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2052   q_msg.quota = n->bandwidth_out;
2053   q_msg.peer = (*peer);
2054   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2055
2056 }
2057
2058
2059 void
2060 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2061                            const struct GNUNET_PeerIdentity *peer,
2062                            const struct GNUNET_HELLO_Address *address,
2063                            struct Session *session,
2064                            const struct GNUNET_ATS_Information *ats,
2065                            uint32_t ats_count)
2066 {
2067   struct NeighbourMapEntry *n;
2068   struct QuotaSetMessage q_msg;
2069
2070 #if DEBUG_TRANSPORT
2071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2072               "Received ACK message from peer `%s'\n",
2073               GNUNET_i2s (peer));
2074 #endif
2075
2076   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2077   {
2078     GNUNET_break_op (0);
2079     return;
2080   }
2081   n = lookup_neighbour (peer);
2082   if (NULL == n)
2083   {
2084     send_disconnect (peer, address,
2085                      session);
2086     GNUNET_break (0);
2087     return;
2088   }
2089   if (S_CONNECTED == n->state)
2090     return;
2091   if (!is_connecting(n))
2092   {
2093     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2094                               GNUNET_NO);
2095     return;
2096   }
2097   if (NULL != session)
2098     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2099                      "transport-ats",
2100                      "Giving ATS session %p of plugin %s for peer %s\n",
2101                      session, address->transport_name, GNUNET_i2s (peer));
2102   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2103   GNUNET_assert (n->address != NULL);
2104   change_state (n, S_CONNECTED);
2105   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2106
2107   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2108
2109   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2110     n->keepalive_task =
2111         GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2112                                       &neighbour_keepalive_task, n);
2113   GST_validation_set_address_use (&n->id,
2114                                   n->address,
2115                                   n->session,
2116                                   GNUNET_YES);
2117   neighbours_connected++;
2118   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2119                             GNUNET_NO);
2120   
2121 #if DEBUG_TRANSPORT
2122   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2123               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2124               GNUNET_i2s (&n->id),
2125               GST_plugins_a2s (n->address), n->session,
2126               __LINE__);
2127 #endif
2128   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2129 #if DEBUG_TRANSPORT
2130   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2131               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2132               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2133 #endif
2134   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2135   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2136   q_msg.quota = n->bandwidth_out;
2137   q_msg.peer = (*peer);
2138   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2139 }
2140
2141 struct BlackListCheckContext
2142 {
2143   struct GNUNET_ATS_Information *ats;
2144
2145   uint32_t ats_count;
2146
2147   struct Session *session;
2148
2149   struct GNUNET_HELLO_Address *address;
2150
2151   struct GNUNET_TIME_Absolute ts;
2152 };
2153
2154
2155 static void
2156 handle_connect_blacklist_cont (void *cls,
2157                                const struct GNUNET_PeerIdentity *peer,
2158                                int result)
2159 {
2160   struct NeighbourMapEntry *n;
2161   struct BlackListCheckContext *bcc = cls;
2162
2163 #if DEBUG_TRANSPORT
2164   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2165               "Blacklist check due to CONNECT message: `%s'\n",
2166               GNUNET_i2s (peer),
2167               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2168 #endif
2169
2170   /* not allowed */
2171   if (GNUNET_OK != result)
2172   {
2173     GNUNET_HELLO_address_free (bcc->address);
2174     GNUNET_free (bcc);
2175     return;
2176   }
2177
2178   n = lookup_neighbour (peer);
2179   if (NULL == n)
2180     n = setup_neighbour (peer);
2181
2182   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2183   {
2184     if (NULL != bcc->session)
2185       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2186                        "transport-ats",
2187                        "Giving ATS session %p of address `%s' for peer %s\n",
2188                        bcc->session, 
2189                        GST_plugins_a2s (bcc->address),
2190                        GNUNET_i2s (peer));
2191     GNUNET_ATS_address_update (GST_ats, bcc->address,
2192                                bcc->session, bcc->ats, bcc->ats_count);
2193     n->connect_ts = bcc->ts;
2194   }
2195
2196   GNUNET_free (bcc);
2197
2198   if (n->state != S_CONNECT_RECV)
2199     change_state (n, S_CONNECT_RECV);
2200
2201   /* Ask ATS for an address to connect via that address */
2202   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2203     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2204   n->ats_suggest =
2205       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2206                                     n);
2207   GNUNET_ATS_suggest_address (GST_ats, peer);
2208 }
2209
2210 /**
2211  * We received a 'SESSION_CONNECT' message from the other peer.
2212  * Consider switching to it.
2213  *
2214  * @param message possibly a 'struct SessionConnectMessage' (check format)
2215  * @param peer identity of the peer to switch the address for
2216  * @param address address of the other peer, NULL if other peer
2217  *                       connected to us
2218  * @param session session to use (or NULL)
2219  * @param ats performance data
2220  * @param ats_count number of entries in ats (excluding 0-termination)
2221  */
2222 void
2223 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2224                                const struct GNUNET_PeerIdentity *peer,
2225                                const struct GNUNET_HELLO_Address *address,
2226                                struct Session *session,
2227                                const struct GNUNET_ATS_Information *ats,
2228                                uint32_t ats_count)
2229 {
2230   const struct SessionConnectMessage *scm;
2231   struct NeighbourMapEntry *n;
2232   struct BlackListCheckContext *bcc = NULL;
2233
2234 #if DEBUG_TRANSPORT
2235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2236               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2237 #endif
2238
2239   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2240   {
2241     GNUNET_break_op (0);
2242     return;
2243   }
2244
2245   scm = (const struct SessionConnectMessage *) message;
2246   GNUNET_break_op (ntohl (scm->reserved) == 0);
2247
2248   n = lookup_neighbour (peer);
2249   if ( (n != NULL) &&
2250        (S_CONNECTED == n->state) )
2251   {
2252     /* connected peer switches addresses */
2253     GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2254     return;
2255   }
2256
2257   /* we are not connected to this peer */
2258   /* do blacklist check */
2259   bcc =
2260       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2261                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2262   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2263   bcc->ats_count = ats_count + 1;
2264   bcc->address = GNUNET_HELLO_address_copy (address);
2265   bcc->session = session;
2266   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2267   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2268   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2269   bcc->ats[ats_count].value = htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2270   GST_blacklist_test_allowed (peer, address->transport_name, handle_connect_blacklist_cont,
2271                               bcc);
2272 }
2273
2274
2275 /* end of file gnunet-service-transport_neighbours.c */