complete state reset functionality
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours_3way.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
 * @file transport/gnunet-service-transport_neighbours_3way.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet-service-transport_blacklist.h"
35 #include "gnunet_constants.h"
36 #include "transport.h"
37
38
/**
 * Size handed to the hash map that stores all neighbours
 * when that map is created.
 */
#define NEIGHBOUR_TABLE_SIZE 256

/**
 * Number of bandwidth quota violations a peer may commit
 * before we simply start dropping its messages.
 */
#define QUOTA_VIOLATION_DROP_THRESHOLD 10

/**
 * Interval between two KEEPALIVE messages sent to a neighbour.
 * Assuming an idle timeout of 5 minutes (300 seconds), a 90 second
 * interval yields three keepalives per timeout period, so three
 * messages in a row would have to be lost to cause a disconnect.
 */
#define KEEPALIVE_FREQUENCY \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)

/**
 * Delay of the task that gives up on a neighbour after we asked
 * ATS to suggest an address for it.
 */
#define ATS_RESPONSE_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)

/**
 * Delay of the task that resets a neighbour's state after it
 * entered one of the connection setup states.
 */
#define SETUP_CONNECTION_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)


/**
 * Record kept for each neighbour (defined further below).
 */
struct NeighbourMapEntry;
69
70 /**
71  * Message a peer sends to another to indicate its
72  * preference for communicating via a particular
73  * session (and the desire to establish a real
74  * connection).
75  */
76 struct SessionConnectMessage
77 {
78   /**
79    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
80    */
81   struct GNUNET_MessageHeader header;
82
83   /**
84    * Always zero.
85    */
86   uint32_t reserved GNUNET_PACKED;
87
88   /**
89    * Absolute time at the sender.  Only the most recent connect
90    * message implies which session is preferred by the sender.
91    */
92   struct GNUNET_TIME_AbsoluteNBO timestamp;
93
94 };
95
96
97 struct SessionDisconnectMessage
98 {
99   /**
100    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
101    */
102   struct GNUNET_MessageHeader header;
103
104   /**
105    * Always zero.
106    */
107   uint32_t reserved GNUNET_PACKED;
108
109   /**
110    * Purpose of the signature.  Extends over the timestamp.
111    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
112    */
113   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
114
115   /**
116    * Absolute time at the sender.  Only the most recent connect
117    * message implies which session is preferred by the sender.
118    */
119   struct GNUNET_TIME_AbsoluteNBO timestamp;
120
121   /**
122    * Public key of the sender.
123    */
124   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key;
125   
126   /**
127    * Signature of the peer that sends us the disconnect.  Only
128    * valid if the timestamp is AFTER the timestamp from the
129    * corresponding 'CONNECT' message.
130    */
131   struct GNUNET_CRYPTO_RsaSignature signature;
132
133 };
134
135
136 /**
137  * For each neighbour we keep a list of messages
138  * that we still want to transmit to the neighbour.
139  */
140 struct MessageQueue
141 {
142
143   /**
144    * This is a doubly linked list.
145    */
146   struct MessageQueue *next;
147
148   /**
149    * This is a doubly linked list.
150    */
151   struct MessageQueue *prev;
152
153   /**
154    * Once this message is actively being transmitted, which
155    * neighbour is it associated with?
156    */
157   struct NeighbourMapEntry *n;
158
159   /**
160    * Function to call once we're done.
161    */
162   GST_NeighbourSendContinuation cont;
163
164   /**
165    * Closure for 'cont'
166    */
167   void *cont_cls;
168
169   /**
170    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
171    * stuck together in memory.  Allocated at the end of this struct.
172    */
173   const char *message_buf;
174
175   /**
176    * Size of the message buf
177    */
178   size_t message_buf_size;
179
180   /**
181    * At what time should we fail?
182    */
183   struct GNUNET_TIME_Absolute timeout;
184
185 };
186
/**
 * Connection states of a neighbour.
 */
enum State
{
  /**
   * Fresh peer, or peer that is completely disconnected.
   */
  S_NOT_CONNECTED = 0,

  /**
   * We sent a CONNECT message and wait for the CONNECT_ACK.
   */
  S_CONNECT_SENT = 1,

  /**
   * We received a CONNECT message from the other peer and are
   * sending the CONNECT_ACK.
   */
  S_CONNECT_RECV = 4,

  /**
   * We sent the CONNECT_ACK and wait for an ACK or payload.
   */
  S_CONNECT_RECV_ACK_SENT = 8,

  /**
   * We received the ACK or payload.
   */
  S_CONNECTED = 16,

  /**
   * A disconnect is in progress.
   */
  S_DISCONNECT = 32
};
202
203 /**
204  * Entry in neighbours.
205  */
206 struct NeighbourMapEntry
207 {
208
209   /**
210    * Head of list of messages we would like to send to this peer;
211    * must contain at most one message per client.
212    */
213   struct MessageQueue *messages_head;
214
215   /**
216    * Tail of list of messages we would like to send to this peer; must
217    * contain at most one message per client.
218    */
219   struct MessageQueue *messages_tail;
220
221   /**
222    * Performance data for the peer.
223    */
224   //struct GNUNET_ATS_Information *ats;
225
226   /**
227    * Are we currently trying to send a message? If so, which one?
228    */
229   struct MessageQueue *is_active;
230
231   /**
232    * Active session for communicating with the peer.
233    */
234   struct Session *session;
235
236   /**
237    * Name of the plugin we currently use.
238    */
239   char *plugin_name;
240
241   /**
242    * Address used for communicating with the peer, NULL for inbound connections.
243    */
244   void *addr;
245
246   /**
247    * Number of bytes in 'addr'.
248    */
249   size_t addrlen;
250
251   /**
252    * Identity of this neighbour.
253    */
254   struct GNUNET_PeerIdentity id;
255
256   /**
257    * ID of task scheduled to run when this peer is about to
258    * time out (will free resources associated with the peer).
259    */
260   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
261
262   /**
263    * ID of task scheduled to send keepalives.
264    */
265   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
266
267   /**
268    * ID of task scheduled to run when we should try transmitting
269    * the head of the message queue.
270    */
271   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
272
273   /**
274    * Tracker for inbound bandwidth.
275    */
276   struct GNUNET_BANDWIDTH_Tracker in_tracker;
277
278   /**
279    * Inbound bandwidth from ATS, activated when connection is up
280    */
281   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in;
282
283   /**
284    * Inbound bandwidth from ATS, activated when connection is up
285    */
286   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out;
287
288   /**
289    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
290    */
291   struct GNUNET_TIME_Absolute connect_ts;
292
293   /**
294    * Timeout for ATS
295    * We asked ATS for a new address for this peer
296    */
297   GNUNET_SCHEDULER_TaskIdentifier ats_suggest;
298
299   /**
300    * Task the resets the peer state after due to an pending
301    * unsuccessful connection setup
302    */
303   GNUNET_SCHEDULER_TaskIdentifier state_reset;
304
305   /**
306    * How often has the other peer (recently) violated the inbound
307    * traffic limit?  Incremented by 10 per violation, decremented by 1
308    * per non-violation (for each time interval).
309    */
310   unsigned int quota_violation_count;
311
312
313   /**
314    * The current state of the peer
315    * Element of enum State
316    */
317   int state;
318
319 };
320
321
322 /**
323  * All known neighbours and their HELLOs.
324  */
325 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
326
327 /**
328  * Closure for connect_notify_cb and disconnect_notify_cb
329  */
330 static void *callback_cls;
331
332 /**
333  * Function to call when we connected to a neighbour.
334  */
335 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
336
337 /**
338  * Function to call when we disconnected from a neighbour.
339  */
340 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
341
342 /**
343  * counter for connected neighbours
344  */
345 static int neighbours_connected;
346
347 /**
348  * Lookup a neighbour entry in the neighbours hash map.
349  *
350  * @param pid identity of the peer to look up
351  * @return the entry, NULL if there is no existing record
352  */
353 static struct NeighbourMapEntry *
354 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
355 {
356   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
357 }
358
/**
 * Request a state transition for neighbour 'n' via change(),
 * recording the source line of the request for the log output.
 * Any additional arguments are ignored.
 */
#define change_state(n, state, ...) \
  change ((n), (state), __LINE__)
