fix
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
39 /**
40  * Size of the neighbour hash map.
41  */
42 #define NEIGHBOUR_TABLE_SIZE 256
43
44 /**
45  * How often must a peer violate bandwidth quotas before we start
46  * to simply drop its messages?
47  */
48 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
49
50 /**
51  * How often do we send KEEPALIVE messages to each of our neighbours?
52  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
53  * send 3 keepalives in each interval, so 3 messages would need to be
54  * lost in a row for a disconnect).
55  */
56 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
57
58
59 #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
60
61
62 #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15)
63
64
65 /**
66  * Entry in neighbours.
67  */
68 struct NeighbourMapEntry;
69
70 /**
71  * Message a peer sends to another to indicate its
72  * preference for communicating via a particular
73  * session (and the desire to establish a real
74  * connection).
75  */
76 struct SessionConnectMessage
77 {
78   /**
79    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
80    */
81   struct GNUNET_MessageHeader header;
82
83   /**
84    * Always zero.
85    */
86   uint32_t reserved GNUNET_PACKED;
87
88   /**
89    * Absolute time at the sender.  Only the most recent connect
90    * message implies which session is preferred by the sender.
91    */
92   struct GNUNET_TIME_AbsoluteNBO timestamp;
93
94 };
95
96
97 struct SessionDisconnectMessage
98 {
99   /**
100    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Always zero.
106    */
107   uint32_t reserved GNUNET_PACKED;
108
109   /**
110    * Purpose of the signature.  Extends over the timestamp.
111    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
112    */
113   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
114
115   /**
116    * Absolute time at the sender.  Only the most recent connect
117    * message implies which session is preferred by the sender.
118    */
119   struct GNUNET_TIME_AbsoluteNBO timestamp;
120
121   /**
122    * Public key of the sender.
123    */
124   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
125
126   /**
127    * Signature of the peer that sends us the disconnect.  Only
128    * valid if the timestamp is AFTER the timestamp from the
129    * corresponding 'CONNECT' message.
130    */
131   struct GNUNET_CRYPTO_RsaSignature signature;
132
133 };
134
135
136 /**
137  * For each neighbour we keep a list of messages
138  * that we still want to transmit to the neighbour.
139  */
140 struct MessageQueue
141 {
142
143   /**
144    * This is a doubly linked list.
145    */
146   struct MessageQueue *next;
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *prev;
152
153   /**
154    * Once this message is actively being transmitted, which
155    * neighbour is it associated with?
156    */
157   struct NeighbourMapEntry *n;
158
159   /**
160    * Function to call once we're done.
161    */
162   GST_NeighbourSendContinuation cont;
163
164   /**
165    * Closure for 'cont'
166    */
167   void *cont_cls;
168
169   /**
170    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
171    * stuck together in memory.  Allocated at the end of this struct.
172    */
173   const char *message_buf;
174
175   /**
176    * Size of the message buf
177    */
178   size_t message_buf_size;
179
180   /**
181    * At what time should we fail?
182    */
183   struct GNUNET_TIME_Absolute timeout;
184
185 };
186
187
188 enum State
189 {
190   /**
191    * fresh peer or completely disconnected 
192    */
193   S_NOT_CONNECTED,
194
195   /**
196    * sent CONNECT message to other peer, waiting for CONNECT_ACK 
197    */
198   S_CONNECT_SENT,
199
200   /**
201    * received CONNECT message to other peer, sending CONNECT_ACK 
202    */
203   S_CONNECT_RECV,
204
205   /**
206    * received ACK or payload 
207    */
208   S_CONNECTED,
209
210   /**
211    * Disconnect in progress 
212    */
213   S_DISCONNECT
214 };
215
216
217 /**
218  * Entry in neighbours.
219  */
220 struct NeighbourMapEntry
221 {
222
223   /**
224    * Head of list of messages we would like to send to this peer;
225    * must contain at most one message per client.
226    */
227   struct MessageQueue *messages_head;
228
229   /**
230    * Tail of list of messages we would like to send to this peer; must
231    * contain at most one message per client.
232    */
233   struct MessageQueue *messages_tail;
234
235   /**
236    * Performance data for the peer.
237    */
238   //struct GNUNET_ATS_Information *ats;
239
240   /**
241    * Are we currently trying to send a message? If so, which one?
242    */
243   struct MessageQueue *is_active;
244
245   /**
246    * Active session for communicating with the peer.
247    */
248   struct Session *session;
249
250   /**
251    * Address we currently use.
252    */
253   struct GNUNET_HELLO_Address *address;
254
255   /**
256    * Identity of this neighbour.
257    */
258   struct GNUNET_PeerIdentity id;
259
260   /**
261    * ID of task scheduled to run when this peer is about to
262    * time out (will free resources associated with the peer).
263    */
264   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
265
266   /**
267    * ID of task scheduled to send keepalives.
268    */
269   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
270
271   /**
272    * ID of task scheduled to run when we should try transmitting
273    * the head of the message queue.
274    */
275   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
276
277   /**
278    * Tracker for inbound bandwidth.
279    */
280   struct GNUNET_BANDWIDTH_Tracker in_tracker;
281
282   /**
283    * Inbound bandwidth from ATS, activated when connection is up
284    */
285   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
286
287   /**
288    * Inbound bandwidth from ATS, activated when connection is up
289    */
290   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
291
292   /**
293    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
294    */
295   struct GNUNET_TIME_Absolute connect_ts;
296
297   /**
298    * Timeout for ATS
299    * We asked ATS for a new address for this peer
300    */
301   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
302
303   /**
304    * Task the resets the peer state after due to an pending
305    * unsuccessful connection setup
306    */
307   GNUNET_SCHEDULER_TaskIdentifier state_reset;
308
309   /**
310    * How often has the other peer (recently) violated the inbound
311    * traffic limit?  Incremented by 10 per violation, decremented by 1
312    * per non-violation (for each time interval).
313    */
314   unsigned int quota_violation_count;
315
316
317   /**
318    * The current state of the peer
319    * Element of enum State
320    */
321   int state;
322
323 };
324
325
326 /**
327  * All known neighbours and their HELLOs.
328  */
329 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
330
331 /**
332  * Closure for connect_notify_cb and disconnect_notify_cb
333  */
334 static void *callback_cls;
335
336 /**
337  * Function to call when we connected to a neighbour.
338  */
339 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
340
341 /**
342  * Function to call when we disconnected from a neighbour.
343  */
344 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
345
346 /**
347  * counter for connected neighbours
348  */
349 static int neighbours_connected;
350
351 /**
352  * Lookup a neighbour entry in the neighbours hash map.
353  *
354  * @param pid identity of the peer to look up
355  * @return the entry, NULL if there is no existing record
356  */
357 static struct NeighbourMapEntry *
358 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
359 {
360   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
361 }
362
363 #define change_state(n, state, ...) change (n, state, __LINE__)
364
365 static int
366 is_connecting (struct NeighbourMapEntry *n)
367 {
368   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
369     return GNUNET_YES;
370   return GNUNET_NO;
371 }
372
373 static int
374 is_connected (struct NeighbourMapEntry *n)
375 {
376   if (n->state == S_CONNECTED)
377     return GNUNET_YES;
378   return GNUNET_NO;
379 }
380
381 static int
382 is_disconnecting (struct NeighbourMapEntry *n)
383 {
384   if (n->state == S_DISCONNECT)
385     return GNUNET_YES;
386   return GNUNET_NO;
387 }
388
389 static const char *
390 print_state (int state)
391 {
392   switch (state)
393   {
394   case S_CONNECTED:
395     return "S_CONNECTED";
396     break;
397   case S_CONNECT_RECV:
398     return "S_CONNECT_RECV";
399     break;
400   case S_CONNECT_SENT:
401     return "S_CONNECT_SENT";
402     break;
403   case S_DISCONNECT:
404     return "S_DISCONNECT";
405     break;
406   case S_NOT_CONNECTED:
407     return "S_NOT_CONNECTED";
408     break;
409   default:
410     GNUNET_break (0);
411     break;
412   }
413   return NULL;
414 }
415
416 static int
417 change (struct NeighbourMapEntry *n, int state, int line);
418
419 static void
420 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
421
422 static void
423 reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
424 {
425   struct NeighbourMapEntry *n = cls;
426
427   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
428
429 #if DEBUG_TRANSPORT
430 #endif
431   /* This jut a temporary debug message to check if a the value
432    * SETUP_CONNECTION_TIMEOUT was choosen to small for slow machines
433    */
434   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
435               "Information for developers: Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n",
436               GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
437               print_state (n->state));
438
439   GNUNET_STATISTICS_update (GST_stats,
440                             gettext_noop
441                             ("# failed connection attempts due to timeout"), 1,
442                             GNUNET_NO);
443
444   /* resetting state */
445   n->state = S_NOT_CONNECTED;
446
447   /* destroying address */
448   GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
449
450   /* request new address */
451   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
452     GNUNET_SCHEDULER_cancel (n->ats_suggest);
453   n->ats_suggest =
454       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
455                                     n);
456   GNUNET_ATS_suggest_address (GST_ats, &n->id);
457 }
458
459 static int
460 change (struct NeighbourMapEntry *n, int state, int line)
461 {
462   /* allowed transitions */
463   int allowed = GNUNET_NO;
464
465   switch (n->state)
466   {
467   case S_NOT_CONNECTED:
468     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
469         (state == S_DISCONNECT))
470       allowed = GNUNET_YES;
471     break;
472   case S_CONNECT_RECV:
473     allowed = GNUNET_YES;
474     break;
475   case S_CONNECT_SENT:
476     allowed = GNUNET_YES;
477     break;
478   case S_CONNECTED:
479     if (state == S_DISCONNECT)
480       allowed = GNUNET_YES;
481     break;
482   case S_DISCONNECT:
483     break;
484   default:
485     GNUNET_break (0);
486     break;
487   }
488   if (allowed == GNUNET_NO)
489   {
490     char *old = GNUNET_strdup (print_state (n->state));
491     char *new = GNUNET_strdup (print_state (state));
492     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
493                 "Illegal state transition from `%s' to `%s' in line %u \n", old,
494                 new, line);
495     GNUNET_break (0);
496     GNUNET_free (old);
497     GNUNET_free (new);
498     return GNUNET_SYSERR;
499   }
500
501   n->state = state;
502 #if DEBUG_TRANSPORT
503   {
504     char *old = GNUNET_strdup (print_state (n->state));
505     char *new = GNUNET_strdup (print_state (state));
506     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
507                 "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
508                 GNUNET_i2s (&n->id), n, old, new, line);
509     GNUNET_free (old);
510     GNUNET_free (new);
511   }
512 #endif
513   switch (n->state)
514   {
515   case S_CONNECT_RECV:
516   case S_CONNECT_SENT:
517     if (n->state_reset != GNUNET_SCHEDULER_NO_TASK)
518       GNUNET_SCHEDULER_cancel (n->state_reset);
519     n->state_reset =
520       GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task,
521                                     n);   
522     break;
523   case S_CONNECTED:
524   case S_NOT_CONNECTED:
525   case S_DISCONNECT:
526     if (GNUNET_SCHEDULER_NO_TASK != n->state_reset)
527     {
528 #if DEBUG_TRANSPORT
529       char *old = GNUNET_strdup (print_state (n->state));
530       char *new = GNUNET_strdup (print_state (state));
531       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532                   "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
533                   GNUNET_i2s (&n->id), GST_plugins_a2s (n->address),
534                   old, new);
535       GNUNET_free (old);
536       GNUNET_free (new);
537 #endif
538       GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
539       GNUNET_SCHEDULER_cancel (n->state_reset);
540       n->state_reset = GNUNET_SCHEDULER_NO_TASK;
541     }
542     break;
543   default:
544     GNUNET_assert (0);
545   }
546   
547
548
549   return GNUNET_OK;
550 }
551
552 static ssize_t
553 send_with_plugin (const struct GNUNET_PeerIdentity *target, const char *msgbuf,
554                   size_t msgbuf_size, uint32_t priority,
555                   struct GNUNET_TIME_Relative timeout, struct Session *session,
556                   const struct GNUNET_HELLO_Address *address,
557                   int force_address, GNUNET_TRANSPORT_TransmitContinuation cont,
558                   void *cont_cls)
559 {
560   struct GNUNET_TRANSPORT_PluginFunctions *papi;
561   size_t ret = GNUNET_SYSERR;
562
563   /* FIXME : ats returns an address with all values 0 */
564   if (address == NULL)
565   {
566     if (cont != NULL)
567       cont (cont_cls, target, GNUNET_SYSERR);
568     return GNUNET_SYSERR;
569   }
570
571   if ((session == NULL) && (address == NULL))
572   {
573     if (cont != NULL)
574       cont (cont_cls, target, GNUNET_SYSERR);
575     return GNUNET_SYSERR;
576   }
577
578   papi = GST_plugins_find (address->transport_name);
579   if (papi == NULL)
580   {
581     if (cont != NULL)
582       cont (cont_cls, target, GNUNET_SYSERR);
583     return GNUNET_SYSERR;
584   }
585
586   ret =
587       papi->send (papi->cls, target, msgbuf, msgbuf_size, 0, timeout, session,
588                   address->address, 
589                   address->address_length, GNUNET_YES, cont, cont_cls);
590
591   if (ret == -1)
592   {
593     if (cont != NULL)
594       cont (cont_cls, target, GNUNET_SYSERR);
595   }
596   return ret;
597 }
598
599 /**
600  * Task invoked to start a transmission to another peer.
601  *
602  * @param cls the 'struct NeighbourMapEntry'
603  * @param tc scheduler context
604  */
605 static void
606 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
607
608
609 /**
610  * We're done with our transmission attempt, continue processing.
611  *
612  * @param cls the 'struct MessageQueue' of the message
613  * @param receiver intended receiver
614  * @param success whether it worked or not
615  */
616 static void
617 transmit_send_continuation (void *cls,
618                             const struct GNUNET_PeerIdentity *receiver,
619                             int success)
620 {
621   struct MessageQueue *mq;
622   struct NeighbourMapEntry *n;
623
624   mq = cls;
625   n = mq->n;
626   if (NULL != n)
627   {
628     GNUNET_assert (n->is_active == mq);
629     n->is_active = NULL;
630     if (success == GNUNET_YES)
631     {
632       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
633       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
634     }
635   }
636 #if DEBUG_TRANSPORT
637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
638               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
639               (success == GNUNET_OK) ? "successful" : "FAILED");
640 #endif
641   if (NULL != mq->cont)
642     mq->cont (mq->cont_cls, success);
643   GNUNET_free (mq);
644 }
645
646
647 /**
648  * Check the ready list for the given neighbour and if a plugin is
649  * ready for transmission (and if we have a message), do so!
650  *
651  * @param n target peer for which to transmit
652  */
653 static void
654 try_transmission_to_peer (struct NeighbourMapEntry *n)
655 {
656   struct MessageQueue *mq;
657   struct GNUNET_TIME_Relative timeout;
658   ssize_t ret;
659
660   if (n->is_active != NULL)
661   {
662     GNUNET_break (0);
663     return;                     /* transmission already pending */
664   }
665   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
666   {
667     GNUNET_break (0);
668     return;                     /* currently waiting for bandwidth */
669   }
670   while (NULL != (mq = n->messages_head))
671   {
672     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
673     if (timeout.rel_value > 0)
674       break;
675     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
676     n->is_active = mq;
677     mq->n = n;
678     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
679   }
680   if (NULL == mq)
681     return;                     /* no more messages */
682
683   if (GST_plugins_find (n->address->transport_name) == NULL)
684   {
685     GNUNET_break (0);
686     return;
687   }
688   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
689   n->is_active = mq;
690   mq->n = n;
691
692   if ( (n->session == NULL) && (NULL == n->address) )
693   {
694     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
695                 "No address for peer `%s'\n",
696                 GNUNET_i2s (&n->id));
697     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
698     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
699     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
700     return;
701   }
702
703   ret =
704       send_with_plugin (&n->id, mq->message_buf, mq->message_buf_size, 0,
705                         timeout, n->session, n->address, 
706                         GNUNET_YES, &transmit_send_continuation,
707                         mq);
708   if (ret == -1)
709   {
710     /* failure, but 'send' would not call continuation in this case,
711      * so we need to do it here! */
712     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
713   }
714
715 }
716
717
718 /**
719  * Task invoked to start a transmission to another peer.
720  *
721  * @param cls the 'struct NeighbourMapEntry'
722  * @param tc scheduler context
723  */
724 static void
725 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
726 {
727   struct NeighbourMapEntry *n = cls;
728
729   GNUNET_assert (NULL != lookup_neighbour (&n->id));
730   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
731   try_transmission_to_peer (n);
732 }
733
734
735 /**
736  * Initialize the neighbours subsystem.
737  *
738  * @param cls closure for callbacks
739  * @param connect_cb function to call if we connect to a peer
740  * @param disconnect_cb function to call if we disconnect from a peer
741  */
742 void
743 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
744                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
745 {
746   callback_cls = cls;
747   connect_notify_cb = connect_cb;
748   disconnect_notify_cb = disconnect_cb;
749   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
750 }
751
752
753 static void
754 send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target,
755                       int result)
756 {
757 #if DEBUG_TRANSPORT
758   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
759               "Sending DISCONNECT message to peer `%4s': %i\n",
760               GNUNET_i2s (target), result);
761 #endif
762 }
763
764
765 static int
766 send_disconnect (const struct GNUNET_PeerIdentity *target,
767                  const struct GNUNET_HELLO_Address *address,
768                  struct Session *session)
769 {
770   size_t ret;
771   struct SessionDisconnectMessage disconnect_msg;
772
773 #if DEBUG_TRANSPORT
774   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
775               "Sending DISCONNECT message to peer `%4s'\n",
776               GNUNET_i2s (target));
777 #endif
778
779   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
780   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
781   disconnect_msg.reserved = htonl (0);
782   disconnect_msg.purpose.size =
783       htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
784              sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
785              sizeof (struct GNUNET_TIME_AbsoluteNBO));
786   disconnect_msg.purpose.purpose =
787       htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
788   disconnect_msg.timestamp =
789       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
790   disconnect_msg.public_key = GST_my_public_key;
791   GNUNET_assert (GNUNET_OK ==
792                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
793                                          &disconnect_msg.purpose,
794                                          &disconnect_msg.signature));
795
796   ret =
797       send_with_plugin (target, (const char *) &disconnect_msg,
798                         sizeof (disconnect_msg), UINT32_MAX,
799                         GNUNET_TIME_UNIT_FOREVER_REL, session, address,
800                         GNUNET_YES,
801                         &send_disconnect_cont, NULL);
802
803   if (ret == GNUNET_SYSERR)
804     return GNUNET_SYSERR;
805
806   GNUNET_STATISTICS_update (GST_stats,
807                             gettext_noop
808                             ("# peers disconnected due to external request"), 1,
809                             GNUNET_NO);
810   return GNUNET_OK;
811 }
812
813 /**
814  * Disconnect from the given neighbour, clean up the record.
815  *
816  * @param n neighbour to disconnect from
817  */
818 static void
819 disconnect_neighbour (struct NeighbourMapEntry *n)
820 {
821   struct MessageQueue *mq;
822   int is_connected;
823
824   is_connected = (n->state == S_CONNECTED);
825
826   /* send DISCONNECT MESSAGE */
827   if (is_connected || is_connecting (n))
828   {
829     if (GNUNET_OK ==
830         send_disconnect (&n->id, n->address,
831                          n->session))
832       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
833                   GNUNET_i2s (&n->id));
834     else
835       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836                   "Could not send DISCONNECT_MSG to `%s'\n",
837                   GNUNET_i2s (&n->id));
838   }
839
840   if (is_connected)
841   {
842      GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
843   }
844
845
846   if (is_disconnecting (n))
847     return;
848   change_state (n, S_DISCONNECT);
849   GST_validation_set_address_use (&n->id,
850                                   n->address,
851                                   n->session,
852                                   GNUNET_NO);
853
854   if (n->address != NULL)
855   {
856     struct GNUNET_TRANSPORT_PluginFunctions *papi;
857     papi = GST_plugins_find (n->address->transport_name);
858     if (papi != NULL)
859       papi->disconnect (papi->cls, &n->id);
860   }
861
862   while (NULL != (mq = n->messages_head))
863   {
864     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
865     if (NULL != mq->cont)
866       mq->cont (mq->cont_cls, GNUNET_SYSERR);
867     GNUNET_free (mq);
868   }
869   if (NULL != n->is_active)
870   {
871     n->is_active->n = NULL;
872     n->is_active = NULL;
873   }
874   if (is_connected)
875   {
876     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
877     GNUNET_SCHEDULER_cancel (n->keepalive_task);
878     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
879     GNUNET_assert (neighbours_connected > 0);
880     neighbours_connected--;
881     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
882                               GNUNET_NO);
883     disconnect_notify_cb (callback_cls, &n->id);
884   }
885   GNUNET_assert (GNUNET_YES ==
886                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
887                                                        &n->id.hashPubKey, n));
888   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
889   {
890     GNUNET_SCHEDULER_cancel (n->ats_suggest);
891     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
892   }
893   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
894   {
895     GNUNET_SCHEDULER_cancel (n->timeout_task);
896     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
897   }
898   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
899   {
900     GNUNET_SCHEDULER_cancel (n->transmission_task);
901     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
902   }
903   if (NULL != n->address)
904   {
905     GNUNET_HELLO_address_free (n->address);
906     n->address = NULL;
907   }
908   n->session = NULL;
909   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
910               GNUNET_i2s (&n->id), n);
911   GNUNET_free (n);
912 }
913
914
915 /**
916  * Peer has been idle for too long. Disconnect.
917  *
918  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
919  * @param tc scheduler context
920  */
921 static void
922 neighbour_timeout_task (void *cls,
923                         const struct GNUNET_SCHEDULER_TaskContext *tc)
924 {
925   struct NeighbourMapEntry *n = cls;
926
927   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
928
929   GNUNET_STATISTICS_update (GST_stats,
930                             gettext_noop
931                             ("# peers disconnected due to timeout"), 1,
932                             GNUNET_NO);
933   disconnect_neighbour (n);
934 }
935
936
937 /**
938  * Send another keepalive message.
939  *
940  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
941  * @param tc scheduler context
942  */
943 static void
944 neighbour_keepalive_task (void *cls,
945                           const struct GNUNET_SCHEDULER_TaskContext *tc)
946 {
947   struct NeighbourMapEntry *n = cls;
948   struct GNUNET_MessageHeader m;
949
950   n->keepalive_task =
951       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
952                                     &neighbour_keepalive_task, n);
953   GNUNET_assert (S_CONNECTED == n->state);
954   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1,
955                             GNUNET_NO);
956   m.size = htons (sizeof (struct GNUNET_MessageHeader));
957   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
958
959   send_with_plugin (&n->id, (const void *) &m, sizeof (m),
960                     UINT32_MAX /* priority */ ,
961                     GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->address, 
962                     GNUNET_YES, NULL, NULL);
963 }
964
965
966 /**
967  * Disconnect from the given neighbour.
968  *
969  * @param cls unused
970  * @param key hash of neighbour's public key (not used)
971  * @param value the 'struct NeighbourMapEntry' of the neighbour
972  */
973 static int
974 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
975 {
976   struct NeighbourMapEntry *n = value;
977
978 #if DEBUG_TRANSPORT
979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
980               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
981 #endif
982   if (S_CONNECTED == n->state)
983     GNUNET_STATISTICS_update (GST_stats,
984                               gettext_noop
985                               ("# peers disconnected due to global disconnect"),
986                               1, GNUNET_NO);
987   disconnect_neighbour (n);
988   return GNUNET_OK;
989 }
990
991
992 static void
993 ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
994 {
995   struct NeighbourMapEntry *n = cls;
996
997   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
998
999   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000               " ATS did not suggested address to connect to peer `%s'\n",
1001               GNUNET_i2s (&n->id));
1002
1003   disconnect_neighbour (n);
1004 }
1005
1006 /**
1007  * Cleanup the neighbours subsystem.
1008  */
1009 void
1010 GST_neighbours_stop ()
1011 {
1012   // This can happen during shutdown
1013   if (neighbours == NULL)
1014   {
1015     return;
1016   }
1017
1018   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1019                                          NULL);
1020   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1021   GNUNET_assert (neighbours_connected == 0);
1022   neighbours = NULL;
1023   callback_cls = NULL;
1024   connect_notify_cb = NULL;
1025   disconnect_notify_cb = NULL;
1026 }
1027
1028
1029 /**
1030  * We tried to send a SESSION_CONNECT message to another peer.  If this
1031  * succeeded, we change the state.  If it failed, we should tell
1032  * ATS to not use this address anymore (until it is re-validated).
1033  *
1034  * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried
1035  * @param success GNUNET_OK on success
1036  */
1037 static void
1038 send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target,
1039                            int success)
1040 {
1041   struct GNUNET_HELLO_Address *address = cls;
1042   struct NeighbourMapEntry *n;
1043   
1044   if (GNUNET_YES != success)
1045     GNUNET_ATS_address_destroyed (GST_ats, address, NULL);  
1046   if ( (NULL == neighbours) ||
1047        (NULL == (n = lookup_neighbour (&address->peer))) ||
1048        (n->state == S_DISCONNECT) ||
1049        (GNUNET_YES == success) )
1050   {
1051     GNUNET_HELLO_address_free (address);
1052     return;
1053   }
1054 #if DEBUG_TRANSPORT
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n",
1057               GNUNET_i2s (&n->id), 
1058               GST_plugins_a2s (n->address), 
1059               n->session);
1060 #endif
1061   change_state (n, S_NOT_CONNECTED);
1062   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1063     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1064   n->ats_suggest =
1065     GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1066                                   n);
1067   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1068   GNUNET_HELLO_address_free (address);
1069 }
1070
1071
1072 /**
1073  * We tried to switch addresses with an peer already connected. If it failed,
1074  * we should tell ATS to not use this address anymore (until it is re-validated).
1075  *
1076  * @param cls the 'struct NeighbourMapEntry'
1077  * @param success GNUNET_OK on success
1078  */
1079 static void
1080 send_switch_address_continuation (void *cls,
1081                                   const struct GNUNET_PeerIdentity *target,
1082                                   int success)
1083 {
1084   struct NeighbourMapEntry *n = cls;
1085
1086   GNUNET_assert (n != NULL);
1087   if (is_disconnecting (n))
1088     return;                     /* neighbour is going away */
1089
1090   GNUNET_assert (n->state == S_CONNECTED);
1091   if (GNUNET_YES != success)
1092   {
1093 #if DEBUG_TRANSPORT
1094     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095                 "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n",
1096                 GNUNET_i2s (&n->id), 
1097                 GST_plugins_a2s (n->address), n->session);
1098 #endif
1099
1100     GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1101
1102     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1103       GNUNET_SCHEDULER_cancel (n->ats_suggest);
1104     n->ats_suggest =
1105         GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1106                                       n);
1107     GNUNET_ATS_suggest_address (GST_ats, &n->id);
1108     return;
1109   }
1110   /* Tell ATS that switching addresses was successful */
1111   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
1112 }
1113
1114 /**
1115  * We tried to send a SESSION_CONNECT message to another peer.  If this
1116  * succeeded, we change the state.  If it failed, we should tell
1117  * ATS to not use this address anymore (until it is re-validated).
1118  *
1119  * @param cls the 'struct NeighbourMapEntry'
1120  * @param success GNUNET_OK on success
1121  */
1122 static void
1123 send_connect_ack_continuation (void *cls,
1124                                const struct GNUNET_PeerIdentity *target,
1125                                int success)
1126 {
1127   struct NeighbourMapEntry *n = cls;
1128
1129   GNUNET_assert (n != NULL);
1130
1131   if (GNUNET_YES == success)
1132     return;                     /* sending successful */
1133
1134   /* sending failed, ask for next address  */
1135 #if DEBUG_TRANSPORT
1136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137               "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n",
1138               GNUNET_i2s (&n->id), 
1139               GST_plugins_a2s (n->address),
1140               n->session);
1141 #endif
1142   change_state (n, S_NOT_CONNECTED);
1143
1144   GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL);
1145
1146   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1147     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1148   n->ats_suggest =
1149       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
1150                                     n);
1151   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1152 }
1153
1154
1155 /**
1156  * For an existing neighbour record, set the active connection to
1157  * the given address.
1158  *
1159  * @param peer identity of the peer to switch the address for
1160  * @param address address of the other peer, NULL if other peer
1161  *                       connected to us
1162  * @param session session to use (or NULL)
1163  * @param ats performance data
1164  * @param ats_count number of entries in ats
1165  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1166  *         connection is not up (yet)
1167  */
1168 int
1169 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1170                                        const struct GNUNET_HELLO_Address *address,
1171                                        struct Session *session,
1172                                        const struct GNUNET_ATS_Information *ats,
1173                                        uint32_t ats_count,
1174                                        struct GNUNET_BANDWIDTH_Value32NBO
1175                                        bandwidth_in,
1176                                        struct GNUNET_BANDWIDTH_Value32NBO
1177                                        bandwidth_out)
1178 {
1179   struct NeighbourMapEntry *n;
1180   struct SessionConnectMessage connect_msg;
1181   size_t msg_len;
1182   size_t ret;
1183
1184   if (neighbours == NULL)
1185   {
1186     /* This can happen during shutdown */
1187     return GNUNET_NO;
1188   }
1189   n = lookup_neighbour (peer);
1190   if (NULL == n)
1191     return GNUNET_NO;
1192   if (n->state == S_DISCONNECT)
1193   {
1194     /* We are disconnecting, nothing to do here */
1195     return GNUNET_NO;
1196   }
1197   GNUNET_assert (address->transport_name != NULL);
1198   if ( (session == NULL) && (0 == address->address_length) )
1199   {
1200     GNUNET_break_op (0);
1201     GNUNET_ATS_address_destroyed (GST_ats, address, session);
1202     GNUNET_ATS_suggest_address (GST_ats, peer);
1203     return GNUNET_NO;
1204   }
1205
1206   /* checks successful and neighbour != NULL */
1207 #if DEBUG_TRANSPORT
1208   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209               "ATS tells us to switch to address '%s' session %p for %s peer `%s'\n",
1210               GST_plugins_a2s (address),
1211               session, (S_CONNECTED == n->state) ? "CONNECTED" : "NOT CONNECTED",
1212               GNUNET_i2s (peer));
1213 #endif
1214   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1215   {
1216     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1217     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1218   }
1219   /* do not switch addresses just update quotas */
1220   if ( (n->state == S_CONNECTED) && 
1221        (NULL != n->address) &&
1222        (0 == GNUNET_HELLO_address_cmp (address,
1223                                        n->address)) &&
1224        (n->session == session) )
1225   {
1226     struct QuotaSetMessage q_msg;
1227     
1228 #if DEBUG_TRANSPORT
1229     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230                 "Sending outbound quota of %u Bps and inbound quota of %u Bps for peer `%s' to all clients\n",
1231                 ntohl (n->bandwidth_out.value__),
1232                 ntohl (n->bandwidth_in.value__), GNUNET_i2s (peer));
1233 #endif
1234     
1235     n->bandwidth_in = bandwidth_in;
1236     n->bandwidth_out = bandwidth_out;
1237     GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
1238     
1239     q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1240     q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1241     q_msg.quota = n->bandwidth_out;
1242     q_msg.peer = (*peer);
1243     GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1244     return GNUNET_NO;    
1245   }
1246   if (n->state == S_CONNECTED) 
1247   {
1248     /* mark old address as no longer used */
1249     GNUNET_assert (NULL != n->address);
1250     GST_validation_set_address_use (&n->id,
1251                                     n->address,
1252                                     n->session,
1253                                     GNUNET_NO);
1254     GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO);
1255   }
1256
1257   /* set new address */
1258   if (NULL != n->address)
1259     GNUNET_HELLO_address_free (n->address);
1260   n->address = GNUNET_HELLO_address_copy (address);
1261   n->session = session;
1262   n->bandwidth_in = bandwidth_in;
1263   n->bandwidth_out = bandwidth_out;
1264   GNUNET_SCHEDULER_cancel (n->timeout_task);
1265   n->timeout_task =
1266       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1267                                     &neighbour_timeout_task, n);
1268   switch (n->state)
1269   {
1270   case S_NOT_CONNECTED:  
1271   case S_CONNECT_SENT:
1272     msg_len = sizeof (struct SessionConnectMessage);
1273     connect_msg.header.size = htons (msg_len);
1274     connect_msg.header.type =
1275         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1276     connect_msg.reserved = htonl (0);
1277     connect_msg.timestamp =
1278         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1279     change_state (n, S_CONNECT_SENT);
1280     ret =
1281         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1282                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1283                           address, GNUNET_YES,
1284                           &send_connect_continuation, 
1285                           GNUNET_HELLO_address_copy (address));
1286     return GNUNET_NO;
1287   case S_CONNECT_RECV:
1288     /* We received a CONNECT message and asked ATS for an address */
1289     msg_len = sizeof (struct SessionConnectMessage);
1290     connect_msg.header.size = htons (msg_len);
1291     connect_msg.header.type =
1292         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1293     connect_msg.reserved = htonl (0);
1294     connect_msg.timestamp =
1295         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1296     ret =
1297         send_with_plugin (&n->id, (const void *) &connect_msg, msg_len,
1298                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1299                           address, GNUNET_YES,
1300                           &send_connect_ack_continuation, n);
1301     return GNUNET_NO;
1302   case S_CONNECTED:
1303     /* connected peer is switching addresses */
1304     GST_validation_set_address_use (&n->id,
1305                                     n->address,
1306                                     n->session,
1307                                     GNUNET_YES);
1308     msg_len = sizeof (struct SessionConnectMessage);
1309     connect_msg.header.size = htons (msg_len);
1310     connect_msg.header.type =
1311         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1312     connect_msg.reserved = htonl (0);
1313     connect_msg.timestamp =
1314         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1315     ret =
1316         send_with_plugin (peer, (const char *) &connect_msg, msg_len,
1317                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, session,
1318                           address, GNUNET_YES,
1319                           &send_switch_address_continuation, n);
1320     if (ret == GNUNET_SYSERR)
1321     {
1322       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1323                   "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n",
1324                   GNUNET_i2s (peer), 
1325                   GST_plugins_a2s (address), session);
1326     }
1327     return GNUNET_NO;
1328   default:
1329     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1330                 "Invalid connection state to switch addresses %u \n", n->state);
1331     GNUNET_break_op (0);
1332     return GNUNET_NO;
1333   }
1334 }
1335
1336
1337 /**
1338  * Obtain current latency information for the given neighbour.
1339  *
1340  * @param peer 
1341  * @return observed latency of the address, FOREVER if the address was
1342  *         never successfully validated
1343  */
1344 struct GNUNET_TIME_Relative
1345 GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer)
1346 {
1347   struct NeighbourMapEntry *n;
1348
1349   n = lookup_neighbour (peer);
1350   if ( (NULL == n) ||
1351        ( (n->address == NULL) && (n->session == NULL) ) )
1352     return GNUNET_TIME_UNIT_FOREVER_REL;
1353   return GST_validation_get_address_latency (peer,
1354                                              n->address,
1355                                              n->session);
1356 }
1357
1358
1359 /**
1360  * Create an entry in the neighbour map for the given peer
1361  *
1362  * @param peer peer to create an entry for
1363  * @return new neighbour map entry
1364  */
1365 static struct NeighbourMapEntry *
1366 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1367 {
1368   struct NeighbourMapEntry *n;
1369
1370 #if DEBUG_TRANSPORT
1371   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1372               "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer));
1373 #endif
1374   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1375   n->id = *peer;
1376   n->state = S_NOT_CONNECTED;
1377   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1378                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1379                                  MAX_BANDWIDTH_CARRY_S);
1380   n->timeout_task =
1381       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1382                                     &neighbour_timeout_task, n);
1383   GNUNET_assert (GNUNET_OK ==
1384                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1385                                                     &n->id.hashPubKey, n,
1386                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1387   return n;
1388 }
1389
1390
1391 /**
1392  * Try to create a connection to the given target (eventually).
1393  *
1394  * @param target peer to try to connect to
1395  */
1396 void
1397 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1398 {
1399   struct NeighbourMapEntry *n;
1400
1401   // This can happen during shutdown
1402   if (neighbours == NULL)
1403   {
1404     return;
1405   }
1406 #if DEBUG_TRANSPORT
1407   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1408               GNUNET_i2s (target));
1409 #endif
1410   if (0 ==
1411       memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity)))
1412   {
1413     /* my own hello */
1414     return;
1415   }
1416   n = lookup_neighbour (target);
1417
1418   if (NULL != n)
1419   {
1420     if ((S_CONNECTED == n->state) || (is_connecting (n)))
1421       return;                   /* already connecting or connected */
1422     if (is_disconnecting (n))
1423       change_state (n, S_NOT_CONNECTED);
1424   }
1425
1426
1427   if (n == NULL)
1428     n = setup_neighbour (target);
1429 #if DEBUG_TRANSPORT
1430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1431               "Asking ATS for suggested address to connect to peer `%s'\n",
1432               GNUNET_i2s (&n->id));
1433 #endif
1434
1435   GNUNET_ATS_suggest_address (GST_ats, &n->id);
1436 }
1437
1438 /**
1439  * Test if we're connected to the given peer.
1440  *
1441  * @param target peer to test
1442  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1443  */
1444 int
1445 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1446 {
1447   struct NeighbourMapEntry *n;
1448
1449   // This can happen during shutdown
1450   if (neighbours == NULL)
1451   {
1452     return GNUNET_NO;
1453   }
1454
1455   n = lookup_neighbour (target);
1456
1457   if ((NULL == n) || (S_CONNECTED != n->state))
1458     return GNUNET_NO;           /* not connected */
1459   return GNUNET_YES;
1460 }
1461
1462
1463 /**
1464  * A session was terminated. Take note.
1465  *
1466  * @param peer identity of the peer where the session died
1467  * @param session session that is gone
1468  */
1469 void
1470 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1471                                    struct Session *session)
1472 {
1473   struct NeighbourMapEntry *n;
1474
1475   if (neighbours == NULL)
1476   {
1477     /* This can happen during shutdown */
1478     return;
1479   }
1480
1481 #if DEBUG_TRANSPORT
1482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n",
1483               session, GNUNET_i2s (peer));
1484 #endif
1485
1486   n = lookup_neighbour (peer);
1487   if (NULL == n)
1488     return;
1489   if (session != n->session)
1490     return;                     /* doesn't affect us */
1491   if (n->state == S_CONNECTED)
1492     GST_validation_set_address_use (&n->id,
1493                                     n->address,
1494                                     n->session,
1495                                     GNUNET_NO);
1496   n->session = NULL;
1497   if (NULL != n->address)
1498   {
1499     GNUNET_HELLO_address_free (n->address);
1500     n->address = NULL;
1501   }
1502   
1503   /* not connected anymore anyway, shouldn't matter */
1504   if ((S_CONNECTED != n->state) && (!is_connecting (n)))
1505     return;
1506   // FIXME: n->state = S_FAST_RECONNECT;
1507
1508   /* We are connected, so ask ATS to switch addresses */
1509   GNUNET_SCHEDULER_cancel (n->timeout_task);
1510   n->timeout_task =
1511       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1512                                     &neighbour_timeout_task, n);
1513   /* try QUICKLY to re-establish a connection, reduce timeout! */
1514   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1515     GNUNET_SCHEDULER_cancel (n->ats_suggest);
1516   n->ats_suggest =
1517       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel,
1518                                     n);
1519   GNUNET_ATS_suggest_address (GST_ats, peer);
1520 }
1521
1522
1523 /**
1524  * Transmit a message to the given target using the active connection.
1525  *
1526  * @param target destination
1527  * @param msg message to send
1528  * @param msg_size number of bytes in msg
1529  * @param timeout when to fail with timeout
1530  * @param cont function to call when done
1531  * @param cont_cls closure for 'cont'
1532  */
1533 void
1534 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1535                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1536                      GST_NeighbourSendContinuation cont, void *cont_cls)
1537 {
1538   struct NeighbourMapEntry *n;
1539   struct MessageQueue *mq;
1540
1541   // This can happen during shutdown
1542   if (neighbours == NULL)
1543   {
1544     return;
1545   }
1546
1547   n = lookup_neighbour (target);
1548   if ((n == NULL) || (!is_connected (n)))
1549   {
1550     GNUNET_STATISTICS_update (GST_stats,
1551                               gettext_noop
1552                               ("# messages not sent (no such peer or not connected)"),
1553                               1, GNUNET_NO);
1554 #if DEBUG_TRANSPORT
1555     if (n == NULL)
1556       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1557                   "Could not send message to peer `%s': unknown neighbour",
1558                   GNUNET_i2s (target));
1559     else if (!is_connected (n))
1560       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1561                   "Could not send message to peer `%s': not connected\n",
1562                   GNUNET_i2s (target));
1563 #endif
1564     if (NULL != cont)
1565       cont (cont_cls, GNUNET_SYSERR);
1566     return;
1567   }
1568
1569   if ((n->session == NULL) && (n->address == NULL) )
1570   {
1571     GNUNET_STATISTICS_update (GST_stats,
1572                               gettext_noop
1573                               ("# messages not sent (no such peer or not connected)"),
1574                               1, GNUNET_NO);
1575 #if DEBUG_TRANSPORT
1576     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1577                 "Could not send message to peer `%s': no address available\n",
1578                 GNUNET_i2s (target));
1579 #endif
1580
1581     if (NULL != cont)
1582       cont (cont_cls, GNUNET_SYSERR);
1583     return;
1584   }
1585
1586   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1587   GNUNET_STATISTICS_update (GST_stats,
1588                             gettext_noop
1589                             ("# bytes in message queue for other peers"),
1590                             msg_size, GNUNET_NO);
1591   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1592   mq->cont = cont;
1593   mq->cont_cls = cont_cls;
1594   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1595   memcpy (&mq[1], msg, msg_size);
1596   mq->message_buf = (const char *) &mq[1];
1597   mq->message_buf_size = msg_size;
1598   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1599   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1600
1601   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1602       (NULL == n->is_active))
1603     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1604 }
1605
1606
1607 /**
1608  * We have received a message from the given sender.  How long should
1609  * we delay before receiving more?  (Also used to keep the peer marked
1610  * as live).
1611  *
1612  * @param sender sender of the message
1613  * @param size size of the message
1614  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1615  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1616  *                   GNUNET_SYSERR if the connection is not fully up yet
1617  * @return how long to wait before reading more from this sender
1618  */
1619 struct GNUNET_TIME_Relative
1620 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1621                                         *sender, ssize_t size, int *do_forward)
1622 {
1623   struct NeighbourMapEntry *n;
1624   struct GNUNET_TIME_Relative ret;
1625
1626   // This can happen during shutdown
1627   if (neighbours == NULL)
1628   {
1629     return GNUNET_TIME_UNIT_FOREVER_REL;
1630   }
1631
1632   n = lookup_neighbour (sender);
1633   if (n == NULL)
1634   {
1635     GST_neighbours_try_connect (sender);
1636     n = lookup_neighbour (sender);
1637     if (NULL == n)
1638     {
1639       GNUNET_STATISTICS_update (GST_stats,
1640                                 gettext_noop
1641                                 ("# messages discarded due to lack of neighbour record"),
1642                                 1, GNUNET_NO);
1643       *do_forward = GNUNET_NO;
1644       return GNUNET_TIME_UNIT_ZERO;
1645     }
1646   }
1647   if (!is_connected (n))
1648   {
1649     *do_forward = GNUNET_SYSERR;
1650     return GNUNET_TIME_UNIT_ZERO;
1651   }
1652   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1653   {
1654     n->quota_violation_count++;
1655 #if DEBUG_TRANSPORT
1656     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1657                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1658                 n->in_tracker.available_bytes_per_s__,
1659                 n->quota_violation_count);
1660 #endif
1661     /* Discount 32k per violation */
1662     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1663   }
1664   else
1665   {
1666     if (n->quota_violation_count > 0)
1667     {
1668       /* try to add 32k back */
1669       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1670       n->quota_violation_count--;
1671     }
1672   }
1673   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1674   {
1675     GNUNET_STATISTICS_update (GST_stats,
1676                               gettext_noop
1677                               ("# bandwidth quota violations by other peers"),
1678                               1, GNUNET_NO);
1679     *do_forward = GNUNET_NO;
1680     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1681   }
1682   *do_forward = GNUNET_YES;
1683   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1684   if (ret.rel_value > 0)
1685   {
1686 #if DEBUG_TRANSPORT
1687     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1688                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1689                 (unsigned long long) n->in_tracker.
1690                 consumption_since_last_update__,
1691                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1692                 (unsigned long long) ret.rel_value);
1693 #endif
1694     GNUNET_STATISTICS_update (GST_stats,
1695                               gettext_noop ("# ms throttling suggested"),
1696                               (int64_t) ret.rel_value, GNUNET_NO);
1697   }
1698   return ret;
1699 }
1700
1701
1702 /**
1703  * Keep the connection to the given neighbour alive longer,
1704  * we received a KEEPALIVE (or equivalent).
1705  *
1706  * @param neighbour neighbour to keep alive
1707  */
1708 void
1709 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1710 {
1711   struct NeighbourMapEntry *n;
1712
1713   // This can happen during shutdown
1714   if (neighbours == NULL)
1715   {
1716     return;
1717   }
1718
1719   n = lookup_neighbour (neighbour);
1720   if (NULL == n)
1721   {
1722     GNUNET_STATISTICS_update (GST_stats,
1723                               gettext_noop
1724                               ("# KEEPALIVE messages discarded (not connected)"),
1725                               1, GNUNET_NO);
1726     return;
1727   }
1728   GNUNET_SCHEDULER_cancel (n->timeout_task);
1729   n->timeout_task =
1730       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1731                                     &neighbour_timeout_task, n);
1732 }
1733
1734
1735 /**
1736  * Change the incoming quota for the given peer.
1737  *
1738  * @param neighbour identity of peer to change qutoa for
1739  * @param quota new quota
1740  */
1741 void
1742 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1743                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1744 {
1745   struct NeighbourMapEntry *n;
1746
1747   // This can happen during shutdown
1748   if (neighbours == NULL)
1749   {
1750     return;
1751   }
1752
1753   n = lookup_neighbour (neighbour);
1754   if (n == NULL)
1755   {
1756     GNUNET_STATISTICS_update (GST_stats,
1757                               gettext_noop
1758                               ("# SET QUOTA messages ignored (no such peer)"),
1759                               1, GNUNET_NO);
1760     return;
1761   }
1762   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1763   if (0 != ntohl (quota.value__))
1764     return;
1765 #if DEBUG_TRANSPORT
1766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1767               GNUNET_i2s (&n->id), "SET_QUOTA");
1768 #endif
1769   if (is_connected (n))
1770     GNUNET_STATISTICS_update (GST_stats,
1771                               gettext_noop ("# disconnects due to quota of 0"),
1772                               1, GNUNET_NO);
1773   disconnect_neighbour (n);
1774 }
1775
1776
1777 /**
1778  * Closure for the neighbours_iterate function.
1779  */
1780 struct IteratorContext
1781 {
1782   /**
1783    * Function to call on each connected neighbour.
1784    */
1785   GST_NeighbourIterator cb;
1786
1787   /**
1788    * Closure for 'cb'.
1789    */
1790   void *cb_cls;
1791 };
1792
1793
1794 /**
1795  * Call the callback from the closure for each connected neighbour.
1796  *
1797  * @param cls the 'struct IteratorContext'
1798  * @param key the hash of the public key of the neighbour
1799  * @param value the 'struct NeighbourMapEntry'
1800  * @return GNUNET_OK (continue to iterate)
1801  */
1802 static int
1803 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1804 {
1805   struct IteratorContext *ic = cls;
1806   struct NeighbourMapEntry *n = value;
1807
1808   if (!is_connected (n))
1809     return GNUNET_OK;
1810
1811   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address);
1812   return GNUNET_OK;
1813 }
1814
1815
1816 /**
1817  * Iterate over all connected neighbours.
1818  *
1819  * @param cb function to call
1820  * @param cb_cls closure for cb
1821  */
1822 void
1823 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1824 {
1825   struct IteratorContext ic;
1826
1827   // This can happen during shutdown
1828   if (neighbours == NULL)
1829   {
1830     return;
1831   }
1832
1833   ic.cb = cb;
1834   ic.cb_cls = cb_cls;
1835   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1836 }
1837
1838 /**
1839  * If we have an active connection to the given target, it must be shutdown.
1840  *
1841  * @param target peer to disconnect from
1842  */
1843 void
1844 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1845 {
1846   struct NeighbourMapEntry *n;
1847
1848   // This can happen during shutdown
1849   if (neighbours == NULL)
1850   {
1851     return;
1852   }
1853
1854   n = lookup_neighbour (target);
1855   if (NULL == n)
1856     return;                     /* not active */
1857   if (is_connected (n))
1858   {
1859     send_disconnect (&n->id, n->address, n->session);
1860
1861     n = lookup_neighbour (target);
1862     if (NULL == n)
1863       return;                   /* gone already */
1864   }
1865   disconnect_neighbour (n);
1866 }
1867
1868
1869 /**
1870  * We received a disconnect message from the given peer,
1871  * validate and process.
1872  *
1873  * @param peer sender of the message
1874  * @param msg the disconnect message
1875  */
1876 void
1877 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity
1878                                           *peer,
1879                                           const struct GNUNET_MessageHeader
1880                                           *msg)
1881 {
1882   struct NeighbourMapEntry *n;
1883   const struct SessionDisconnectMessage *sdm;
1884   GNUNET_HashCode hc;
1885
1886 #if DEBUG_TRANSPORT
1887   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1888               "Received DISCONNECT message from peer `%s'\n",
1889               GNUNET_i2s (peer));
1890 #endif
1891
1892   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1893   {
1894     // GNUNET_break_op (0);
1895     GNUNET_STATISTICS_update (GST_stats,
1896                               gettext_noop
1897                               ("# disconnect messages ignored (old format)"), 1,
1898                               GNUNET_NO);
1899     return;
1900   }
1901   sdm = (const struct SessionDisconnectMessage *) msg;
1902   n = lookup_neighbour (peer);
1903   if (NULL == n)
1904     return;                     /* gone already */
1905   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1906       n->connect_ts.abs_value)
1907   {
1908     GNUNET_STATISTICS_update (GST_stats,
1909                               gettext_noop
1910                               ("# disconnect messages ignored (timestamp)"), 1,
1911                               GNUNET_NO);
1912     return;
1913   }
1914   GNUNET_CRYPTO_hash (&sdm->public_key,
1915                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1916                       &hc);
1917   if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity)))
1918   {
1919     GNUNET_break_op (0);
1920     return;
1921   }
1922   if (ntohl (sdm->purpose.size) !=
1923       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1924       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1925       sizeof (struct GNUNET_TIME_AbsoluteNBO))
1926   {
1927     GNUNET_break_op (0);
1928     return;
1929   }
1930   if (GNUNET_OK !=
1931       GNUNET_CRYPTO_rsa_verify
1932       (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose,
1933        &sdm->signature, &sdm->public_key))
1934   {
1935     GNUNET_break_op (0);
1936     return;
1937   }
1938   GST_neighbours_force_disconnect (peer);
1939 }
1940
1941
1942 /**
1943  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
1944  * Consider switching to it.
1945  *
1946  * @param message possibly a 'struct SessionConnectMessage' (check format)
1947  * @param peer identity of the peer to switch the address for
1948  * @param address address of the other peer, NULL if other peer
1949  *                       connected to us
1950  * @param session session to use (or NULL)
1951  * @param ats performance data
1952  * @param ats_count number of entries in ats
1953  */
1954 void
1955 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
1956                                    const struct GNUNET_PeerIdentity *peer,
1957                                    const struct GNUNET_HELLO_Address *address,
1958                                    struct Session *session,
1959                                    const struct GNUNET_ATS_Information *ats,
1960                                    uint32_t ats_count)
1961 {
1962   const struct SessionConnectMessage *scm;
1963   struct QuotaSetMessage q_msg;
1964   struct GNUNET_MessageHeader msg;
1965   struct NeighbourMapEntry *n;
1966   size_t msg_len;
1967   size_t ret;
1968
1969 #if DEBUG_TRANSPORT
1970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1971               "Received CONNECT_ACK message from peer `%s'\n",
1972               GNUNET_i2s (peer));
1973 #endif
1974   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
1975   {
1976     GNUNET_break_op (0);
1977     return;
1978   }
1979   scm = (const struct SessionConnectMessage *) message;
1980   GNUNET_break_op (ntohl (scm->reserved) == 0);
1981   n = lookup_neighbour (peer);
1982   if (NULL == n)
1983   {
1984     /* we did not send 'CONNECT' (how could we? no record for this peer!) */
1985     GNUNET_break_op (0);
1986     return;
1987   }  
1988   if (n->state != S_CONNECT_SENT)
1989   {
1990     GNUNET_STATISTICS_update (GST_stats,
1991                               gettext_noop ("# unexpected CONNECT_ACK messages"), 1,
1992                               GNUNET_NO);
1993     return;
1994   }
1995   if (NULL != session)
1996     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1997                      "transport-ats",
1998                      "Giving ATS session %p of plugin %s for peer %s\n",
1999                      session, address->transport_name, GNUNET_i2s (peer));
2000   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2001   GNUNET_assert (NULL != n->address);
2002   change_state (n, S_CONNECTED);
2003   GST_validation_set_address_use (&n->id,
2004                                   n->address,
2005                                   n->session,
2006                                   GNUNET_YES);
2007   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2008 #if DEBUG_TRANSPORT
2009   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2010               "Setting inbound quota of %u for peer `%s' to \n",
2011               ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id));
2012 #endif
2013   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2014
2015   /* send ACK (ACK) */
2016   msg_len = sizeof (msg);
2017   msg.size = htons (msg_len);
2018   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
2019
2020   ret =
2021       send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
2022                         GNUNET_TIME_UNIT_FOREVER_REL, n->session,
2023                         n->address, GNUNET_YES, NULL,
2024                         NULL);
2025
2026   if (ret == GNUNET_SYSERR)
2027     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2028                 "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n",
2029                 GNUNET_i2s (&n->id), 
2030                 GST_plugins_a2s (n->address), n->session);
2031
2032
2033   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2034     n->keepalive_task =
2035       GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2036                                     &neighbour_keepalive_task, n);
2037   
2038   neighbours_connected++;
2039   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2040                             GNUNET_NO);
2041 #if DEBUG_TRANSPORT
2042   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2043               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2044               GNUNET_i2s (&n->id),
2045               GST_plugins_a2s (n->address), n->session,
2046               __LINE__);
2047 #endif
2048   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2049
2050 #if DEBUG_TRANSPORT
2051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2052               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2053               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2054 #endif
2055   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2056   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2057   q_msg.quota = n->bandwidth_out;
2058   q_msg.peer = (*peer);
2059   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2060
2061 }
2062
2063
2064 void
2065 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
2066                            const struct GNUNET_PeerIdentity *peer,
2067                            const struct GNUNET_HELLO_Address *address,
2068                            struct Session *session,
2069                            const struct GNUNET_ATS_Information *ats,
2070                            uint32_t ats_count)
2071 {
2072   struct NeighbourMapEntry *n;
2073   struct QuotaSetMessage q_msg;
2074
2075 #if DEBUG_TRANSPORT
2076   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
2077               "Received ACK message from peer `%s'\n",
2078               GNUNET_i2s (peer));
2079 #endif
2080
2081   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
2082   {
2083     GNUNET_break_op (0);
2084     return;
2085   }
2086   n = lookup_neighbour (peer);
2087   if (NULL == n)
2088   {
2089     send_disconnect (peer, address,
2090                      session);
2091     GNUNET_break (0);
2092     return;
2093   }
2094   if (S_CONNECTED == n->state)
2095     return;
2096   if (!is_connecting(n))
2097   {
2098     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# unexpected ACK messages"), 1,
2099                               GNUNET_NO);
2100     return;
2101   }
2102   if (NULL != session)
2103     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2104                      "transport-ats",
2105                      "Giving ATS session %p of plugin %s for peer %s\n",
2106                      session, address->transport_name, GNUNET_i2s (peer));
2107   GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2108   GNUNET_assert (n->address != NULL);
2109   change_state (n, S_CONNECTED);
2110   GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES);
2111
2112   GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in);
2113
2114   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2115     n->keepalive_task =
2116         GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2117                                       &neighbour_keepalive_task, n);
2118   GST_validation_set_address_use (&n->id,
2119                                   n->address,
2120                                   n->session,
2121                                   GNUNET_YES);
2122   neighbours_connected++;
2123   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2124                             GNUNET_NO);
2125   
2126 #if DEBUG_TRANSPORT
2127   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2128               "Notify about connect of `%4s' using address '%s' session %X LINE %u\n",
2129               GNUNET_i2s (&n->id),
2130               GST_plugins_a2s (n->address), n->session,
2131               __LINE__);
2132 #endif
2133   connect_notify_cb (callback_cls, &n->id, ats, ats_count);  
2134 #if DEBUG_TRANSPORT
2135   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2136               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2137               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2138 #endif
2139   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2140   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2141   q_msg.quota = n->bandwidth_out;
2142   q_msg.peer = (*peer);
2143   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2144 }
2145
2146 struct BlackListCheckContext
2147 {
2148   struct GNUNET_ATS_Information *ats;
2149
2150   uint32_t ats_count;
2151
2152   struct Session *session;
2153
2154   struct GNUNET_HELLO_Address *address;
2155
2156   struct GNUNET_TIME_Absolute ts;
2157 };
2158
2159
2160 static void
2161 handle_connect_blacklist_cont (void *cls,
2162                                const struct GNUNET_PeerIdentity *peer,
2163                                int result)
2164 {
2165   struct NeighbourMapEntry *n;
2166   struct BlackListCheckContext *bcc = cls;
2167
2168 #if DEBUG_TRANSPORT
2169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2170               "Blacklist check due to CONNECT message: `%s'\n",
2171               GNUNET_i2s (peer),
2172               (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2173 #endif
2174
2175   /* not allowed */
2176   if (GNUNET_OK != result)
2177   {
2178     GNUNET_HELLO_address_free (bcc->address);
2179     GNUNET_free (bcc);
2180     return;
2181   }
2182
2183   n = lookup_neighbour (peer);
2184   if (NULL == n)
2185     n = setup_neighbour (peer);
2186
2187   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2188   {
2189     if (NULL != bcc->session)
2190       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2191                        "transport-ats",
2192                        "Giving ATS session %p of address `%s' for peer %s\n",
2193                        bcc->session, 
2194                        GST_plugins_a2s (bcc->address),
2195                        GNUNET_i2s (peer));
2196     GNUNET_ATS_address_update (GST_ats, bcc->address,
2197                                bcc->session, bcc->ats, bcc->ats_count);
2198     n->connect_ts = bcc->ts;
2199   }
2200
2201   GNUNET_free (bcc);
2202
2203   if (n->state != S_CONNECT_RECV)
2204     change_state (n, S_CONNECT_RECV);
2205
2206   /* Ask ATS for an address to connect via that address */
2207   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2208     GNUNET_SCHEDULER_cancel (n->ats_suggest);
2209   n->ats_suggest =
2210       GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel,
2211                                     n);
2212   GNUNET_ATS_suggest_address (GST_ats, peer);
2213 }
2214
2215 /**
2216  * We received a 'SESSION_CONNECT' message from the other peer.
2217  * Consider switching to it.
2218  *
2219  * @param message possibly a 'struct SessionConnectMessage' (check format)
2220  * @param peer identity of the peer to switch the address for
2221  * @param address address of the other peer, NULL if other peer
2222  *                       connected to us
2223  * @param session session to use (or NULL)
2224  * @param ats performance data
2225  * @param ats_count number of entries in ats (excluding 0-termination)
2226  */
2227 void
2228 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2229                                const struct GNUNET_PeerIdentity *peer,
2230                                const struct GNUNET_HELLO_Address *address,
2231                                struct Session *session,
2232                                const struct GNUNET_ATS_Information *ats,
2233                                uint32_t ats_count)
2234 {
2235   const struct SessionConnectMessage *scm;
2236   struct NeighbourMapEntry *n;
2237   struct BlackListCheckContext *bcc = NULL;
2238
2239 #if DEBUG_TRANSPORT
2240   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2241               "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2242 #endif
2243
2244   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2245   {
2246     GNUNET_break_op (0);
2247     return;
2248   }
2249
2250   scm = (const struct SessionConnectMessage *) message;
2251   GNUNET_break_op (ntohl (scm->reserved) == 0);
2252
2253   n = lookup_neighbour (peer);
2254   if ( (n != NULL) &&
2255        (S_CONNECTED == n->state) )
2256   {
2257     /* connected peer switches addresses */
2258     GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count);
2259     return;
2260   }
2261
2262   /* we are not connected to this peer */
2263   /* do blacklist check */
2264   bcc =
2265       GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2266                      sizeof (struct GNUNET_ATS_Information) * (ats_count + 1));
2267   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2268   bcc->ats_count = ats_count + 1;
2269   bcc->address = GNUNET_HELLO_address_copy (address);
2270   bcc->session = session;
2271   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2272   memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count);
2273   bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY);
2274   bcc->ats[ats_count].value = htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value);
2275   GST_blacklist_test_allowed (peer, address->transport_name, handle_connect_blacklist_cont,
2276                               bcc);
2277 }
2278
2279
2280 /* end of file gnunet-service-transport_neighbours.c */