clean up, asserts, FIXME
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours?
52  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
53  * send 3 keepalives in each interval, so 3 messages would need to be
54  * lost in a row for a disconnect).
55  */
56 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
57
58
59 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
60
61
62 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
63
64
65 /**
66  * Entry in neighbours.
67  */
68 struct NeighbourMapEntry;
69
70 /**
71  * Message a peer sends to another to indicate its
72  * preference for communicating via a particular
73  * session (and the desire to establish a real
74  * connection).
75  */
76 struct SessionConnectMessage
77 {
78   /**
79    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
80    */
81   struct GNUNET_MessageHeader header;
82
83   /**
84    * Always zero.
85    */
86   uint32_t reserved GNUNET_PACKED;
87
88   /**
89    * Absolute time at the sender.  Only the most recent connect
90    * message implies which session is preferred by the sender.
91    */
92   struct GNUNET_TIME_AbsoluteNBO timestamp;
93
94 };
95
96
97 struct SessionDisconnectMessage
98 {
99   /**
100    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Always zero.
106    */
107   uint32_t reserved GNUNET_PACKED;
108
109   /**
110    * Purpose of the signature.  Extends over the timestamp.
111    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
112    */
113   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
114
115   /**
116    * Absolute time at the sender.  Only the most recent connect
117    * message implies which session is preferred by the sender.
118    */
119   struct GNUNET_TIME_AbsoluteNBO timestamp;
120
121   /**
122    * Public key of the sender.
123    */
124   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
125
126   /**
127    * Signature of the peer that sends us the disconnect.  Only
128    * valid if the timestamp is AFTER the timestamp from the
129    * corresponding 'CONNECT' message.
130    */
131   struct GNUNET_CRYPTO_RsaSignature signature;
132
133 };
134
135
136 /**
137  * For each neighbour we keep a list of messages
138  * that we still want to transmit to the neighbour.
139  */
140 struct MessageQueue
141 {
142
143   /**
144    * This is a doubly linked list.
145    */
146   struct MessageQueue *next;
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *prev;
152
153   /**
154    * Once this message is actively being transmitted, which
155    * neighbour is it associated with?
156    */
157   struct NeighbourMapEntry *n;
158
159   /**
160    * Function to call once we're done.
161    */
162   GST_NeighbourSendContinuation cont;
163
164   /**
165    * Closure for 'cont'
166    */
167   void *cont_cls;
168
169   /**
170    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
171    * stuck together in memory.  Allocated at the end of this struct.
172    */
173   const char *message_buf;
174
175   /**
176    * Size of the message buf
177    */
178   size_t message_buf_size;
179
180   /**
181    * At what time should we fail?
182    */
183   struct GNUNET_TIME_Absolute timeout;
184
185 };
186
187
188 enum State
189 {
190   /**
191    * fresh peer or completely disconnected 
192    */
193   S_NOT_CONNECTED,
194
195   /**
196    * sent CONNECT message to other peer, waiting for CONNECT_ACK 
197    */
198   S_CONNECT_SENT,
199
200   /**
201    * received CONNECT message to other peer, sending CONNECT_ACK 
202    */
203   S_CONNECT_RECV,
204
205   /**
206    * received ACK or payload 
207    */
208   S_CONNECTED,
209
210   /**
211    * Disconnect in progress 
212    */
213   S_DISCONNECT
214 };
215
216
217 /**
218  * Entry in neighbours.
219  */
220 struct NeighbourMapEntry
221 {
222
223   /**
224    * Head of list of messages we would like to send to this peer;
225    * must contain at most one message per client.
226    */
227   struct MessageQueue *messages_head;
228
229   /**
230    * Tail of list of messages we would like to send to this peer; must
231    * contain at most one message per client.
232    */
233   struct MessageQueue *messages_tail;
234
235   /**
236    * Performance data for the peer.
237    */
238   //struct GNUNET_ATS_Information *ats;
239
240   /**
241    * Are we currently trying to send a message? If so, which one?
242    */
243   struct MessageQueue *is_active;
244
245   /**
246    * Active session for communicating with the peer.
247    */
248   struct Session *session;
249
250   /**
251    * Address we currently use.
252    */
253   struct GNUNET_HELLO_Address *address;
254
255   /**
256    * Identity of this neighbour.
257    */
258   struct GNUNET_PeerIdentity id;
259
260   /**
261    * ID of task scheduled to run when this peer is about to
262    * time out (will free resources associated with the peer).
263    */
264   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
265
266   /**
267    * ID of task scheduled to send keepalives.
268    */
269   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
270
271   /**
272    * ID of task scheduled to run when we should try transmitting
273    * the head of the message queue.
274    */
275   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
276
277   /**
278    * Tracker for inbound bandwidth.
279    */
280   struct GNUNET_BANDWIDTH_Tracker in_tracker;
281
282   /**
283    * Inbound bandwidth from ATS, activated when connection is up
284    */
285   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
286
287   /**
288    * Inbound bandwidth from ATS, activated when connection is up
289    */
290   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
291
292   /**
293    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
294    */
295   struct GNUNET_TIME_Absolute connect_ts;
296
297   /**
298    * Timeout for ATS
299    * We asked ATS for a new address for this peer
300    */
301   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
302
303   /**
304    * Task the resets the peer state after due to an pending
305    * unsuccessful connection setup
306    */
307   GNUNET_SCHEDULER_TaskIdentifier state_reset;
308
309   /**
310    * How often has the other peer (recently) violated the inbound
311    * traffic limit?  Incremented by 10 per violation, decremented by 1
312    * per non-violation (for each time interval).
313    */
314   unsigned int quota_violation_count;
315
316
317   /**
318    * The current state of the peer
319    * Element of enum State
320    */
321   int state;
322
323 };
324
325
326 /**
327  * All known neighbours and their HELLOs.
328  */
329 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
330
331 /**
332  * Closure for connect_notify_cb and disconnect_notify_cb
333  */
334 static void *callback_cls;
335
336 /**
337  * Function to call when we connected to a neighbour.
338  */
339 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
340
341 /**
342  * Function to call when we disconnected from a neighbour.
343  */
344 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
345
346 /**
347  * counter for connected neighbours
348  */
349 static int neighbours_connected;
350
351 /**
352  * Lookup a neighbour entry in the neighbours hash map.
353  *
354  * @param pid identity of the peer to look up
355  * @return the entry, NULL if there is no existing record
356  */
357 static struct NeighbourMapEntry *
358 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
359 {
360   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
361 }
362
363 #define change_state(n, state, ...) change (n, state, __LINE__)
364
365 static int
366 is_connecting (struct NeighbourMapEntry *n)
367 {
368   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
369     return GNUNET_YES;
370   return GNUNET_NO;
371 }
372
373 static int
374 is_connected (struct NeighbourMapEntry *n)
375 {
376   if (n->state == S_CONNECTED)
377     return GNUNET_YES;
378   return GNUNET_NO;
379 }
380
381 static int
382 is_disconnecting (struct NeighbourMapEntry *n)
383 {
384   if (n->state == S_DISCONNECT)
385     return GNUNET_YES;
386   return GNUNET_NO;
387 }
388
389 static const char *
390 print_state (int state)
391 {
392   switch (state)
393   {
394   case S_CONNECTED:
395     return "S_CONNECTED";
396     break;
397   case S_CONNECT_RECV:
398     return "S_CONNECT_RECV";
399     break;
400   case S_CONNECT_SENT:
401     return "S_CONNECT_SENT";
402     break;
403   case S_DISCONNECT:
404     return "S_DISCONNECT";
405     break;
406   case S_NOT_CONNECTED:
407     return "S_NOT_CONNECTED";
408     break;
409   default:
410     GNUNET_break (0);
411     break;
412   }
413   return NULL;
414 }
415
416 static int
417 change (struct NeighbourMapEntry *n, int state, int line);
418
419 static void
420 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
421
422 static void
423 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
424 {
425   struct NeighbourMapEntry *n = cls;
426
427   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
428
429 #if DEBUG_TRANSPORT
430 #endif
431   /* This jut a temporary debug message to check if a the value
432    * SETUP_CONNECTION_TIMEOUT was choosen to small for slow machines
433    */
434   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
435               "Information for developers: Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n",
436               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
437               print_state (n->state));
438
439   GNUNET_STATISTICS_update (GST_stats,
440                             gettext_noop
441                             ("# failed connection attempts due to timeout"), 1,
442                             GNUNET_NO);
443
444   /* resetting state */
445   n->state = S_NOT_CONNECTED;
446
447   /* destroying address */
448   GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
449
450   /* request new address */
451   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
452     GNUNET_SCHEDULER_cancel (n->ats_suggest);
453   n->ats_suggest =
454       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
455                                     n);
456   GNUNET_ATS_suggest_address (GST_ats, &n->id);
457 }
458
459 static int
460 change (struct NeighbourMapEntry *n, int state, int line)
461 {
462   /* allowed transitions */
463   int allowed = GNUNET_NO;
464
465   switch (n->state)
466   {
467   case S_NOT_CONNECTED:
468     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
469         (state == S_DISCONNECT))
470       allowed = GNUNET_YES;
471     break;
472   case S_CONNECT_RECV:
473     allowed = GNUNET_YES;
474     break;
475   case S_CONNECT_SENT:
476     allowed = GNUNET_YES;
477     break;
478   case S_CONNECTED:
479     if (state == S_DISCONNECT)
480       allowed = GNUNET_YES;
481     break;
482   case S_DISCONNECT:
483     break;
484   default:
485     GNUNET_break (0);
486     break;
487   }
488   if (allowed == GNUNET_NO)
489   {
490     char *old = GNUNET_strdup (print_state (n->state));
491     char *new = GNUNET_strdup (print_state (state));
492     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
493                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
494                 new, line);
495     GNUNET_break (0);
496     GNUNET_free (old);
497     GNUNET_free (new);
498     return GNUNET_SYSERR;
499   }
500
501   n->state = state;
502 #if DEBUG_TRANSPORT
503   {
504     char *old = GNUNET_strdup (print_state (n->state));
505     char *new = GNUNET_strdup (print_state (state));
506     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
508                 GNUNET_i2s (&n->id), n, old, new, line);
509     GNUNET_free (old);
510     GNUNET_free (new);
511   }
512 #endif
513   switch (n->state)
514   {
515   case S_CONNECT_RECV:
516   case S_CONNECT_SENT:
517     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
518       GNUNET_SCHEDULER_cancel (n->state_reset);
519     n->state_reset =
520       GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
521                                     n);   
522     break;
523   case S_CONNECTED:
524   case S_NOT_CONNECTED:
525   case S_DISCONNECT:
526     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
527     {
528 #if DEBUG_TRANSPORT
529       char *old = GNUNET_strdup (print_state (n->state));
530       char *new = GNUNET_strdup (print_state (state));
531       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
533                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
534                   old, new);
535       GNUNET_free (old);
536       GNUNET_free (new);
537 #endif
538       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
539       GNUNET_SCHEDULER_cancel (n->state_reset);
540       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
541     }
542     break;
543   default:
544     GNUNET_assert (0);
545   }
546   
547
548
549   return GNUNET_OK;
550 }
551
552 static ssize_t
553 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
554                   size_t msgbuf_size, uint32_t priority,
555                   struct GNUNET_TIME_Relative timeout, struct Session *session,
556                   const struct GNUNET_HELLO_Address *address,
557                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
558                   void *cont_cls)
559 {
560   struct GNUNET_TRANSPORT_PluginFunctions *papi;
561   size_t ret = GNUNET_SYSERR;
562
563   /* FIXME : ats returns an address with all values 0 */
564   if (address == NULL)
565   {
566     if (cont != NULL)
567       cont (cont_cls, target, GNUNET_SYSERR);
568     return GNUNET_SYSERR;
569   }
570
571   if ((session == NULL) && (address == NULL))
572   {
573     if (cont != NULL)
574       cont (cont_cls, target, GNUNET_SYSERR);
575     return GNUNET_SYSERR;
576   }
577
578   papi = GST_plugins_find (address->transport_name);
579   if (papi == NULL)
580   {
581     if (cont != NULL)
582       cont (cont_cls, target, GNUNET_SYSERR);
583     return GNUNET_SYSERR;
584   }
585
586   ret =
587       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
588                   address->address, 
589                   address->address_length, GNUNET_YES, cont, cont_cls);
590
591   if (ret == -1)
592   {
593     if (cont != NULL)
594       cont (cont_cls, target, GNUNET_SYSERR);
595   }
596   return ret;
597 }
598
599 /**
600  * Task invoked to start a transmission to another peer.
601  *
602  * @param cls the 'struct NeighbourMapEntry'
603  * @param tc scheduler context
604  */
605 static void
606 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
607
608
609 /**
610  * We're done with our transmission attempt, continue processing.
611  *
612  * @param cls the 'struct MessageQueue' of the message
613  * @param receiver intended receiver
614  * @param success whether it worked or not
615  */
616 static void
617 transmit_send_continuation (void *cls,
618                             const struct GNUNET_PeerIdentity *receiver,
619                             int success)
620 {
621   struct MessageQueue *mq;
622   struct NeighbourMapEntry *n;
623
624   mq = cls;
625   n = mq->n;
626   if (NULL != n)
627   {
628     GNUNET_assert (n->is_active == mq);
629     n->is_active = NULL;
630     if (success == GNUNET_YES)
631     {
632       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
633       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
634     }
635   }
636 #if DEBUG_TRANSPORT
637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
638               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
639               (success == GNUNET_OK) ? "successful" : "FAILED");
640 #endif
641   if (NULL != mq->cont)
642     mq->cont (mq->cont_cls, success);
643   GNUNET_free (mq);
644 }
645
646
647 /**
648  * Check the ready list for the given neighbour and if a plugin is
649  * ready for transmission (and if we have a message), do so!
650  *
651  * @param n target peer for which to transmit
652  */
653 static void
654 try_transmission_to_peer (struct NeighbourMapEntry *n)
655 {
656   struct MessageQueue *mq;
657   struct GNUNET_TIME_Relative timeout;
658   ssize_t ret;
659
660   if (n->is_active != NULL)
661   {
662     GNUNET_break (0);
663     return;                     /* transmission already pending */
664   }
665   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
666   {
667     GNUNET_break (0);
668     return;                     /* currently waiting for bandwidth */
669   }
670   while (NULL != (mq = n->messages_head))
671   {
672     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
673     if (timeout.rel_value > 0)
674       break;
675     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
676     n->is_active = mq;
677     mq->n = n;
678     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
679   }
680   if (NULL == mq)
681     return;                     /* no more messages */
682
683   if (GST_plugins_find (n->address->transport_name) == NULL)
684   {
685     GNUNET_break (0);
686     return;
687   }
688   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
689   n->is_active = mq;
690   mq->n = n;
691
692   if ( (n->session == NULL) && (NULL == n->address) )
693   {
694     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
695                 "No address for peer `%s'\n",
696                 GNUNET_i2s (&n->id));
697     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
698     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
699     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
700     return;
701   }
702
703   ret =
704       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
705                         timeout, n->session, n->address, 
706                         GNUNET_YES, &transmit_send_continuation,
707                         mq);
708   if (ret == -1)
709   {
710     /* failure, but 'send' would not call continuation in this case,
711      * so we need to do it here! */
712     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
713   }
714
715 }
716
717
718 /**
719  * Task invoked to start a transmission to another peer.
720  *
721  * @param cls the 'struct NeighbourMapEntry'
722  * @param tc scheduler context
723  */
724 static void
725 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
726 {
727   struct NeighbourMapEntry *n = cls;
728
729   GNUNET_assert (NULL != lookup_neighbour (&n->id));
730   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
731   try_transmission_to_peer (n);
732 }
733
734
735 /**
736  * Initialize the neighbours subsystem.
737  *
738  * @param cls closure for callbacks
739  * @param connect_cb function to call if we connect to a peer
740  * @param disconnect_cb function to call if we disconnect from a peer
741  */
742 void
743 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
744                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
745 {
746   callback_cls = cls;
747   connect_notify_cb = connect_cb;
748   disconnect_notify_cb = disconnect_cb;
749   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
750 }
751
752
753 static void
754 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
755                       int result)
756 {
757 #if DEBUG_TRANSPORT
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759               "Sending DISCONNECT message to peer `%4s': %i\n",
760               GNUNET_i2s (target), result);
761 #endif
762 }
763
764
765 static int
766 send_disconnect (const struct GNUNET_PeerIdentity *target,
767                  const struct GNUNET_HELLO_Address *address,
768                  struct Session *session)
769 {
770   size_t ret;
771   struct SessionDisconnectMessage disconnect_msg;
772
773 #if DEBUG_TRANSPORT
774   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775               "Sending DISCONNECT message to peer `%4s'\n",
776               GNUNET_i2s (target));
777 #endif
778
779   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
780   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
781   disconnect_msg.reserved = htonl (0);
782   disconnect_msg.purpose.size =
783       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
784              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
785              sizeof (struct GNUNET_TIME_AbsoluteNBO));
786   disconnect_msg.purpose.purpose =
787       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
788   disconnect_msg.timestamp =
789       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
790   disconnect_msg.public_key = GST_my_public_key;
791   GNUNET_assert (GNUNET_OK ==
792                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
793                                          &disconnect_msg.purpose,
794                                          &disconnect_msg.signature));
795
796   ret =
797       send_with_plugin (target, (const char *) &disconnect_msg,
798                         sizeof (disconnect_msg), UINT32_MAX,
799                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
800                         GNUNET_YES,
801                         &send_disconnect_cont, NULL);
802
803   if (ret == GNUNET_SYSERR)
804     return GNUNET_SYSERR;
805
806   GNUNET_STATISTICS_update (GST_stats,
807                             gettext_noop
808                             ("# peers disconnected due to external request"), 1,
809                             GNUNET_NO);
810   return GNUNET_OK;
811 }
812
813 /**
814  * Disconnect from the given neighbour, clean up the record.
815  *
816  * @param n neighbour to disconnect from
817  */
818 static void
819 disconnect_neighbour (struct NeighbourMapEntry *n)
820 {
821   struct MessageQueue *mq;
822   int is_connected;
823
824   is_connected = (n->state == S_CONNECTED);
825
826   /* send DISCONNECT MESSAGE */
827   if (is_connected || is_connecting (n))
828   {
829     if (GNUNET_OK ==
830         send_disconnect (&n->id, n->address,
831                          n->session))
832       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
833                   GNUNET_i2s (&n->id));
834     else
835       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836                   "Could not send DISCONNECT_MSG to `%s'\n",
837                   GNUNET_i2s (&n->id));
838   }
839
840   if (is_connected)
841   {
842      GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
843   }
844
845
846   if (is_disconnecting (n))
847     return;
848   change_state (n, S_DISCONNECT);
849   GST_validation_set_address_use (&n->id,
850                                   n->address,
851                                   n->session,
852                                   GNUNET_NO);
853
854   if (n->address != NULL)
855   {
856     struct GNUNET_TRANSPORT_PluginFunctions *papi;
857     papi = GST_plugins_find (n->address->transport_name);
858     if (papi != NULL)
859       papi->disconnect (papi->cls, &n->id);
860   }
861
862   while (NULL != (mq = n->messages_head))
863   {
864     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
865     if (NULL != mq->cont)
866       mq->cont (mq->cont_cls, GNUNET_SYSERR);
867     GNUNET_free (mq);
868   }
869   if (NULL != n->is_active)
870   {
871     n->is_active->n = NULL;
872     n->is_active = NULL;
873   }
874   if (is_connected)
875   {
876     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
877     GNUNET_SCHEDULER_cancel (n->keepalive_task);
878     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
879     GNUNET_assert (neighbours_connected > 0);
880     neighbours_connected--;
881     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
882                               GNUNET_NO);
883     disconnect_notify_cb (callback_cls, &n->id);
884   }
885   GNUNET_assert (GNUNET_YES ==
886                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
887                                                        &n->id.hashPubKey, n));
888   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
889   {
890     GNUNET_SCHEDULER_cancel (n->ats_suggest);
891     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
892   }
893   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
894   {
895     GNUNET_SCHEDULER_cancel (n->timeout_task);
896     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
897   }
898   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
899   {
900     GNUNET_SCHEDULER_cancel (n->transmission_task);
901     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
902   }
903   if (NULL != n->address)
904   {
905     GNUNET_HELLO_address_free (n->address);
906     n->address = NULL;
907   }
908   n->session = NULL;
909   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
910               GNUNET_i2s (&n->id), n);
911   GNUNET_free (n);
912 }
913
914
915 /**
916  * Peer has been idle for too long. Disconnect.
917  *
918  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
919  * @param tc scheduler context
920  */
921 static void
922 neighbour_timeout_task (void *cls,
923                         const struct GNUNET_SCHEDULER_TaskContext *tc)
924 {
925   struct NeighbourMapEntry *n = cls;
926
927   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
928
929   GNUNET_STATISTICS_update (GST_stats,
930                             gettext_noop
931                             ("# peers disconnected due to timeout"), 1,
932                             GNUNET_NO);
933   disconnect_neighbour (n);
934 }
935
936
937 /**
938  * Send another keepalive message.
939  *
940  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
941  * @param tc scheduler context
942  */
943 static void
944 neighbour_keepalive_task (void *cls,
945                           const struct GNUNET_SCHEDULER_TaskContext *tc)
946 {
947   struct NeighbourMapEntry *n = cls;
948   struct GNUNET_MessageHeader m;
949
950   n->keepalive_task =
951       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
952                                     &neighbour_keepalive_task, n);
953   GNUNET_assert (S_CONNECTED == n->state);
954   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
955                             GNUNET_NO);
956   m.size = htons (sizeof (struct GNUNET_MessageHeader));
957   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
958
959   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
960                     UINT32_MAX /* priority */ ,
961                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address, 
962                     GNUNET_YES, NULL, NULL);
963 }
964
965
966 /**
967  * Disconnect from the given neighbour.
968  *
969  * @param cls unused
970  * @param key hash of neighbour's public key (not used)
971  * @param value the 'struct NeighbourMapEntry' of the neighbour
972  */
973 static int
974 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
975 {
976   struct NeighbourMapEntry *n = value;
977
978 #if DEBUG_TRANSPORT
979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
980               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
981 #endif
982   if (S_CONNECTED == n->state)
983     GNUNET_STATISTICS_update (GST_stats,
984                               gettext_noop
985                               ("# peers disconnected due to global disconnect"),
986                               1, GNUNET_NO);
987   disconnect_neighbour (n);
988   return GNUNET_OK;
989 }
990
991
992 static void
993 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
994 {
995   struct NeighbourMapEntry *n = cls;
996
997   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
998
999   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000               " ATS did not suggested address to connect to peer `%s'\n",
1001               GNUNET_i2s (&n->id));
1002
1003   disconnect_neighbour (n);
1004 }
1005
1006 /**
1007  * Cleanup the neighbours subsystem.
1008  */
1009 void
1010 GST_neighbours_stop ()
1011 {
1012   // This can happen during shutdown
1013   if (neighbours == NULL)
1014   {
1015     return;
1016   }
1017
1018   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1019                                          NULL);
1020   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1021   GNUNET_assert (neighbours_connected == 0);
1022   neighbours = NULL;
1023   callback_cls = NULL;
1024   connect_notify_cb = NULL;
1025   disconnect_notify_cb = NULL;
1026 }
1027
1028
1029 /**
1030  * We tried to send a SESSION_CONNECT message to another peer.  If this
1031  * succeeded, we change the state.  If it failed, we should tell
1032  * ATS to not use this address anymore (until it is re-validated).
1033  *
1034  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1035  * @param success GNUNET_OK on success
1036  */
1037 static void
1038 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1039                            int success)
1040 {
1041   struct GNUNET_HELLO_Address *address = cls;
1042   struct NeighbourMapEntry *n;
1043   
1044   if (GNUNET_YES != success)
1045     GNUNET_ATS_address_destroyed (GST_ats, address, NULL);  
1046   if ( (NULL == neighbours) ||
1047        (NULL == (n = lookup_neighbour (&address->peer))) ||
1048        (n->state == S_DISCONNECT) ||
1049        (GNUNET_YES == success) )
1050   {
1051     GNUNET_HELLO_address_free (address);
1052     return;
1053   }
1054 #if DEBUG_TRANSPORT
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1057               GNUNET_i2s (&n->id), 
1058               GST_plugins_a2s (n->address), 
1059               n->session);
1060 #endif
1061   change_state (n, S_NOT_CONNECTED);
1062   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1063     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1064   n->ats_suggest =
1065     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1066                                   n);
1067   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1068   GNUNET_HELLO_address_free (address);
1069 }
1070
1071
1072 /**
1073  * We tried to switch addresses with an peer already connected. If it failed,
1074  * we should tell ATS to not use this address anymore (until it is re-validated).
1075  *
1076  * @param cls the 'struct NeighbourMapEntry'
1077  * @param success GNUNET_OK on success
1078  */
1079 static void
1080 send_switch_address_continuation (void *cls,
1081                                   const struct GNUNET_PeerIdentity *target,
1082                                   int success)
1083 {
1084   struct NeighbourMapEntry *n = cls;
1085
1086   GNUNET_assert (n != NULL);
1087   if (is_disconnecting (n))
1088     return;                     /* neighbour is going away */
1089
1090   GNUNET_assert (n->state == S_CONNECTED);
1091   if (GNUNET_YES != success)
1092   {
1093 #if DEBUG_TRANSPORT
1094     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095                 "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n",
1096                 GNUNET_i2s (&n->id), n->address->transport_name,
1097                 (n->addrlen ==
1098                  0) ? "<inbound>" : GST_plugins_a2s (n->address), n->session);
1099 #endif
1100
1101     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1102
1103     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1104       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1105     n->ats_suggest =
1106         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1107                                       n);
1108     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1109     return;
1110   }
1111   /* Tell ATS that switching addresses was successful */
1112   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
1113 }
1114
1115 /**
1116  * We tried to send a SESSION_CONNECT message to another peer.  If this
1117  * succeeded, we change the state.  If it failed, we should tell
1118  * ATS to not use this address anymore (until it is re-validated).
1119  *
1120  * @param cls the 'struct NeighbourMapEntry'
1121  * @param success GNUNET_OK on success
1122  */
1123 static void
1124 send_connect_ack_continuation (void *cls,
1125                                const struct GNUNET_PeerIdentity *target,
1126                                int success)
1127 {
1128   struct NeighbourMapEntry *n = cls;
1129
1130   GNUNET_assert (n != NULL);
1131
1132   if (GNUNET_YES == success)
1133     return;                     /* sending successful */
1134
1135   /* sending failed, ask for next address  */
1136 #if DEBUG_TRANSPORT
1137   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138               "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1139               GNUNET_i2s (&n->id), n->address->transport_name,
1140               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->address),
1141               n->session);
1142 #endif
1143   change_state (n, S_NOT_CONNECTED);
1144
1145   GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1146
1147   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1148     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1149   n->ats_suggest =
1150       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1151                                     n);
1152   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1153 }
1154
1155
1156 /**
1157  * For an existing neighbour record, set the active connection to
1158  * the given address.
1159  *
1160  * @param peer identity of the peer to switch the address for
1161  * @param address address of the other peer, NULL if other peer
1162  *                       connected to us
1163  * @param session session to use (or NULL)
1164  * @param ats performance data
1165  * @param ats_count number of entries in ats
1166  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1167  *         connection is not up (yet)
1168  */
1169 int
1170 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1171                                        const struct GNUNET_HELLO_Address *address,
1172                                        struct Session *session,
1173                                        const struct GNUNET_ATS_Information *ats,
1174                                        uint32_t ats_count,
1175                                        struct GNUNET_BANDWIDTH_Value32NBO
1176                                        bandwidth_in,
1177                                        struct GNUNET_BANDWIDTH_Value32NBO
1178                                        bandwidth_out)
1179 {
1180   struct NeighbourMapEntry *n;
1181   struct SessionConnectMessage connect_msg;
1182   size_t msg_len;
1183   size_t ret;
1184
1185   if (neighbours == NULL)
1186   {
1187     /* This can happen during shutdown */
1188     return GNUNET_NO;
1189   }
1190   n = lookup_neighbour (peer);
1191   if (NULL == n)
1192     return GNUNET_NO;
1193   if (n->state == S_DISCONNECT)
1194   {
1195     /* We are disconnecting, nothing to do here */
1196     return GNUNET_NO;
1197   }
1198   GNUNET_assert (address->transport_name != NULL);
1199   if ( (session == NULL) && (0 == address->address_length) )
1200   {
1201     GNUNET_break_op (0);
1202     GNUNET_ATS_address_destroyed (GST_ats, address, session);
1203     GNUNET_ATS_suggest_address (GST_ats, peer);
1204     return GNUNET_NO;
1205   }
1206
1207   /* checks successful and neighbour != NULL */
1208 #if DEBUG_TRANSPORT
1209   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1210               "ATS tells us to switch to address '%s' session %p for %s peer `%s'\n",
1211               GST_plugins_a2s (address),
1212               session, (S_CONNECTED == n->state) ? "CONNECTED" : "NOT CONNECTED",
1213               GNUNET_i2s (peer));
1214 #endif
1215   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1216   {
1217     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1218     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1219   }
1220   /* do not switch addresses just update quotas */
1221   if ( (n->state == S_CONNECTED) && 
1222        (NULL != n->address) &&
1223        (0 == GNUNET_HELLO_address_cmp (address,
1224                                        n->address)) &&
1225        (n->session == session) )
1226   {
1227     struct QuotaSetMessage q_msg;
1228     
1229 #if DEBUG_TRANSPORT
1230     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1231                 "Sending outbound quota of %u Bps and inbound quota of %u Bps for peer `%s' to all clients\n",
1232                 ntohl (n->bandwidth_out.value__),
1233                 ntohl (n->bandwidth_in.value__), GNUNET_i2s (peer));
1234 #endif
1235     
1236     n->bandwidth_in = bandwidth_in;
1237     n->bandwidth_out = bandwidth_out;
1238     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1239     
1240     q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1241     q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1242     q_msg.quota = n->bandwidth_out;
1243     q_msg.peer = (*peer);
1244     GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1245     return GNUNET_NO;    
1246   }
1247   if (n->state == S_CONNECTED) 
1248   {
1249     /* mark old address as no longer used */
1250     GNUNET_assert (NULL != n->address);
1251     GST_validation_set_address_use (&n->id,
1252                                     n->address,
1253                                     n->session,
1254                                     GNUNET_NO);
1255     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1256   }
1257
1258   /* set new address */
1259   if (NULL != n->address)
1260     GNUNET_HELLO_address_free (n->address);
1261   n->address = GNUNET_HELLO_address_copy (address);
1262   n->session = session;
1263   n->bandwidth_in = bandwidth_in;
1264   n->bandwidth_out = bandwidth_out;
1265   GNUNET_SCHEDULER_cancel (n->timeout_task);
1266   n->timeout_task =
1267       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1268                                     &neighbour_timeout_task, n);
1269   switch (n->state)
1270   {
1271   case S_NOT_CONNECTED:  
1272   case S_CONNECT_SENT:
1273     msg_len = sizeof (struct SessionConnectMessage);
1274     connect_msg.header.size = htons (msg_len);
1275     connect_msg.header.type =
1276         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1277     connect_msg.reserved = htonl (0);
1278     connect_msg.timestamp =
1279         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1280     change_state (n, S_CONNECT_SENT);
1281     ret =
1282         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1283                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1284                           address, GNUNET_YES,
1285                           &send_connect_continuation, 
1286                           GNUNET_HELLO_address_copy (address));
1287     return GNUNET_NO;
1288   case S_CONNECT_RECV:
1289     /* We received a CONNECT message and asked ATS for an address */
1290     msg_len = sizeof (struct SessionConnectMessage);
1291     connect_msg.header.size = htons (msg_len);
1292     connect_msg.header.type =
1293         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1294     connect_msg.reserved = htonl (0);
1295     connect_msg.timestamp =
1296         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1297     ret =
1298         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1299                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1300                           address, GNUNET_YES,
1301                           &send_connect_ack_continuation, n);
1302     return GNUNET_NO;
1303   case S_CONNECTED:
1304     /* connected peer is switching addresses */
1305     GST_validation_set_address_use (&n->id,
1306                                     n->address,
1307                                     n->session,
1308                                     GNUNET_YES);
1309     msg_len = sizeof (struct SessionConnectMessage);
1310     connect_msg.header.size = htons (msg_len);
1311     connect_msg.header.type =
1312         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1313     connect_msg.reserved = htonl (0);
1314     connect_msg.timestamp =
1315         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1316     ret =
1317         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1318                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1319                           address, GNUNET_YES,
1320                           &send_switch_address_continuation, n);
1321     if (ret == GNUNET_SYSERR)
1322     {
1323       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1325                   GNUNET_i2s (peer), 
1326                   GST_plugins_a2s (address), session);
1327     }
1328     return GNUNET_NO;
1329   default:
1330     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1331                 "Invalid connection state to switch addresses %u \n", n->state);
1332     GNUNET_break_op (0);
1333     return GNUNET_NO;
1334   }
1335 }
1336
1337
1338 /**
1339  * Obtain current latency information for the given neighbour.
1340  *
1341  * @param peer 
1342  * @return observed latency of the address, FOREVER if the address was
1343  *         never successfully validated
1344  */
1345 struct GNUNET_TIME_Relative
1346 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1347 {
1348   struct NeighbourMapEntry *n;
1349
1350   n = lookup_neighbour (peer);
1351   if ( (NULL == n) ||
1352        ( (n->address == NULL) && (n->session == NULL) ) )
1353     return GNUNET_TIME_UNIT_FOREVER_REL;
1354   return GST_validation_get_address_latency (peer,
1355                                              n->address,
1356                                              n->session);
1357 }
1358
1359
1360 /**
1361  * Create an entry in the neighbour map for the given peer
1362  *
1363  * @param peer peer to create an entry for
1364  * @return new neighbour map entry
1365  */
1366 static struct NeighbourMapEntry *
1367 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1368 {
1369   struct NeighbourMapEntry *n;
1370
1371 #if DEBUG_TRANSPORT
1372   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1373               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1374 #endif
1375   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1376   n->id = *peer;
1377   n->state = S_NOT_CONNECTED;
1378   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1379                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1380                                  MAX_BANDWIDTH_CARRY_S);
1381   n->timeout_task =
1382       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1383                                     &neighbour_timeout_task, n);
1384   GNUNET_assert (GNUNET_OK ==
1385                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1386                                                     &n->id.hashPubKey, n,
1387                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1388   return n;
1389 }
1390
1391
1392 /**
1393  * Try to create a connection to the given target (eventually).
1394  *
1395  * @param target peer to try to connect to
1396  */
1397 void
1398 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1399 {
1400   struct NeighbourMapEntry *n;
1401
1402   // This can happen during shutdown
1403   if (neighbours == NULL)
1404   {
1405     return;
1406   }
1407 #if DEBUG_TRANSPORT
1408   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1409               GNUNET_i2s (target));
1410 #endif
1411   if (0 ==
1412       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1413   {
1414     /* my own hello */
1415     return;
1416   }
1417   n = lookup_neighbour (target);
1418
1419   if (NULL != n)
1420   {
1421     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1422       return;                   /* already connecting or connected */
1423     if (is_disconnecting (n))
1424       change_state (n, S_NOT_CONNECTED);
1425   }
1426
1427
1428   if (n == NULL)
1429     n = setup_neighbour (target);
1430 #if DEBUG_TRANSPORT
1431   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1432               "Asking ATS for suggested address to connect to peer `%s'\n",
1433               GNUNET_i2s (&n->id));
1434 #endif
1435
1436   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1437 }
1438
1439 /**
1440  * Test if we're connected to the given peer.
1441  *
1442  * @param target peer to test
1443  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1444  */
1445 int
1446 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1447 {
1448   struct NeighbourMapEntry *n;
1449
1450   // This can happen during shutdown
1451   if (neighbours == NULL)
1452   {
1453     return GNUNET_NO;
1454   }
1455
1456   n = lookup_neighbour (target);
1457
1458   if ((NULL == n) || (S_CONNECTED != n->state))
1459     return GNUNET_NO;           /* not connected */
1460   return GNUNET_YES;
1461 }
1462
1463
1464 /**
1465  * A session was terminated. Take note.
1466  *
1467  * @param peer identity of the peer where the session died
1468  * @param session session that is gone
1469  */
1470 void
1471 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1472                                    struct Session *session)
1473 {
1474   struct NeighbourMapEntry *n;
1475
1476   if (neighbours == NULL)
1477   {
1478     /* This can happen during shutdown */
1479     return;
1480   }
1481
1482 #if DEBUG_TRANSPORT
1483   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1484               session, GNUNET_i2s (peer));
1485 #endif
1486
1487   n = lookup_neighbour (peer);
1488   if (NULL == n)
1489     return;
1490   if (session != n->session)
1491     return;                     /* doesn't affect us */
1492   if (n->state == S_CONNECTED)
1493     GST_validation_set_address_use (&n->id,
1494                                     n->address,
1495                                     n->session,
1496                                     GNUNET_NO);
1497   n->session = NULL;
1498   if (NULL != n->address)
1499   {
1500     GNUNET_HELLO_address_free (n->address);
1501     n->address = NULL;
1502   }
1503   
1504   /* not connected anymore anyway, shouldn't matter */
1505   if ((S_CONNECTED != n->state) && (!is_connecting (n)))
1506     return;
1507   // FIXME: n->state = S_FAST_RECONNECT;
1508
1509   /* We are connected, so ask ATS to switch addresses */
1510   GNUNET_SCHEDULER_cancel (n->timeout_task);
1511   n->timeout_task =
1512       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1513                                     &neighbour_timeout_task, n);
1514   /* try QUICKLY to re-establish a connection, reduce timeout! */
1515   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1516     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1517   n->ats_suggest =
1518       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1519                                     n);
1520   GNUNET_ATS_suggest_address (GST_ats, peer);
1521 }
1522
1523
1524 /**
1525  * Transmit a message to the given target using the active connection.
1526  *
1527  * @param target destination
1528  * @param msg message to send
1529  * @param msg_size number of bytes in msg
1530  * @param timeout when to fail with timeout
1531  * @param cont function to call when done
1532  * @param cont_cls closure for 'cont'
1533  */
1534 void
1535 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1536                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1537                      GST_NeighbourSendContinuation cont, void *cont_cls)
1538 {
1539   struct NeighbourMapEntry *n;
1540   struct MessageQueue *mq;
1541
1542   // This can happen during shutdown
1543   if (neighbours == NULL)
1544   {
1545     return;
1546   }
1547
1548   n = lookup_neighbour (target);
1549   if ((n == NULL) || (!is_connected (n)))
1550   {
1551     GNUNET_STATISTICS_update (GST_stats,
1552                               gettext_noop
1553                               ("# messages not sent (no such peer or not connected)"),
1554                               1, GNUNET_NO);
1555 #if DEBUG_TRANSPORT
1556     if (n == NULL)
1557       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1558                   "Could not send message to peer `%s': unknown neighbour",
1559                   GNUNET_i2s (target));
1560     else if (!is_connected (n))
1561       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1562                   "Could not send message to peer `%s': not connected\n",
1563                   GNUNET_i2s (target));
1564 #endif
1565     if (NULL != cont)
1566       cont (cont_cls, GNUNET_SYSERR);
1567     return;
1568   }
1569
1570   if ((n->session == NULL) && (n->address == NULL) )
1571   {
1572     GNUNET_STATISTICS_update (GST_stats,
1573                               gettext_noop
1574                               ("# messages not sent (no such peer or not connected)"),
1575                               1, GNUNET_NO);
1576 #if DEBUG_TRANSPORT
1577     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1578                 "Could not send message to peer `%s': no address available\n",
1579                 GNUNET_i2s (target));
1580 #endif
1581
1582     if (NULL != cont)
1583       cont (cont_cls, GNUNET_SYSERR);
1584     return;
1585   }
1586
1587   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1588   GNUNET_STATISTICS_update (GST_stats,
1589                             gettext_noop
1590                             ("# bytes in message queue for other peers"),
1591                             msg_size, GNUNET_NO);
1592   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1593   mq->cont = cont;
1594   mq->cont_cls = cont_cls;
1595   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1596   memcpy (&mq[1], msg, msg_size);
1597   mq->message_buf = (const char *) &mq[1];
1598   mq->message_buf_size = msg_size;
1599   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1600   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1601
1602   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1603       (NULL == n->is_active))
1604     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1605 }
1606
1607
1608 /**
1609  * We have received a message from the given sender.  How long should
1610  * we delay before receiving more?  (Also used to keep the peer marked
1611  * as live).
1612  *
1613  * @param sender sender of the message
1614  * @param size size of the message
1615  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1616  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1617  *                   GNUNET_SYSERR if the connection is not fully up yet
1618  * @return how long to wait before reading more from this sender
1619  */
1620 struct GNUNET_TIME_Relative
1621 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1622                                         *sender, ssize_t size, int *do_forward)
1623 {
1624   struct NeighbourMapEntry *n;
1625   struct GNUNET_TIME_Relative ret;
1626
1627   // This can happen during shutdown
1628   if (neighbours == NULL)
1629   {
1630     return GNUNET_TIME_UNIT_FOREVER_REL;
1631   }
1632
1633   n = lookup_neighbour (sender);
1634   if (n == NULL)
1635   {
1636     GST_neighbours_try_connect (sender);
1637     n = lookup_neighbour (sender);
1638     if (NULL == n)
1639     {
1640       GNUNET_STATISTICS_update (GST_stats,
1641                                 gettext_noop
1642                                 ("# messages discarded due to lack of neighbour record"),
1643                                 1, GNUNET_NO);
1644       *do_forward = GNUNET_NO;
1645       return GNUNET_TIME_UNIT_ZERO;
1646     }
1647   }
1648   if (!is_connected (n))
1649   {
1650     *do_forward = GNUNET_SYSERR;
1651     return GNUNET_TIME_UNIT_ZERO;
1652   }
1653   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1654   {
1655     n->quota_violation_count++;
1656 #if DEBUG_TRANSPORT
1657     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1658                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1659                 n->in_tracker.available_bytes_per_s__,
1660                 n->quota_violation_count);
1661 #endif
1662     /* Discount 32k per violation */
1663     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1664   }
1665   else
1666   {
1667     if (n->quota_violation_count > 0)
1668     {
1669       /* try to add 32k back */
1670       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1671       n->quota_violation_count--;
1672     }
1673   }
1674   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1675   {
1676     GNUNET_STATISTICS_update (GST_stats,
1677                               gettext_noop
1678                               ("# bandwidth quota violations by other peers"),
1679                               1, GNUNET_NO);
1680     *do_forward = GNUNET_NO;
1681     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1682   }
1683   *do_forward = GNUNET_YES;
1684   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1685   if (ret.rel_value > 0)
1686   {
1687 #if DEBUG_TRANSPORT
1688     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1689                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1690                 (unsigned long long) n->in_tracker.
1691                 consumption_since_last_update__,
1692                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1693                 (unsigned long long) ret.rel_value);
1694 #endif
1695     GNUNET_STATISTICS_update (GST_stats,
1696                               gettext_noop ("# ms throttling suggested"),
1697                               (int64_t) ret.rel_value, GNUNET_NO);
1698   }
1699   return ret;
1700 }
1701
1702
1703 /**
1704  * Keep the connection to the given neighbour alive longer,
1705  * we received a KEEPALIVE (or equivalent).
1706  *
1707  * @param neighbour neighbour to keep alive
1708  */
1709 void
1710 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1711 {
1712   struct NeighbourMapEntry *n;
1713
1714   // This can happen during shutdown
1715   if (neighbours == NULL)
1716   {
1717     return;
1718   }
1719
1720   n = lookup_neighbour (neighbour);
1721   if (NULL == n)
1722   {
1723     GNUNET_STATISTICS_update (GST_stats,
1724                               gettext_noop
1725                               ("# KEEPALIVE messages discarded (not connected)"),
1726                               1, GNUNET_NO);
1727     return;
1728   }
1729   GNUNET_SCHEDULER_cancel (n->timeout_task);
1730   n->timeout_task =
1731       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1732                                     &neighbour_timeout_task, n);
1733 }
1734
1735
1736 /**
1737  * Change the incoming quota for the given peer.
1738  *
1739  * @param neighbour identity of peer to change qutoa for
1740  * @param quota new quota
1741  */
1742 void
1743 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1744                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1745 {
1746   struct NeighbourMapEntry *n;
1747
1748   // This can happen during shutdown
1749   if (neighbours == NULL)
1750   {
1751     return;
1752   }
1753
1754   n = lookup_neighbour (neighbour);
1755   if (n == NULL)
1756   {
1757     GNUNET_STATISTICS_update (GST_stats,
1758                               gettext_noop
1759                               ("# SET QUOTA messages ignored (no such peer)"),
1760                               1, GNUNET_NO);
1761     return;
1762   }
1763   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1764   if (0 != ntohl (quota.value__))
1765     return;
1766 #if DEBUG_TRANSPORT
1767   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1768               GNUNET_i2s (&n->id), "SET_QUOTA");
1769 #endif
1770   if (is_connected (n))
1771     GNUNET_STATISTICS_update (GST_stats,
1772                               gettext_noop ("# disconnects due to quota of 0"),
1773                               1, GNUNET_NO);
1774   disconnect_neighbour (n);
1775 }
1776
1777
1778 /**
1779  * Closure for the neighbours_iterate function.
1780  */
1781 struct IteratorContext
1782 {
1783   /**
1784    * Function to call on each connected neighbour.
1785    */
1786   GST_NeighbourIterator cb;
1787
1788   /**
1789    * Closure for 'cb'.
1790    */
1791   void *cb_cls;
1792 };
1793
1794
1795 /**
1796  * Call the callback from the closure for each connected neighbour.
1797  *
1798  * @param cls the 'struct IteratorContext'
1799  * @param key the hash of the public key of the neighbour
1800  * @param value the 'struct NeighbourMapEntry'
1801  * @return GNUNET_OK (continue to iterate)
1802  */
1803 static int
1804 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1805 {
1806   struct IteratorContext *ic = cls;
1807   struct NeighbourMapEntry *n = value;
1808
1809   if (!is_connected (n))
1810     return GNUNET_OK;
1811
1812   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
1813   return GNUNET_OK;
1814 }
1815
1816
1817 /**
1818  * Iterate over all connected neighbours.
1819  *
1820  * @param cb function to call
1821  * @param cb_cls closure for cb
1822  */
1823 void
1824 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1825 {
1826   struct IteratorContext ic;
1827
1828   // This can happen during shutdown
1829   if (neighbours == NULL)
1830   {
1831     return;
1832   }
1833
1834   ic.cb = cb;
1835   ic.cb_cls = cb_cls;
1836   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1837 }
1838
1839 /**
1840  * If we have an active connection to the given target, it must be shutdown.
1841  *
1842  * @param target peer to disconnect from
1843  */
1844 void
1845 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1846 {
1847   struct NeighbourMapEntry *n;
1848
1849   // This can happen during shutdown
1850   if (neighbours == NULL)
1851   {
1852     return;
1853   }
1854
1855   n = lookup_neighbour (target);
1856   if (NULL == n)
1857     return;                     /* not active */
1858   if (is_connected (n))
1859   {
1860     send_disconnect (&n->id, n->address, n->session);
1861
1862     n = lookup_neighbour (target);
1863     if (NULL == n)
1864       return;                   /* gone already */
1865   }
1866   disconnect_neighbour (n);
1867 }
1868
1869
1870 /**
1871  * We received a disconnect message from the given peer,
1872  * validate and process.
1873  *
1874  * @param peer sender of the message
1875  * @param msg the disconnect message
1876  */
1877 void
1878 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
1879                                           *peer,
1880                                           const struct GNUNET_MessageHeader
1881                                           *msg)
1882 {
1883   struct NeighbourMapEntry *n;
1884   const struct SessionDisconnectMessage *sdm;
1885   GNUNET_HashCode hc;
1886
1887 #if DEBUG_TRANSPORT
1888   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1889               "Received DISCONNECT message from peer `%s'\n",
1890               GNUNET_i2s (peer));
1891 #endif
1892
1893   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1894   {
1895     // GNUNET_break_op (0);
1896     GNUNET_STATISTICS_update (GST_stats,
1897                               gettext_noop
1898                               ("# disconnect messages ignored (old format)"), 1,
1899                               GNUNET_NO);
1900     return;
1901   }
1902   sdm = (const struct SessionDisconnectMessage *) msg;
1903   n = lookup_neighbour (peer);
1904   if (NULL == n)
1905     return;                     /* gone already */
1906   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1907       n->connect_ts.abs_value)
1908   {
1909     GNUNET_STATISTICS_update (GST_stats,
1910                               gettext_noop
1911                               ("# disconnect messages ignored (timestamp)"), 1,
1912                               GNUNET_NO);
1913     return;
1914   }
1915   GNUNET_CRYPTO_hash (&sdm->public_key,
1916                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1917                       &hc);
1918   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
1919   {
1920     GNUNET_break_op (0);
1921     return;
1922   }
1923   if (ntohl (sdm->purpose.size) !=
1924       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1925       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1926       sizeof (struct GNUNET_TIME_AbsoluteNBO))
1927   {
1928     GNUNET_break_op (0);
1929     return;
1930   }
1931   if (GNUNET_OK !=
1932       GNUNET_CRYPTO_rsa_verify
1933       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
1934        &sdm->signature, &sdm->public_key))
1935   {
1936     GNUNET_break_op (0);
1937     return;
1938   }
1939   GST_neighbours_force_disconnect (peer);
1940 }
1941
1942
1943 /**
1944  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
1945  * Consider switching to it.
1946  *
1947  * @param message possibly a 'struct SessionConnectMessage' (check format)
1948  * @param peer identity of the peer to switch the address for
1949  * @param address address of the other peer, NULL if other peer
1950  *                       connected to us
1951  * @param session session to use (or NULL)
1952  * @param ats performance data
1953  * @param ats_count number of entries in ats
1954  */
1955 void
1956 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
1957                                    const struct GNUNET_PeerIdentity *peer,
1958                                    const struct GNUNET_HELLO_Address *address,
1959                                    struct Session *session,
1960                                    const struct GNUNET_ATS_Information *ats,
1961                                    uint32_t ats_count)
1962 {
1963   const struct SessionConnectMessage *scm;
1964   struct QuotaSetMessage q_msg;
1965   struct GNUNET_MessageHeader msg;
1966   struct NeighbourMapEntry *n;
1967   size_t msg_len;
1968   size_t ret;
1969
1970 #if DEBUG_TRANSPORT
1971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1972               "Received CONNECT_ACK message from peer `%s'\n",
1973               GNUNET_i2s (peer));
1974 #endif
1975   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
1976   {
1977     GNUNET_break_op (0);
1978     return;
1979   }
1980   scm = (const struct SessionConnectMessage *) message;
1981   GNUNET_break_op (ntohl (scm->reserved) == 0);
1982   n = lookup_neighbour (peer);
1983   if (NULL == n)
1984   {
1985     /* we did not send 'CONNECT' (how could we? no record for this peer!) */
1986     GNUNET_break_op (0);
1987     return;
1988   }  
1989   if (n->state != S_CONNECT_SENT)
1990   {
1991     GNUNET_STATISTICS_update (GST_stats,
1992                               gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
1993                               GNUNET_NO);
1994     return;
1995   }
1996   if (NULL != session)
1997     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1998                      "transport-ats",
1999                      "Giving ATS session %p of plugin %s for peer %s\n",
2000                      session, address->transport_name, GNUNET_i2s (peer));
2001   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2002   GNUNET_assert (NULL != n->address);
2003   change_state (n, S_CONNECTED);
2004   GST_validation_set_address_use (&n->id,
2005                                   n->address,
2006                                   n->session,
2007                                   GNUNET_YES);
2008   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2009 #if DEBUG_TRANSPORT
2010   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2011               "Setting inbound quota of %u for peer `%s' to \n",
2012               ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id));
2013 #endif
2014   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2015
2016   /* send ACK (ACK) */
2017   msg_len = sizeof (msg);
2018   msg.size = htons (msg_len);
2019   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2020
2021   ret =
2022       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2023                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2024                         n->address, GNUNET_YES, NULL,
2025                         NULL);
2026
2027   if (ret == GNUNET_SYSERR)
2028     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2029                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2030                 GNUNET_i2s (&n->id), 
2031                 GST_plugins_a2s (n->address), n->session);
2032
2033
2034   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2035     n->keepalive_task =
2036       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2037                                     &neighbour_keepalive_task, n);
2038   
2039   neighbours_connected++;
2040   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2041                             GNUNET_NO);
2042 #if DEBUG_TRANSPORT
2043   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2044               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2045               GNUNET_i2s (&n->id),
2046               GST_plugins_a2s (n->address), n->session,
2047               __LINE__);
2048 #endif
2049   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2050
2051 #if DEBUG_TRANSPORT
2052   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2053               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2054               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2055 #endif
2056   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2057   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2058   q_msg.quota = n->bandwidth_out;
2059   q_msg.peer = (*peer);
2060   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2061
2062 }
2063
2064
2065 void
2066 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2067                            const struct GNUNET_PeerIdentity *peer,
2068                            const struct GNUNET_HELLO_Address *address,
2069                            struct Session *session,
2070                            const struct GNUNET_ATS_Information *ats,
2071                            uint32_t ats_count)
2072 {
2073   struct NeighbourMapEntry *n;
2074   struct QuotaSetMessage q_msg;
2075
2076 #if DEBUG_TRANSPORT
2077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2078               "Received ACK message from peer `%s'\n",
2079               GNUNET_i2s (peer));
2080 #endif
2081
2082   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2083   {
2084     GNUNET_break_op (0);
2085     return;
2086   }
2087   n = lookup_neighbour (peer);
2088   if (NULL == n)
2089   {
2090     send_disconnect (peer, address,
2091                      session);
2092     GNUNET_break (0);
2093     return;
2094   }
2095   if (S_CONNECTED == n->state)
2096     return;
2097   if (!is_connecting(n))
2098   {
2099     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2100                               GNUNET_NO);
2101     return;
2102   }
2103   if (NULL != session)
2104     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2105                      "transport-ats",
2106                      "Giving ATS session %p of plugin %s for peer %s\n",
2107                      session, address->transport_name, GNUNET_i2s (peer));
2108   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2109   GNUNET_assert (n->address != NULL);
2110   change_state (n, S_CONNECTED);
2111   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2112
2113   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2114
2115   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2116     n->keepalive_task =
2117         GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2118                                       &neighbour_keepalive_task, n);
2119   GST_validation_set_address_use (&n->id,
2120                                   n->address,
2121                                   n->session,
2122                                   GNUNET_YES);
2123   neighbours_connected++;
2124   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2125                             GNUNET_NO);
2126   
2127 #if DEBUG_TRANSPORT
2128   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2129               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2130               GNUNET_i2s (&n->id),
2131               GST_plugins_a2s (n->address), n->session,
2132               __LINE__);
2133 #endif
2134   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2135 #if DEBUG_TRANSPORT
2136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2137               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2138               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2139 #endif
2140   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2141   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2142   q_msg.quota = n->bandwidth_out;
2143   q_msg.peer = (*peer);
2144   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2145 }
2146
2147 struct BlackListCheckContext
2148 {
2149   struct GNUNET_ATS_Information *ats;
2150
2151   uint32_t ats_count;
2152
2153   struct Session *session;
2154
2155   struct GNUNET_HELLO_Address *address;
2156
2157   struct GNUNET_TIME_Absolute ts;
2158 };
2159
2160
2161 static void
2162 handle_connect_blacklist_cont (void *cls,
2163                                const struct GNUNET_PeerIdentity *peer,
2164                                int result)
2165 {
2166   struct NeighbourMapEntry *n;
2167   struct BlackListCheckContext *bcc = cls;
2168
2169 #if DEBUG_TRANSPORT
2170   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2171               "Blacklist check due to CONNECT message: `%s'\n",
2172               GNUNET_i2s (peer),
2173               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2174 #endif
2175
2176   /* not allowed */
2177   if (GNUNET_OK != result)
2178   {
2179     GNUNET_HELLO_address_free (bcc->address);
2180     GNUNET_free (bcc);
2181     return;
2182   }
2183
2184   n = lookup_neighbour (peer);
2185   if (NULL == n)
2186     n = setup_neighbour (peer);
2187
2188   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2189   {
2190     if (NULL != bcc->session)
2191       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2192                        "transport-ats",
2193                        "Giving ATS session %p of address `%s' for peer %s\n",
2194                        bcc->session, 
2195                        GST_plugins_a2s (bcc->address),
2196                        GNUNET_i2s (peer));
2197     GNUNET_ATS_address_update (GST_ats, bcc->address,
2198                                bcc->session, bcc->ats, bcc->ats_count);
2199     n->connect_ts = bcc->ts;
2200   }
2201
2202   GNUNET_free (bcc);
2203
2204   if (n->state != S_CONNECT_RECV)
2205     change_state (n, S_CONNECT_RECV);
2206
2207   /* Ask ATS for an address to connect via that address */
2208   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2209     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2210   n->ats_suggest =
2211       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2212                                     n);
2213   GNUNET_ATS_suggest_address (GST_ats, peer);
2214 }
2215
2216 /**
2217  * We received a 'SESSION_CONNECT' message from the other peer.
2218  * Consider switching to it.
2219  *
2220  * @param message possibly a 'struct SessionConnectMessage' (check format)
2221  * @param peer identity of the peer to switch the address for
2222  * @param address address of the other peer, NULL if other peer
2223  *                       connected to us
2224  * @param session session to use (or NULL)
2225  * @param ats performance data
2226  * @param ats_count number of entries in ats (excluding 0-termination)
2227  */
2228 void
2229 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2230                                const struct GNUNET_PeerIdentity *peer,
2231                                const struct GNUNET_HELLO_Address *address,
2232                                struct Session *session,
2233                                const struct GNUNET_ATS_Information *ats,
2234                                uint32_t ats_count)
2235 {
2236   const struct SessionConnectMessage *scm;
2237   struct NeighbourMapEntry *n;
2238   struct BlackListCheckContext *bcc = NULL;
2239
2240 #if DEBUG_TRANSPORT
2241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2242               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2243 #endif
2244
2245   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2246   {
2247     GNUNET_break_op (0);
2248     return;
2249   }
2250
2251   scm = (const struct SessionConnectMessage *) message;
2252   GNUNET_break_op (ntohl (scm->reserved) == 0);
2253
2254   n = lookup_neighbour (peer);
2255   if ( (n != NULL) &&
2256        (S_CONNECTED == n->state) )
2257   {
2258     /* connected peer switches addresses */
2259     GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2260     return;
2261   }
2262
2263   /* we are not connected to this peer */
2264   /* do blacklist check */
2265   bcc =
2266       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2267                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2268   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2269   bcc->ats_count = ats_count + 1;
2270   bcc->address = GNUNET_HELLO_address_copy (address);
2271   bcc->session = session;
2272   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2273   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2274   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2275   bcc->ats[ats_count].value = htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2276   GST_blacklist_test_allowed (peer, address->transport_name, handle_connect_blacklist_cont,
2277                               bcc);
2278 }
2279
2280
2281 /* end of file gnunet-service-transport_neighbours.c */