360
361 static int
362 is_connecting (struct NeighbourMapEntry * n)
363 {
364   if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED))
365     return GNUNET_YES;
366   return GNUNET_NO;
367 }
368
369 static int
370 is_connected (struct NeighbourMapEntry * n)
371 {
372   if (n->state == S_CONNECTED)
373     return GNUNET_YES;
374   return GNUNET_NO;
375 }
376
377 static int
378 is_disconnecting (struct NeighbourMapEntry * n)
379 {
380   if (n->state == S_DISCONNECT)
381     return GNUNET_YES;
382   return GNUNET_NO;
383 }
384
385 static const char *
386 print_state (int state)
387 {
388   switch (state) {
389     case S_CONNECTED:
390         return "S_CONNECTED";
391       break;
392     case S_CONNECT_RECV:
393       return "S_CONNECT_RECV";
394       break;
395     case S_CONNECT_RECV_ACK_SENT:
396       return"S_CONNECT_RECV_ACK_SENT";
397       break;
398     case S_CONNECT_SENT:
399       return "S_CONNECT_SENT";
400       break;
401     case S_DISCONNECT:
402       return "S_DISCONNECT";
403       break;
404     case S_NOT_CONNECTED:
405       return "S_NOT_CONNECTED";
406       break;
407     default:
408       GNUNET_break (0);
409       break;
410   }
411   return NULL;
412 }
413
/**
 * Perform a state transition for a neighbour (defined below).
 *
 * @param n neighbour whose state changes
 * @param state requested new state
 * @param line source line that requested the change
 * @return GNUNET_OK if the transition was performed,
 *         GNUNET_SYSERR if it is not allowed
 */
static int
change (struct NeighbourMapEntry * n,
        int state,
        int line);

/**
 * Task that runs when the timeout for an address suggestion
 * expired (defined below).
 *
 * @param cls the 'struct NeighbourMapEntry'
 * @param tc scheduler context
 */
static void
ats_suggest_cancel (void *cls,
                    const struct GNUNET_SCHEDULER_TaskContext *tc);
420
421 static void
422 reset_task (void *cls,
423             const struct GNUNET_SCHEDULER_TaskContext *tc)
424 {
425   struct NeighbourMapEntry * n = cls;
426
427   n->state_reset = GNUNET_SCHEDULER_NO_TASK;
428
429 #if DEBUG_TRANSPORT
430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
431       "Connection to peer `%s' %s failed in state `%s', resetting connection attempt \n",
432       GNUNET_i2s (&n->id), GST_plugins_a2s(n->plugin_name, n->addr, n->addrlen), print_state(n->state));
433 #endif
434   GNUNET_STATISTICS_update (GST_stats,
435                             gettext_noop ("# failed connection attempts due to timeout"),
436                             1,
437                             GNUNET_NO);
438
439   /* resetting state */
440   n->state = S_NOT_CONNECTED;
441
442   /* destroying address */
443   GNUNET_ATS_address_destroyed (GST_ats,
444                                 &n->id,
445                                 n->plugin_name,
446                                 n->addr,
447                                 n->addrlen,
448                                 NULL);
449
450   /* request new address */
451   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
452     GNUNET_SCHEDULER_cancel(n->ats_suggest);
453   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, n);
454   GNUNET_ATS_suggest_address(GST_ats, &n->id);
455 }
456
457 static int
458 change (struct NeighbourMapEntry * n, int state, int line)
459 {
460   char * old = strdup(print_state(n->state));
461   char * new = strdup(print_state(state));
462
463   /* allowed transitions */
464   int allowed = GNUNET_NO;
465   switch (n->state) {
466   case S_NOT_CONNECTED:
467     if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) ||
468         (state == S_DISCONNECT))
469     {
470       allowed = GNUNET_YES;
471
472       /* Schedule reset task */
473       if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) )
474       {
475         GNUNET_assert (n->state_reset == GNUNET_SCHEDULER_NO_TASK);
476         n->state_reset = GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n);
477       }
478
479       break;
480     }
481     break;
482   case S_CONNECT_RECV:
483     if ((state == S_NOT_CONNECTED) || (state == S_DISCONNECT) ||
484         (state == S_CONNECTED) || /* FIXME SENT -> RECV ISSUE!*/ (state == S_CONNECT_SENT))
485     {
486       if ((state == S_CONNECTED) || (state == S_DISCONNECT) || (state == S_NOT_CONNECTED))
487       {
488 #if DEBUG_TRANSPORT
489         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490             "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
491             GNUNET_i2s (&n->id), GST_plugins_a2s(n->plugin_name, n->addr, n->addrlen), print_state(n->state), print_state(state));
492 #endif
493         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
494         GNUNET_SCHEDULER_cancel (n->state_reset);
495         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
496       }
497
498       allowed = GNUNET_YES;
499       break;
500     }
501     break;
502   case S_CONNECT_SENT:
503     if ((state == S_NOT_CONNECTED) || (state == S_CONNECTED) ||
504         (state == S_DISCONNECT) || /* FIXME SENT -> RECV ISSUE!*/ (state == S_CONNECT_RECV))
505     {
506       if ((state == S_CONNECTED) || (state == S_DISCONNECT) || (state == S_NOT_CONNECTED))
507       {
508 #if DEBUG_TRANSPORT
509         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510             "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n",
511             GNUNET_i2s (&n->id), GST_plugins_a2s(n->plugin_name, n->addr, n->addrlen), print_state(n->state), print_state(state));
512 #endif
513         GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK);
514         GNUNET_SCHEDULER_cancel (n->state_reset);
515         n->state_reset = GNUNET_SCHEDULER_NO_TASK;
516       }
517
518       allowed = GNUNET_YES;
519       break;
520     }
521     break;
522   case S_CONNECTED:
523     if (state == S_DISCONNECT)
524     {
525       allowed = GNUNET_YES;
526       break;
527     }
528     break;
529   case S_DISCONNECT:
530     /*
531     if (state == S_NOT_CONNECTED)
532     {
533       allowed = GNUNET_YES;
534       break;
535     }*/
536     break;
537   default:
538     GNUNET_break (0);
539     break;
540
541   }
542
543   if (allowed == GNUNET_NO)
544   {
545     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
546         "Illegal state transition from `%s' to `%s' in line %u \n",
547         old, new, line);
548     GNUNET_break (0);
549     GNUNET_free (old);
550     GNUNET_free (new);
551     return GNUNET_SYSERR;
552   }
553
554   n->state = state;
555 #if DEBUG_TRANSPORT
556   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n",
557       GNUNET_i2s (&n->id), n, old, new, line);
558 #endif
559   GNUNET_free (old);
560   GNUNET_free (new);
561   return GNUNET_OK;
562 }
563
564 static ssize_t
565 send_with_plugin ( const struct GNUNET_PeerIdentity * target,
566     const char *msgbuf,
567     size_t msgbuf_size,
568     uint32_t priority,
569     struct GNUNET_TIME_Relative timeout,
570     struct Session * session,
571     const char * plugin_name,
572     const void *addr,
573     size_t addrlen,
574     int force_address,
575     GNUNET_TRANSPORT_TransmitContinuation cont,
576     void *cont_cls)
577
578 {
579   struct GNUNET_TRANSPORT_PluginFunctions *papi;
580   size_t ret = GNUNET_SYSERR;
581
582   papi = GST_plugins_find (plugin_name);
583   if (papi == NULL)
584   {
585     if (cont != NULL)
586       cont (cont_cls, target, GNUNET_SYSERR);
587     return GNUNET_SYSERR;
588   }
589
590   ret = papi->send (papi->cls,
591       target,
592       msgbuf, msgbuf_size,
593       0,
594       timeout,
595       session,
596       addr, addrlen,
597       GNUNET_YES,
598       cont, cont_cls);
599
600   if (ret == -1)
601   {
602     if (cont != NULL)
603       cont (cont_cls, target, GNUNET_SYSERR);
604   }
605   return ret;
606 }
607
/**
 * Scheduler task that starts a transmission to another peer
 * (defined below).
 *
 * @param cls the 'struct NeighbourMapEntry' of the peer
 * @param tc scheduler context
 */
static void
transmission_task (void *cls,
                   const struct GNUNET_SCHEDULER_TaskContext *tc);
616
617
618 /**
619  * We're done with our transmission attempt, continue processing.
620  *
621  * @param cls the 'struct MessageQueue' of the message
622  * @param receiver intended receiver
623  * @param success whether it worked or not
624  */
625 static void
626 transmit_send_continuation (void *cls,
627                             const struct GNUNET_PeerIdentity *receiver,
628                             int success)
629 {
630   struct MessageQueue *mq;
631   struct NeighbourMapEntry *n;
632
633   mq = cls;
634   n = mq->n;
635   if (NULL != n)
636   {
637     GNUNET_assert (n->is_active == mq);
638     n->is_active = NULL;
639     if (success == GNUNET_YES)
640     {
641       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
642       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
643     }
644   }
645 #if DEBUG_TRANSPORT
646   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n",
647               ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type),
648               (success == GNUNET_OK) ? "successful" : "FAILED");
649 #endif
650   if (NULL != mq->cont)
651     mq->cont (mq->cont_cls, success);
652   GNUNET_free (mq);
653 }
654
655
656 /**
657  * Check the ready list for the given neighbour and if a plugin is
658  * ready for transmission (and if we have a message), do so!
659  *
660  * @param n target peer for which to transmit
661  */
662 static void
663 try_transmission_to_peer (struct NeighbourMapEntry *n)
664 {
665   struct MessageQueue *mq;
666   struct GNUNET_TIME_Relative timeout;
667   ssize_t ret;
668
669   if (n->is_active != NULL)
670   {
671     GNUNET_break (0);
672     return;                     /* transmission already pending */
673   }
674   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
675   {
676     GNUNET_break (0);
677     return;                     /* currently waiting for bandwidth */
678   }
679   while (NULL != (mq = n->messages_head))
680   {
681     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
682     if (timeout.rel_value > 0)
683       break;
684     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
685     n->is_active = mq;
686     mq->n = n;
687     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
688   }
689   if (NULL == mq)
690     return;                     /* no more messages */
691
692   if (GST_plugins_find (n->plugin_name) == NULL)
693   {
694     GNUNET_break (0);
695     return;
696   }
697   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
698   n->is_active = mq;
699   mq->n = n;
700
701   if  ((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0))
702   {
703     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n",
704                 GNUNET_i2s (&n->id));
705     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
706     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
707     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
708     return;
709   }
710
711   ret = send_with_plugin (&n->id,
712                           mq->message_buf, mq->message_buf_size, 0,
713                           timeout,
714                           n->session, n->plugin_name, n->addr, n->addrlen,
715                           GNUNET_YES,
716                           &transmit_send_continuation, mq);
717   if (ret == -1)
718   {
719     /* failure, but 'send' would not call continuation in this case,
720      * so we need to do it here! */
721     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
722   }
723
724 }
725
726
727 /**
728  * Task invoked to start a transmission to another peer.
729  *
730  * @param cls the 'struct NeighbourMapEntry'
731  * @param tc scheduler context
732  */
733 static void
734 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
735 {
736   struct NeighbourMapEntry *n = cls;
737   GNUNET_assert (NULL != lookup_neighbour(&n->id));
738   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
739   try_transmission_to_peer (n);
740 }
741
742
743 /**
744  * Initialize the neighbours subsystem.
745  *
746  * @param cls closure for callbacks
747  * @param connect_cb function to call if we connect to a peer
748  * @param disconnect_cb function to call if we disconnect from a peer
749  */
750 void
751 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
752                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
753 {
754   callback_cls = cls;
755   connect_notify_cb = connect_cb;
756   disconnect_notify_cb = disconnect_cb;
757   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
758 }
759
760
/**
 * Continuation called once the DISCONNECT message was handed to the
 * plugin; only logs the result.
 *
 * The closure is the 'struct NeighbourMapEntry' the message was sent
 * for, but disconnect_neighbour() frees that entry right after the
 * message was sent.  It must therefore not be dereferenced here; the
 * peer identity is taken from 'target' instead.
 *
 * @param cls the 'struct NeighbourMapEntry' (possibly freed, unused)
 * @param target peer the DISCONNECT message was sent to
 * @param result result of the transmission
 */
static void
send_disconnect_cont (void *cls,
    const struct GNUNET_PeerIdentity * target,
    int result)
{
#if DEBUG_TRANSPORT
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending DISCONNECT message to peer `%4s': %i\n",
              GNUNET_i2s (target), result);
#endif
}
772
773 static int
774 send_disconnect (struct NeighbourMapEntry *n)
775 {
776   size_t ret;
777   struct SessionDisconnectMessage disconnect_msg;
778
779 #if DEBUG_TRANSPORT
780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending DISCONNECT message to peer `%4s'\n",
781               GNUNET_i2s (&n->id));
782 #endif
783
784   disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
785   disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
786   disconnect_msg.reserved = htonl (0);
787   disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
788                                        sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
789                                        sizeof (struct GNUNET_TIME_AbsoluteNBO) );
790   disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
791   disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
792   disconnect_msg.public_key = GST_my_public_key;
793   GNUNET_assert (GNUNET_OK ==
794                  GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
795                                          &disconnect_msg.purpose,
796                                          &disconnect_msg.signature));
797
798   ret = send_with_plugin(&n->id, (const char *) &disconnect_msg, sizeof (disconnect_msg),
799                           UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
800                           n->session, n->plugin_name, n->addr, n->addrlen,
801                           GNUNET_YES, &send_disconnect_cont, n);
802
803   if (ret == GNUNET_SYSERR)
804     return GNUNET_SYSERR;
805
806   GNUNET_STATISTICS_update (GST_stats,
807                             gettext_noop ("# peers disconnected due to external request"), 1,
808                             GNUNET_NO);
809   return GNUNET_OK;
810 }
811
812 /**
813  * Disconnect from the given neighbour, clean up the record.
814  *
815  * @param n neighbour to disconnect from
816  */
817 static void
818 disconnect_neighbour (struct NeighbourMapEntry *n)
819 {
820   struct MessageQueue *mq;
821   int was_connected = is_connected(n);
822
823   /* send DISCONNECT MESSAGE */
824   if (is_connected(n) || is_connecting(n))
825   {
826     if (GNUNET_OK == send_disconnect(n))
827       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n",
828                   GNUNET_i2s (&n->id));
829     else
830       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not send DISCONNECT_MSG to `%s'\n",
831                   GNUNET_i2s (&n->id));
832   }
833
834
835   if (is_disconnecting(n))
836     return;
837   change_state (n, S_DISCONNECT);
838
839   while (NULL != (mq = n->messages_head))
840   {
841     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
842     if (NULL != mq->cont)
843       mq->cont (mq->cont_cls, GNUNET_SYSERR);
844     GNUNET_free (mq);
845   }
846   if (NULL != n->is_active)
847   {
848     n->is_active->n = NULL;
849     n->is_active = NULL;
850   }
851   if (was_connected)
852   {
853     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
854     GNUNET_SCHEDULER_cancel (n->keepalive_task);
855     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;  
856     GNUNET_assert (neighbours_connected > 0);
857     neighbours_connected--;
858     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
859                               GNUNET_NO);
860     disconnect_notify_cb (callback_cls, &n->id);
861   }
862   GNUNET_assert (GNUNET_YES ==
863                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
864                                                        &n->id.hashPubKey, n));
865   if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest)
866   {
867     GNUNET_SCHEDULER_cancel (n->ats_suggest);
868     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
869   }
870   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
871   {
872     GNUNET_SCHEDULER_cancel (n->timeout_task);
873     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
874   }
875   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
876   {
877     GNUNET_SCHEDULER_cancel (n->transmission_task);
878     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
879   }
880   if (NULL != n->plugin_name)
881   {
882     GNUNET_free (n->plugin_name);
883     n->plugin_name = NULL;
884   }
885   if (NULL != n->addr)
886   {
887     GNUNET_free (n->addr);
888     n->addr = NULL;
889     n->addrlen = 0;
890   }
891   n->session = NULL;
892   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n",
893               GNUNET_i2s (&n->id), n);
894   GNUNET_free (n);
895 }
896
897
898 /**
899  * Peer has been idle for too long. Disconnect.
900  *
901  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
902  * @param tc scheduler context
903  */
904 static void
905 neighbour_timeout_task (void *cls,
906                         const struct GNUNET_SCHEDULER_TaskContext *tc)
907 {
908   struct NeighbourMapEntry *n = cls;
909
910   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
911
912   GNUNET_STATISTICS_update (GST_stats,
913                             gettext_noop ("# peers disconnected due to timeout"), 1,
914                             GNUNET_NO);
915   disconnect_neighbour (n);
916 }
917
918
919 /**
920  * Send another keepalive message.
921  *
922  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
923  * @param tc scheduler context
924  */
925 static void
926 neighbour_keepalive_task (void *cls,
927                           const struct GNUNET_SCHEDULER_TaskContext *tc)
928 {
929   struct NeighbourMapEntry *n = cls;
930   struct GNUNET_MessageHeader m;
931
932   n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
933                                                     &neighbour_keepalive_task,
934                                                     n);
935   GNUNET_assert (is_connected(n));
936   GNUNET_STATISTICS_update (GST_stats,
937                             gettext_noop ("# keepalives sent"), 1,
938                             GNUNET_NO);
939   m.size = htons (sizeof (struct GNUNET_MessageHeader));
940   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
941
942   send_with_plugin(&n->id, (const void *) &m,
943                    sizeof (m),
944                    UINT32_MAX /* priority */ ,
945                    GNUNET_TIME_UNIT_FOREVER_REL,
946                    n->session, n->plugin_name, n->addr, n->addrlen,
947                    GNUNET_YES, NULL, NULL);
948 }
949
950
951 /**
952  * Disconnect from the given neighbour.
953  *
954  * @param cls unused
955  * @param key hash of neighbour's public key (not used)
956  * @param value the 'struct NeighbourMapEntry' of the neighbour
957  */
958 static int
959 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
960 {
961   struct NeighbourMapEntry *n = value;
962
963 #if DEBUG_TRANSPORT
964   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
965               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
966 #endif
967   if (is_connected(n))
968     GNUNET_STATISTICS_update (GST_stats,
969                               gettext_noop ("# peers disconnected due to global disconnect"), 1,
970                               GNUNET_NO);
971   disconnect_neighbour (n);
972   return GNUNET_OK;
973 }
974
975
976 static void
977 ats_suggest_cancel (void *cls,
978     const struct GNUNET_SCHEDULER_TaskContext *tc)
979 {
980   struct NeighbourMapEntry *n = cls;
981
982   n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
983
984   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985               " ATS did not suggested address to connect to peer `%s'\n",
986               GNUNET_i2s (&n->id));
987
988   disconnect_neighbour(n);
989 }
990
991
992 /**
993  * Cleanup the neighbours subsystem.
994  */
995 void
996 GST_neighbours_stop ()
997 {
998   GNUNET_assert (neighbours != NULL);
999
1000   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
1001                                          NULL);
1002   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
1003   GNUNET_assert (neighbours_connected == 0);
1004   neighbours = NULL;
1005   callback_cls = NULL;
1006   connect_notify_cb = NULL;
1007   disconnect_notify_cb = NULL;
1008 }
1009
1010
1011 /**
1012  * We tried to send a SESSION_CONNECT message to another peer.  If this
1013  * succeeded, we change the state.  If it failed, we should tell
1014  * ATS to not use this address anymore (until it is re-validated).
1015  *
1016  * @param cls the 'struct NeighbourMapEntry'
1017  * @param success GNUNET_OK on success
1018  */
1019 static void
1020 send_connect_continuation (void *cls,
1021       const struct GNUNET_PeerIdentity * target,
1022       int success)
1023
1024 {
1025   struct NeighbourMapEntry *n = cls;
1026
1027   GNUNET_assert (n != NULL);
1028   GNUNET_assert (!is_connected(n));
1029
1030   if (is_disconnecting(n))
1031     return; /* neighbour is going away */
1032   if (GNUNET_YES != success)
1033   {
1034 #if DEBUG_TRANSPORT
1035     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036               "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1037               GNUNET_i2s (&n->id), n->plugin_name,
1038               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name,
1039                                                                   n->addr,
1040                                                                   n->addrlen),
1041               n->session);
1042 #endif
1043
1044     GNUNET_ATS_address_destroyed (GST_ats,
1045                                   &n->id,
1046                                   n->plugin_name, 
1047                                   n->addr,
1048                                   n->addrlen,
1049                                   NULL);
1050
1051     if (n->ats_suggest!= GNUNET_SCHEDULER_NO_TASK)
1052       GNUNET_SCHEDULER_cancel(n->ats_suggest);
1053     n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, n);
1054     GNUNET_ATS_suggest_address(GST_ats, &n->id);
1055     return;
1056   }
1057   change_state(n, S_CONNECT_SENT);
1058 }
1059
1060
1061 /**
1062  * We tried to switch addresses with an peer already connected. If it failed,
1063  * we should tell ATS to not use this address anymore (until it is re-validated).
1064  *
1065  * @param cls the 'struct NeighbourMapEntry'
1066  * @param success GNUNET_OK on success
1067  */
1068 static void
1069 send_switch_address_continuation (void *cls,
1070       const struct GNUNET_PeerIdentity * target,
1071       int success)
1072
1073 {
1074   struct NeighbourMapEntry *n = cls;
1075
1076   GNUNET_assert (n != NULL);
1077   if (is_disconnecting(n))
1078     return; /* neighbour is going away */
1079
1080   GNUNET_assert (n->state == S_CONNECTED);
1081   if (GNUNET_YES != success)
1082   {
1083 #if DEBUG_TRANSPORT
1084     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085               "Failed to switch connected peer `%s' to plugin `%s' address '%s' session %X, asking ATS for new address \n",
1086               GNUNET_i2s (&n->id), n->plugin_name,
1087               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name,
1088                                                                   n->addr,
1089                                                                   n->addrlen),
1090               n->session);
1091 #endif
1092
1093     GNUNET_ATS_address_destroyed (GST_ats,
1094                                   &n->id,
1095                                   n->plugin_name,
1096                                   n->addr,
1097                                   n->addrlen,
1098                                   NULL);
1099
1100     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1101       GNUNET_SCHEDULER_cancel(n->ats_suggest);
1102     n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, n);
1103     GNUNET_ATS_suggest_address(GST_ats, &n->id);
1104     return;
1105   }
1106 }
1107
1108 /**
1109  * We tried to send a SESSION_CONNECT message to another peer.  If this
1110  * succeeded, we change the state.  If it failed, we should tell
1111  * ATS to not use this address anymore (until it is re-validated).
1112  *
1113  * @param cls the 'struct NeighbourMapEntry'
1114  * @param success GNUNET_OK on success
1115  */
1116 static void
1117 send_connect_ack_continuation (void *cls,
1118       const struct GNUNET_PeerIdentity * target,
1119       int success)
1120
1121 {
1122   struct NeighbourMapEntry *n = cls;
1123
1124   GNUNET_assert (n != NULL);
1125
1126   if (GNUNET_YES == success)
1127     return; /* sending successful */
1128
1129   /* sending failed, ask for next address  */
1130 #if DEBUG_TRANSPORT
1131     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1132               "Failed to send CONNECT_MSG to peer `%4s' with plugin `%s' address '%s' session %X, asking ATS for new address \n",
1133               GNUNET_i2s (&n->id), n->plugin_name,
1134               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name,
1135                                                                   n->addr,
1136                                                                   n->addrlen),
1137               n->session);
1138 #endif
1139     change_state(n, S_NOT_CONNECTED);
1140
1141     GNUNET_ATS_address_destroyed (GST_ats,
1142                                   &n->id,
1143                                   n->plugin_name,
1144                                   n->addr,
1145                                   n->addrlen,
1146                                   NULL);
1147
1148     if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1149       GNUNET_SCHEDULER_cancel(n->ats_suggest);
1150     n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, n);
1151     GNUNET_ATS_suggest_address(GST_ats, &n->id);
1152 }
1153
1154 /**
1155  * For an existing neighbour record, set the active connection to
1156  * the given address.
1157  *
1158  * @param peer identity of the peer to switch the address for
1159  * @param plugin_name name of transport that delivered the PONG
1160  * @param address address of the other peer, NULL if other peer
1161  *                       connected to us
1162  * @param address_len number of bytes in address
1163  * @param session session to use (or NULL)
1164  * @param ats performance data
1165  * @param ats_count number of entries in ats
1166  * @return GNUNET_YES if we are currently connected, GNUNET_NO if the
1167  *         connection is not up (yet)
1168  */
1169 int
1170 GST_neighbours_switch_to_address_3way (const struct GNUNET_PeerIdentity *peer,
1171                                   const char *plugin_name, const void *address,
1172                                   size_t address_len, struct Session *session,
1173                                   const struct GNUNET_ATS_Information
1174                                   *ats, uint32_t ats_count,
1175                                   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1176                                   struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out)
1177 {
1178   struct NeighbourMapEntry *n;
1179   struct SessionConnectMessage connect_msg;
1180   size_t msg_len;
1181   size_t ret;
1182
1183   GNUNET_assert (neighbours != NULL);
1184   n = lookup_neighbour (peer);
1185   if (NULL == n)
1186   {
1187     if (NULL == session)
1188       GNUNET_ATS_address_destroyed (GST_ats,
1189                                     peer,
1190                                     plugin_name, address,
1191                                     address_len, NULL);    
1192     return GNUNET_NO;
1193   }
1194
1195   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1196   {
1197     GNUNET_SCHEDULER_cancel(n->ats_suggest);
1198     n->ats_suggest = GNUNET_SCHEDULER_NO_TASK;
1199   }
1200
1201 #if DEBUG_TRANSPORT
1202   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203               "ATS tells us to switch to plugin `%s' address '%s' session %X for %s peer `%s'\n",
1204               plugin_name,
1205               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
1206                                                                   address,
1207                                                                   address_len),
1208               session, (is_connected(n) ? "CONNECTED" : "NOT CONNECTED"),
1209               GNUNET_i2s (peer));
1210 #endif
1211
1212   GNUNET_free_non_null (n->addr);
1213   n->addr = GNUNET_malloc (address_len);
1214   memcpy (n->addr, address, address_len);
1215   n->bandwidth_in = bandwidth_in;
1216   n->bandwidth_out = bandwidth_out;
1217   n->addrlen = address_len;
1218   n->session = session;
1219   GNUNET_free_non_null (n->plugin_name);
1220   n->plugin_name = GNUNET_strdup (plugin_name);
1221   GNUNET_SCHEDULER_cancel (n->timeout_task);
1222   n->timeout_task =
1223       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1224                                     &neighbour_timeout_task, n);
1225
1226   if (n->state == S_DISCONNECT)
1227   {
1228     /* We are disconnecting, nothing to do here */
1229     return GNUNET_NO;
1230   }
1231   /* We are not connected/connecting and initiate a fresh connect */
1232   if (n->state == S_NOT_CONNECTED)
1233   {
1234     msg_len = sizeof (struct SessionConnectMessage);
1235     connect_msg.header.size = htons (msg_len);
1236     connect_msg.header.type =
1237         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1238     connect_msg.reserved = htonl (0);
1239     connect_msg.timestamp =
1240         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1241
1242     ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1243                             session, plugin_name, address, address_len,
1244                             GNUNET_YES, &send_connect_continuation, n);
1245
1246     return GNUNET_NO;
1247   }
1248   /* We received a CONNECT message and asked ATS for an address */
1249   else if (n->state == S_CONNECT_RECV)
1250   {
1251     msg_len = sizeof (struct SessionConnectMessage);
1252     connect_msg.header.size = htons (msg_len);
1253     connect_msg.header.type =
1254       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK);
1255     connect_msg.reserved = htonl (0);
1256     connect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1257
1258     ret = send_with_plugin(&n->id, (const void *) &connect_msg, msg_len, UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1259                            session, plugin_name, address, address_len,
1260                            GNUNET_YES, &send_connect_ack_continuation, n);
1261     if (ret == GNUNET_SYSERR)
1262     {
1263       change_state (n, S_NOT_CONNECTED);
1264       GNUNET_break (0);
1265     }
1266     return GNUNET_NO;
1267   }
1268   /* connected peer is switching addresses */
1269   else if (n->state == S_CONNECTED)
1270   {
1271     msg_len = sizeof (struct SessionConnectMessage);
1272     connect_msg.header.size = htons (msg_len);
1273     connect_msg.header.type =
1274         htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
1275     connect_msg.reserved = htonl (0);
1276     connect_msg.timestamp =
1277         GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1278
1279     ret = send_with_plugin (peer, (const char *) &connect_msg, msg_len, UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL,
1280                             session, plugin_name, address, address_len,
1281                             GNUNET_YES, &send_switch_address_continuation, n);
1282     if (ret == GNUNET_SYSERR)
1283     {
1284       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1285                 "Failed to send CONNECT_MESSAGE to `%4s' using plugin `%s' address '%s' session %X\n",
1286                 GNUNET_i2s (peer), plugin_name,
1287                 (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
1288                                                                     address,
1289                                                                     address_len),
1290                 session);
1291     }
1292     return GNUNET_NO;
1293   }
1294   else if (n->state == S_CONNECT_SENT)
1295   {
1296       //FIXME
1297      return GNUNET_NO;
1298   }
1299   GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Invalid connection state to switch addresses %u \n", n->state);
1300   GNUNET_break_op (0);
1301   return GNUNET_NO;
1302 }
1303
1304
1305 /**
1306  * Create an entry in the neighbour map for the given peer
1307  * 
1308  * @param peer peer to create an entry for
1309  * @return new neighbour map entry
1310  */
1311 static struct NeighbourMapEntry *
1312 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
1313 {
1314   struct NeighbourMapEntry *n;
1315
1316 #if DEBUG_TRANSPORT
1317   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318               "Unknown peer `%s', creating new neighbour\n",
1319               GNUNET_i2s (peer));
1320 #endif
1321   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
1322   n->id = *peer;
1323   n->state = S_NOT_CONNECTED;
1324   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
1325                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1326                                  MAX_BANDWIDTH_CARRY_S);
1327   n->timeout_task =
1328     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1329                                   &neighbour_timeout_task, n);
1330   GNUNET_assert (GNUNET_OK ==
1331                  GNUNET_CONTAINER_multihashmap_put (neighbours,
1332                                                     &n->id.hashPubKey, n,
1333                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1334   return n;
1335 }
1336
1337
1338 /**
1339  * Try to create a connection to the given target (eventually).
1340  *
1341  * @param target peer to try to connect to
1342  */
1343 void
1344 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
1345 {
1346   struct NeighbourMapEntry *n;
1347
1348   GNUNET_assert (neighbours != NULL);
1349 #if DEBUG_TRANSPORT
1350   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
1351               GNUNET_i2s (target));
1352 #endif
1353   GNUNET_assert (0 !=
1354                  memcmp (target, &GST_my_identity,
1355                          sizeof (struct GNUNET_PeerIdentity)));
1356   n = lookup_neighbour (target);
1357
1358   if (NULL != n)
1359   {
1360     if ((is_connected(n)) || (is_connecting(n)))
1361       return;                     /* already connecting or connected */
1362     if (is_disconnecting(n))
1363       change_state (n, S_NOT_CONNECTED);
1364   }
1365
1366
1367   if (n == NULL)
1368     n = setup_neighbour (target);
1369 #if DEBUG_TRANSPORT
1370   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371               "Asking ATS for suggested address to connect to peer `%s'\n",
1372               GNUNET_i2s (&n->id));
1373 #endif
1374
1375    GNUNET_ATS_suggest_address (GST_ats, &n->id);
1376 }
1377
1378 /**
1379  * Test if we're connected to the given peer.
1380  *
1381  * @param target peer to test
1382  * @return GNUNET_YES if we are connected, GNUNET_NO if not
1383  */
1384 int
1385 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
1386 {
1387   struct NeighbourMapEntry *n;
1388
1389   GNUNET_assert (neighbours != NULL);
1390
1391   n = lookup_neighbour (target);
1392
1393   if ((NULL == n) || (!is_connected(n)))
1394     return GNUNET_NO;           /* not connected */
1395   return GNUNET_YES;
1396 }
1397
1398
1399 /**
1400  * A session was terminated. Take note.
1401  *
1402  * @param peer identity of the peer where the session died
1403  * @param session session that is gone
1404  */
1405 void
1406 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
1407                                    struct Session *session)
1408 {
1409   struct NeighbourMapEntry *n;
1410
1411   GNUNET_assert (neighbours != NULL);
1412
1413 #if DEBUG_TRANSPORT
1414   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1415               "Session %X to peer `%s' ended \n",
1416               session, GNUNET_i2s (peer));
1417 #endif
1418
1419   n = lookup_neighbour (peer);
1420   if (NULL == n)
1421     return;
1422   if (session != n->session)
1423     return;                     /* doesn't affect us */
1424
1425   n->session = NULL;
1426   GNUNET_free (n->addr);
1427   n->addr = NULL;
1428   n->addrlen = 0;
1429
1430   /* not connected anymore anyway, shouldn't matter */
1431   if ((!is_connected(n)) && (!is_connecting(n)))
1432     return;
1433
1434   /* We are connected, so ask ATS to switch addresses */
1435   GNUNET_SCHEDULER_cancel (n->timeout_task);
1436   n->timeout_task =
1437       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
1438                                     &neighbour_timeout_task, n);
1439   /* try QUICKLY to re-establish a connection, reduce timeout! */
1440   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
1441     GNUNET_SCHEDULER_cancel(n->ats_suggest);
1442   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, n);
1443   GNUNET_ATS_suggest_address (GST_ats, peer);
1444 }
1445
1446
1447 /**
1448  * Transmit a message to the given target using the active connection.
1449  *
1450  * @param target destination
1451  * @param msg message to send
1452  * @param msg_size number of bytes in msg
1453  * @param timeout when to fail with timeout
1454  * @param cont function to call when done
1455  * @param cont_cls closure for 'cont'
1456  */
1457 void
1458 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
1459                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
1460                      GST_NeighbourSendContinuation cont, void *cont_cls)
1461 {
1462   struct NeighbourMapEntry *n;
1463   struct MessageQueue *mq;
1464
1465   GNUNET_assert (neighbours != NULL);
1466
1467   n = lookup_neighbour (target);
1468   if ((n == NULL) || (!is_connected(n)))
1469   {
1470     GNUNET_STATISTICS_update (GST_stats,
1471                               gettext_noop
1472                               ("# messages not sent (no such peer or not connected)"),
1473                               1, GNUNET_NO);
1474 #if DEBUG_TRANSPORT
1475     if (n == NULL)
1476       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1477                   "Could not send message to peer `%s': unknown neighbour",
1478                   GNUNET_i2s (target));
1479     else if (!is_connected(n))
1480       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1481                   "Could not send message to peer `%s': not connected\n",
1482                   GNUNET_i2s (target));
1483 #endif
1484     if (NULL != cont)
1485       cont (cont_cls, GNUNET_SYSERR);
1486     return;
1487   }
1488
1489   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
1490   {
1491     GNUNET_STATISTICS_update (GST_stats,
1492                               gettext_noop
1493                               ("# messages not sent (no such peer or not connected)"),
1494                               1, GNUNET_NO);
1495 #if DEBUG_TRANSPORT
1496       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1497                   "Could not send message to peer `%s': no address available\n",
1498                   GNUNET_i2s (target));
1499 #endif
1500
1501     if (NULL != cont)
1502       cont (cont_cls, GNUNET_SYSERR);
1503     return;
1504   }
1505
1506   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
1507   GNUNET_STATISTICS_update (GST_stats,
1508                             gettext_noop
1509                             ("# bytes in message queue for other peers"),
1510                             msg_size, GNUNET_NO);
1511   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
1512   mq->cont = cont;
1513   mq->cont_cls = cont_cls;
1514   /* FIXME: this memcpy can be up to 7% of our total runtime! */
1515   memcpy (&mq[1], msg, msg_size);
1516   mq->message_buf = (const char *) &mq[1];
1517   mq->message_buf_size = msg_size;
1518   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1519   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
1520
1521   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
1522       (NULL == n->is_active))
1523     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
1524 }
1525
1526
1527 /**
1528  * We have received a message from the given sender.  How long should
1529  * we delay before receiving more?  (Also used to keep the peer marked
1530  * as live).
1531  *
1532  * @param sender sender of the message
1533  * @param size size of the message
1534  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
1535  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
1536  *                   GNUNET_SYSERR if the connection is not fully up yet
1537  * @return how long to wait before reading more from this sender
1538  */
1539 struct GNUNET_TIME_Relative
1540 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
1541                                         *sender, ssize_t size, int *do_forward)
1542 {
1543   struct NeighbourMapEntry *n;
1544   struct GNUNET_TIME_Relative ret;
1545
1546   GNUNET_assert (neighbours != NULL);
1547
1548   n = lookup_neighbour (sender);
1549   if (n == NULL)
1550   {
1551     GST_neighbours_try_connect (sender);
1552     n = lookup_neighbour (sender);
1553     if (NULL == n)
1554     {
1555       GNUNET_STATISTICS_update (GST_stats,
1556                                 gettext_noop
1557                                 ("# messages discarded due to lack of neighbour record"),
1558                                 1, GNUNET_NO);
1559       *do_forward = GNUNET_NO;
1560       return GNUNET_TIME_UNIT_ZERO;
1561     }
1562   }
1563   if (!is_connected(n))
1564   {
1565     *do_forward = GNUNET_SYSERR;
1566     return GNUNET_TIME_UNIT_ZERO;
1567   }
1568   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
1569   {
1570     n->quota_violation_count++;
1571 #if DEBUG_TRANSPORT
1572     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1573                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
1574                 n->in_tracker.available_bytes_per_s__,
1575                 n->quota_violation_count);
1576 #endif
1577     /* Discount 32k per violation */
1578     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
1579   }
1580   else
1581   {
1582     if (n->quota_violation_count > 0)
1583     {
1584       /* try to add 32k back */
1585       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
1586       n->quota_violation_count--;
1587     }
1588   }
1589   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
1590   {
1591     GNUNET_STATISTICS_update (GST_stats,
1592                               gettext_noop
1593                               ("# bandwidth quota violations by other peers"),
1594                               1, GNUNET_NO);
1595     *do_forward = GNUNET_NO;
1596     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1597   }
1598   *do_forward = GNUNET_YES;
1599   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024);
1600   if (ret.rel_value > 0)
1601   {
1602 #if DEBUG_TRANSPORT
1603     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1605                 (unsigned long long) n->in_tracker.
1606                 consumption_since_last_update__,
1607                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1608                 (unsigned long long) ret.rel_value);
1609 #endif
1610     GNUNET_STATISTICS_update (GST_stats,
1611                               gettext_noop ("# ms throttling suggested"),
1612                               (int64_t) ret.rel_value, GNUNET_NO);
1613   }
1614   return ret;
1615 }
1616
1617
1618 /**
1619  * Keep the connection to the given neighbour alive longer,
1620  * we received a KEEPALIVE (or equivalent).
1621  *
1622  * @param neighbour neighbour to keep alive
1623  */
1624 void
1625 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1626 {
1627   struct NeighbourMapEntry *n;
1628
1629   GNUNET_assert (neighbours != NULL);
1630
1631   n = lookup_neighbour (neighbour);
1632   if (NULL == n)
1633   {
1634     GNUNET_STATISTICS_update (GST_stats,
1635                               gettext_noop
1636                               ("# KEEPALIVE messages discarded (not connected)"),
1637                               1, GNUNET_NO);
1638     return;
1639   }
1640   GNUNET_SCHEDULER_cancel (n->timeout_task);
1641   n->timeout_task =
1642       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1643                                     &neighbour_timeout_task, n);
1644 }
1645
1646
1647 /**
1648  * Change the incoming quota for the given peer.
1649  *
1650  * @param neighbour identity of peer to change qutoa for
1651  * @param quota new quota
1652  */
1653 void
1654 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1655                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1656 {
1657   struct NeighbourMapEntry *n;
1658
1659   GNUNET_assert (neighbours != NULL);
1660
1661   n = lookup_neighbour (neighbour);
1662   if (n == NULL)
1663   {
1664     GNUNET_STATISTICS_update (GST_stats,
1665                               gettext_noop
1666                               ("# SET QUOTA messages ignored (no such peer)"),
1667                               1, GNUNET_NO);
1668     return;
1669   }
1670   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1671   if (0 != ntohl (quota.value__))
1672     return;
1673 #if DEBUG_TRANSPORT
1674   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1675               GNUNET_i2s (&n->id), "SET_QUOTA");
1676 #endif
1677   if (is_connected(n))
1678     GNUNET_STATISTICS_update (GST_stats,
1679                               gettext_noop ("# disconnects due to quota of 0"), 1,
1680                               GNUNET_NO);
1681   disconnect_neighbour (n);
1682 }
1683
1684
1685 /**
1686  * Closure for the neighbours_iterate function.
1687  */
1688 struct IteratorContext
1689 {
1690   /**
1691    * Function to call on each connected neighbour.
1692    */
1693   GST_NeighbourIterator cb;
1694
1695   /**
1696    * Closure for 'cb'.
1697    */
1698   void *cb_cls;
1699 };
1700
1701
1702 /**
1703  * Call the callback from the closure for each connected neighbour.
1704  *
1705  * @param cls the 'struct IteratorContext'
1706  * @param key the hash of the public key of the neighbour
1707  * @param value the 'struct NeighbourMapEntry'
1708  * @return GNUNET_OK (continue to iterate)
1709  */
1710 static int
1711 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1712 {
1713   struct IteratorContext *ic = cls;
1714   struct NeighbourMapEntry *n = value;
1715
1716   if (is_connected(n))
1717     return GNUNET_OK;
1718
1719   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen);
1720   return GNUNET_OK;
1721 }
1722
1723
1724 /**
1725  * Iterate over all connected neighbours.
1726  *
1727  * @param cb function to call
1728  * @param cb_cls closure for cb
1729  */
1730 void
1731 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1732 {
1733   struct IteratorContext ic;
1734
1735   GNUNET_assert (neighbours != NULL);
1736
1737   ic.cb = cb;
1738   ic.cb_cls = cb_cls;
1739   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1740 }
1741
1742 /**
1743  * If we have an active connection to the given target, it must be shutdown.
1744  *
1745  * @param target peer to disconnect from
1746  */
1747 void
1748 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1749 {
1750   struct NeighbourMapEntry *n;
1751
1752   GNUNET_assert (neighbours != NULL);
1753
1754   n = lookup_neighbour (target);
1755   if (NULL == n)
1756     return;                     /* not active */
1757   if (is_connected(n))
1758   {
1759     send_disconnect(n);
1760
1761     n = lookup_neighbour (target);
1762     if (NULL == n)
1763       return;                     /* gone already */
1764   }
1765   disconnect_neighbour (n);
1766 }
1767
1768
1769 /**
1770  * We received a disconnect message from the given peer,
1771  * validate and process.
1772  * 
1773  * @param peer sender of the message
1774  * @param msg the disconnect message
1775  */
1776 void
1777 GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity *peer,
1778                                           const struct GNUNET_MessageHeader *msg)
1779 {
1780   struct NeighbourMapEntry *n;
1781   const struct SessionDisconnectMessage *sdm;
1782   GNUNET_HashCode hc;
1783
1784 #if DEBUG_TRANSPORT
1785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1786       "Received DISCONNECT message from peer `%s'\n", GNUNET_i2s (peer));
1787 #endif
1788
1789   if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage))
1790   {
1791     // GNUNET_break_op (0);
1792     GNUNET_STATISTICS_update (GST_stats,
1793                               gettext_noop ("# disconnect messages ignored (old format)"), 1,
1794                               GNUNET_NO);
1795     return;
1796   }
1797   sdm = (const struct SessionDisconnectMessage* ) msg;
1798   n = lookup_neighbour (peer);
1799   if (NULL == n)
1800     return;                     /* gone already */
1801   if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <=
1802       n->connect_ts.abs_value)
1803   {
1804     GNUNET_STATISTICS_update (GST_stats,
1805                               gettext_noop ("# disconnect messages ignored (timestamp)"), 1,
1806                               GNUNET_NO);
1807     return;
1808   }
1809   GNUNET_CRYPTO_hash (&sdm->public_key,
1810                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1811                       &hc);
1812   if (0 != memcmp (peer,
1813                    &hc,
1814                    sizeof (struct GNUNET_PeerIdentity)))
1815   {
1816     GNUNET_break_op (0);
1817     return;
1818   }
1819   if (ntohl (sdm->purpose.size) != 
1820       sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1821       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1822       sizeof (struct GNUNET_TIME_AbsoluteNBO))
1823   {
1824     GNUNET_break_op (0);
1825     return;
1826   }
1827   if (GNUNET_OK !=
1828       GNUNET_CRYPTO_rsa_verify (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT,
1829                                 &sdm->purpose,
1830                                 &sdm->signature,
1831                                 &sdm->public_key))
1832   {
1833     GNUNET_break_op (0);
1834     return;
1835   }
1836   GST_neighbours_force_disconnect (peer);
1837 }
1838
1839 /**
1840  * We received a 'SESSION_CONNECT_ACK' message from the other peer.
1841  * Consider switching to it.
1842  *
1843  * @param message possibly a 'struct SessionConnectMessage' (check format)
1844  * @param peer identity of the peer to switch the address for
1845  * @param plugin_name name of transport that delivered the PONG
1846  * @param address address of the other peer, NULL if other peer
1847  *                       connected to us
1848  * @param address_len number of bytes in address
1849  * @param session session to use (or NULL)
1850  * @param ats performance data
1851  * @param ats_count number of entries in ats
1852   */
1853 void
1854 GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message,
1855                                const struct GNUNET_PeerIdentity *peer,
1856                                const char *plugin_name,
1857                                const char *sender_address, uint16_t sender_address_len,
1858                                struct Session *session,
1859                                const struct GNUNET_ATS_Information *ats,
1860                                uint32_t ats_count)
1861 {
1862   const struct SessionConnectMessage *scm;
1863   struct QuotaSetMessage q_msg;
1864   struct GNUNET_MessageHeader msg;
1865   struct NeighbourMapEntry *n;
1866   size_t msg_len;
1867   size_t ret;
1868   int was_connected;
1869
1870 #if DEBUG_TRANSPORT
1871   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1872       "Received CONNECT_ACK message from peer `%s'\n", GNUNET_i2s (peer));
1873 #endif
1874
1875   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
1876   {
1877     GNUNET_break_op (0);
1878     return;
1879   }
1880
1881   scm = (const struct SessionConnectMessage *) message;
1882   GNUNET_break_op (ntohl (scm->reserved) == 0);
1883   n = lookup_neighbour (peer);
1884   if (NULL == n)
1885     n = setup_neighbour (peer);
1886 /*
1887   if (n->state != S_CONNECT_SENT)
1888   {
1889     GNUNET_break (0);
1890     send_disconnect(n);
1891     return;
1892   }
1893 */
1894   if (NULL != session)
1895     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1896                      "transport-ats",
1897                      "Giving ATS session %p of plugin %s for peer %s\n",
1898                      session,
1899                      plugin_name,
1900                      GNUNET_i2s (peer));
1901   GNUNET_ATS_address_update (GST_ats,
1902                              peer,
1903                              plugin_name, sender_address, sender_address_len,
1904                              session, ats, ats_count);
1905
1906   was_connected = is_connected(n);
1907   if (!is_connected(n))
1908     change_state (n, S_CONNECTED);
1909
1910 #if DEBUG_TRANSPORT
1911   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1912               "Setting inbound quota of %u for peer `%s' to \n",
1913               ntohl (n->bandwidth_in.value__), GNUNET_i2s (&n->id));
1914 #endif
1915   GST_neighbours_set_incoming_quota(&n->id, n->bandwidth_in);
1916
1917   /* send ACK (ACK)*/
1918   msg_len =  sizeof (msg);
1919   msg.size = htons (msg_len);
1920   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK);
1921
1922   ret = send_with_plugin (&n->id, (const char *) &msg, msg_len, UINT32_MAX,
1923                           GNUNET_TIME_UNIT_FOREVER_REL,
1924                           n->session, n->plugin_name, n->addr, n->addrlen,
1925                           GNUNET_YES, NULL, NULL);
1926
1927   if (ret == GNUNET_SYSERR)
1928     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1929               "Failed to send SESSION_ACK to `%4s' using plugin `%s' address '%s' session %X\n",
1930               GNUNET_i2s (&n->id), n->plugin_name,
1931               (n->addrlen == 0) ? "<inbound>" : GST_plugins_a2s (n->plugin_name,
1932                                                                  n->addr,
1933                                                                  n->addrlen),
1934               n->session);
1935
1936
1937   if (!was_connected)
1938   {
1939     if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
1940       n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
1941                                                         &neighbour_keepalive_task,
1942                                                         n);
1943
1944     neighbours_connected++;
1945     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
1946                               GNUNET_NO);
1947     connect_notify_cb (callback_cls, &n->id, ats, ats_count);
1948
1949 #if DEBUG_TRANSPORT
1950     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1951                 "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
1952                 ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
1953 #endif
1954     q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
1955     q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1956     q_msg.quota = n->bandwidth_out;
1957     q_msg.peer = (*peer);
1958     GST_clients_broadcast (&q_msg.header, GNUNET_NO);
1959   }
1960 }
1961
1962 void
1963 GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message,
1964     const struct GNUNET_PeerIdentity *peer,
1965     const char *plugin_name,
1966     const char *sender_address, uint16_t sender_address_len,
1967     struct Session *session,
1968     const struct GNUNET_ATS_Information *ats,
1969     uint32_t ats_count)
1970 {
1971   struct NeighbourMapEntry *n;
1972   struct QuotaSetMessage q_msg;
1973
1974 #if DEBUG_TRANSPORT
1975   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976       "Received ACK message from peer `%s'\n", GNUNET_i2s (peer));
1977 #endif
1978
1979   if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader))
1980   {
1981     GNUNET_break_op (0);
1982     return;
1983   }
1984
1985   n = lookup_neighbour (peer);
1986   if (NULL == n)
1987   {
1988     send_disconnect(n);
1989     GNUNET_break (0);
1990   }
1991 // FIXME check this
1992 //  if (n->state != S_CONNECT_RECV)
1993 /*  if (is_connecting(n))
1994   {
1995     send_disconnect (n);
1996     change_state (n, S_DISCONNECT);
1997     GNUNET_break (0);
1998     return;
1999   }
2000 */
2001   if (is_connected(n))
2002     return;
2003
2004   if (NULL != session)
2005     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2006                      "transport-ats",
2007                      "Giving ATS session %p of plugin %s for peer %s\n",
2008                      session,
2009                      plugin_name,
2010                      GNUNET_i2s (peer));
2011   GNUNET_ATS_address_update (GST_ats,
2012                              peer,
2013                              plugin_name, sender_address, sender_address_len,
2014                              session, ats, ats_count);
2015
2016   change_state (n, S_CONNECTED);
2017
2018   GST_neighbours_set_incoming_quota(&n->id, n->bandwidth_in);
2019
2020   if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK)
2021     n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
2022                                                       &neighbour_keepalive_task,
2023                                                       n);
2024
2025   neighbours_connected++;
2026   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
2027                             GNUNET_NO);
2028   connect_notify_cb (callback_cls, &n->id, ats, ats_count);
2029
2030 #if DEBUG_TRANSPORT
2031   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2032               "Sending outbound quota of %u Bps for peer `%s' to all clients\n",
2033               ntohl (n->bandwidth_out.value__), GNUNET_i2s (peer));
2034 #endif
2035
2036   q_msg.header.size = htons (sizeof (struct QuotaSetMessage));
2037   q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
2038   q_msg.quota = n->bandwidth_out;
2039   q_msg.peer = (*peer);
2040   GST_clients_broadcast (&q_msg.header, GNUNET_NO);
2041 }
2042
2043 struct BlackListCheckContext
2044 {
2045   struct GNUNET_ATS_Information *ats;
2046
2047   uint32_t ats_count;
2048
2049   struct Session *session;
2050
2051   char *sender_address;
2052
2053   uint16_t sender_address_len;
2054
2055   char *plugin_name;
2056
2057   struct GNUNET_TIME_Absolute ts;
2058 };
2059
2060
2061 static void
2062 handle_connect_blacklist_cont (void *cls,
2063                                const struct GNUNET_PeerIdentity
2064                                * peer, int result)
2065 {
2066   struct NeighbourMapEntry *n;
2067   struct BlackListCheckContext * bcc = cls;
2068
2069 #if DEBUG_TRANSPORT
2070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2071       "Blacklist check due to CONNECT message: `%s'\n", GNUNET_i2s (peer), (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN");
2072 #endif
2073
2074   /* not allowed */
2075   if (GNUNET_OK != result)
2076   {
2077     GNUNET_free (bcc);
2078     return;
2079   }
2080
2081   n = lookup_neighbour (peer);
2082   if (NULL == n)
2083     n = setup_neighbour (peer);
2084
2085   if (bcc->ts.abs_value > n->connect_ts.abs_value)
2086   {
2087     if (NULL != bcc->session)
2088       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2089                        "transport-ats",
2090                        "Giving ATS session %p of plugin %s address `%s' for peer %s\n",
2091                        bcc->session,
2092                        bcc->plugin_name,
2093                        GST_plugins_a2s (bcc->plugin_name, bcc->sender_address, bcc->sender_address_len),
2094                        GNUNET_i2s (peer));
2095     GNUNET_ATS_address_update (GST_ats,
2096                                peer,
2097                                bcc->plugin_name, bcc->sender_address, bcc->sender_address_len,
2098                                bcc->session, bcc->ats, bcc->ats_count);
2099     n->connect_ts = bcc->ts;
2100   }
2101
2102   GNUNET_free (bcc);
2103 /*
2104   if (n->state != S_NOT_CONNECTED)
2105     return;*/
2106   change_state (n, S_CONNECT_RECV);
2107
2108   /* Ask ATS for an address to connect via that address */
2109   if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK)
2110     GNUNET_SCHEDULER_cancel(n->ats_suggest);
2111   n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, n);
2112   GNUNET_ATS_suggest_address(GST_ats, peer);
2113 }
2114
2115 /**
2116  * We received a 'SESSION_CONNECT' message from the other peer.
2117  * Consider switching to it.
2118  *
2119  * @param message possibly a 'struct SessionConnectMessage' (check format)
2120  * @param peer identity of the peer to switch the address for
2121  * @param plugin_name name of transport that delivered the PONG
2122  * @param address address of the other peer, NULL if other peer
2123  *                       connected to us
2124  * @param address_len number of bytes in address
2125  * @param session session to use (or NULL)
2126  * @param ats performance data
2127  * @param ats_count number of entries in ats (excluding 0-termination)
2128   */
2129 void
2130 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
2131                                const struct GNUNET_PeerIdentity *peer,
2132                                const char *plugin_name,
2133                                const char *sender_address, uint16_t sender_address_len,
2134                                struct Session *session,
2135                                const struct GNUNET_ATS_Information *ats,
2136                                uint32_t ats_count)
2137 {
2138   const struct SessionConnectMessage *scm;
2139   struct NeighbourMapEntry * n;
2140   struct BlackListCheckContext * bcc = NULL;
2141
2142 #if DEBUG_TRANSPORT
2143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2144       "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer));
2145 #endif
2146
2147   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
2148   {
2149     GNUNET_break_op (0);
2150     return;
2151   }
2152
2153   scm = (const struct SessionConnectMessage *) message;
2154   GNUNET_break_op (ntohl (scm->reserved) == 0);
2155
2156   n = lookup_neighbour(peer);
2157   if (n != NULL)
2158   {
2159     /* connected peer switches addresses */
2160     if (is_connected(n))
2161     {
2162       GNUNET_ATS_address_update(GST_ats, peer, plugin_name, sender_address, sender_address_len, session, ats, ats_count);
2163       return;
2164     }
2165   }
2166
2167   /* we are not connected to this peer */
2168   /* do blacklist check*/
2169   bcc = GNUNET_malloc (sizeof (struct BlackListCheckContext) +
2170             sizeof (struct GNUNET_ATS_Information) * ats_count +
2171             sender_address_len +
2172             strlen (plugin_name)+1);
2173
2174   bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
2175
2176   bcc->ats_count = ats_count;
2177   bcc->sender_address_len = sender_address_len;
2178   bcc->session = session;
2179
2180   bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1];
2181   memcpy (bcc->ats, ats,sizeof (struct GNUNET_ATS_Information) * ats_count );
2182
2183   bcc->sender_address = (char *) &bcc->ats[ats_count];
2184   memcpy (bcc->sender_address, sender_address , sender_address_len);
2185
2186   bcc->plugin_name = &bcc->sender_address[sender_address_len];
2187   strcpy (bcc->plugin_name, plugin_name);
2188
2189   GST_blacklist_test_allowed (peer, plugin_name, handle_connect_blacklist_cont, bcc);
2190 }
2191
2192
2193 /* end of file gnunet-service-transport_neighbours.c */