Fix NULL dereference.
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48 /**
49  * How often do we send KEEPALIVE messages to each of our neighbours?
50  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
51  * send 3 keepalives in each interval, so 3 messages would need to be
52  * lost in a row for a disconnect).
53  */
54 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
55
56
57 /**
58  * Entry in neighbours.
59  */
60 struct NeighbourMapEntry;
61
62 /**
63  * Message a peer sends to another to indicate its
64  * preference for communicating via a particular
65  * session (and the desire to establish a real
66  * connection).
67  */
68 struct SessionConnectMessage
69 {
70   /**
71    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
72    */
73   struct GNUNET_MessageHeader header;
74
75   /**
76    * Always zero.
77    */
78   uint32_t reserved GNUNET_PACKED;
79
80   /**
81    * Absolute time at the sender.  Only the most recent connect
82    * message implies which session is preferred by the sender.
83    */
84   struct GNUNET_TIME_AbsoluteNBO timestamp;
85
86 };
87
88
89 struct SessionDisconnectMessage
90 {
91   /**
92    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
93    */
94   struct GNUNET_MessageHeader header;
95
96   /**
97    * Always zero.
98    */
99   uint32_t reserved GNUNET_PACKED;
100
101   /**
102    * Purpose of the signature.  Extends over the timestamp.
103    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
104    */
105   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
106
107   /**
108    * Absolute time at the sender.  Only the most recent connect
109    * message implies which session is preferred by the sender.
110    */
111   struct GNUNET_TIME_AbsoluteNBO timestamp;
112
113   /**
114    * Signature of the peer that sends us the disconnect.  Only
115    * valid if the timestamp is AFTER the timestamp from the
116    * corresponding 'CONNECT' message.
117    */
118   struct GNUNET_CRYPTO_RsaSignature signature;
119 };
120
121
122 /**
123  * For each neighbour we keep a list of messages
124  * that we still want to transmit to the neighbour.
125  */
126 struct MessageQueue
127 {
128
129   /**
130    * This is a doubly linked list.
131    */
132   struct MessageQueue *next;
133
134   /**
135    * This is a doubly linked list.
136    */
137   struct MessageQueue *prev;
138
139   /**
140    * Once this message is actively being transmitted, which
141    * neighbour is it associated with?
142    */
143   struct NeighbourMapEntry *n;
144
145   /**
146    * Function to call once we're done.
147    */
148   GST_NeighbourSendContinuation cont;
149
150   /**
151    * Closure for 'cont'
152    */
153   void *cont_cls;
154
155   /**
156    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
157    * stuck together in memory.  Allocated at the end of this struct.
158    */
159   const char *message_buf;
160
161   /**
162    * Size of the message buf
163    */
164   size_t message_buf_size;
165
166   /**
167    * At what time should we fail?
168    */
169   struct GNUNET_TIME_Absolute timeout;
170
171 };
172
173
174 /**
175  * Entry in neighbours.
176  */
177 struct NeighbourMapEntry
178 {
179
180   /**
181    * Head of list of messages we would like to send to this peer;
182    * must contain at most one message per client.
183    */
184   struct MessageQueue *messages_head;
185
186   /**
187    * Tail of list of messages we would like to send to this peer; must
188    * contain at most one message per client.
189    */
190   struct MessageQueue *messages_tail;
191
192   /**
193    * Context for address suggestion.
194    * NULL after we are connected.
195    */
196   struct GNUNET_ATS_SuggestionContext *asc;
197
198   /**
199    * Performance data for the peer.
200    */
201   struct GNUNET_TRANSPORT_ATS_Information *ats;
202
203   /**
204    * Are we currently trying to send a message? If so, which one?
205    */
206   struct MessageQueue *is_active;
207
208   /**
209    * Active session for communicating with the peer.
210    */
211   struct Session *session;
212
213   /**
214    * Name of the plugin we currently use.
215    */
216   char *plugin_name;
217
218   /**
219    * Address used for communicating with the peer, NULL for inbound connections.
220    */
221   void *addr;
222
223   /**
224    * Number of bytes in 'addr'.
225    */
226   size_t addrlen;
227
228   /**
229    * Identity of this neighbour.
230    */
231   struct GNUNET_PeerIdentity id;
232
233   /**
234    * ID of task scheduled to run when this peer is about to
235    * time out (will free resources associated with the peer).
236    */
237   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
238
239   /**
240    * ID of task scheduled to send keepalives.
241    */
242   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
243
244   /**
245    * ID of task scheduled to run when we should try transmitting
246    * the head of the message queue.
247    */
248   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
249
250   /**
251    * Tracker for inbound bandwidth.
252    */
253   struct GNUNET_BANDWIDTH_Tracker in_tracker;
254
255   /**
256    * How often has the other peer (recently) violated the inbound
257    * traffic limit?  Incremented by 10 per violation, decremented by 1
258    * per non-violation (for each time interval).
259    */
260   unsigned int quota_violation_count;
261
262   /**
263    * Number of values in 'ats' array.
264    */
265   unsigned int ats_count;
266
267   /**
268    * Are we already in the process of disconnecting this neighbour?
269    */
270   int in_disconnect;
271
272   /**
273    * Do we currently consider this neighbour connected? (as far as
274    * the connect/disconnect callbacks are concerned)?
275    */
276   int is_connected;
277
278 };
279
280
281 /**
282  * All known neighbours and their HELLOs.
283  */
284 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
285
286 /**
287  * Closure for connect_notify_cb and disconnect_notify_cb
288  */
289 static void *callback_cls;
290
291 /**
292  * Function to call when we connected to a neighbour.
293  */
294 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
295
296 /**
297  * Function to call when we disconnected from a neighbour.
298  */
299 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
300
301 /**
302  * counter for connected neighbours
303  */
304 static int neighbours_connected;
305
306 /**
307  * Lookup a neighbour entry in the neighbours hash map.
308  *
309  * @param pid identity of the peer to look up
310  * @return the entry, NULL if there is no existing record
311  */
312 static struct NeighbourMapEntry *
313 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
314 {
315   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
316 }
317
318
319 /**
320  * Task invoked to start a transmission to another peer.
321  *
322  * @param cls the 'struct NeighbourMapEntry'
323  * @param tc scheduler context
324  */
325 static void
326 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
327
328
329 /**
330  * We're done with our transmission attempt, continue processing.
331  *
332  * @param cls the 'struct MessageQueue' of the message
333  * @param receiver intended receiver
334  * @param success whether it worked or not
335  */
336 static void
337 transmit_send_continuation (void *cls,
338                             const struct GNUNET_PeerIdentity *receiver,
339                             int success)
340 {
341   struct MessageQueue *mq;
342   struct NeighbourMapEntry *n;
343
344   mq = cls;
345   n = mq->n;
346   if (NULL != n)
347   {
348     GNUNET_assert (n->is_active == mq);
349     n->is_active = NULL;
350     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
351     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
352   }
353   if (NULL != mq->cont)
354     mq->cont (mq->cont_cls, success);
355   GNUNET_free (mq);
356 }
357
358
359 /**
360  * Check the ready list for the given neighbour and if a plugin is
361  * ready for transmission (and if we have a message), do so!
362  *
363  * @param n target peer for which to transmit
364  */
365 static void
366 try_transmission_to_peer (struct NeighbourMapEntry *n)
367 {
368   struct MessageQueue *mq;
369   struct GNUNET_TIME_Relative timeout;
370   ssize_t ret;
371   struct GNUNET_TRANSPORT_PluginFunctions *papi;
372
373   if (n->is_active != NULL)
374     return;                     /* transmission already pending */
375   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
376     return;                     /* currently waiting for bandwidth */
377   while (NULL != (mq = n->messages_head))
378   {
379     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
380     if (timeout.rel_value > 0)
381       break;
382     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
383     n->is_active = mq;
384     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
385   }
386   if (NULL == mq)
387     return;                     /* no more messages */
388
389   papi = GST_plugins_find (n->plugin_name);
390   if (papi == NULL)
391   {
392     GNUNET_break (0);
393     return;
394   }
395   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
396   n->is_active = mq;
397   mq->n = n;
398
399   if  (((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0)))
400   {
401     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No address peer for peer `%s'\n",
402                 GNUNET_i2s (&n->id));
403     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
404     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
405     return;
406   }
407
408   ret =
409       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
410                   0 /* priority -- remove from plugin API? */ ,
411                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
412                   &transmit_send_continuation, mq);
413   if (ret == -1)
414   {
415     /* failure, but 'send' would not call continuation in this case,
416      * so we need to do it here! */
417     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
418     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
419   }
420 }
421
422
423 /**
424  * Task invoked to start a transmission to another peer.
425  *
426  * @param cls the 'struct NeighbourMapEntry'
427  * @param tc scheduler context
428  */
429 static void
430 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
431 {
432   struct NeighbourMapEntry *n = cls;
433
434   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
435   try_transmission_to_peer (n);
436 }
437
438
439 /**
440  * Initialize the neighbours subsystem.
441  *
442  * @param cls closure for callbacks
443  * @param connect_cb function to call if we connect to a peer
444  * @param disconnect_cb function to call if we disconnect from a peer
445  */
446 void
447 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
448                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
449 {
450   callback_cls = cls;
451   connect_notify_cb = connect_cb;
452   disconnect_notify_cb = disconnect_cb;
453   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
454 }
455
456
457 /**
458  * Disconnect from the given neighbour, clean up the record.
459  *
460  * @param n neighbour to disconnect from
461  */
462 static void
463 disconnect_neighbour (struct NeighbourMapEntry *n)
464 {
465   struct MessageQueue *mq;
466
467   if (GNUNET_YES == n->in_disconnect)
468     return;
469   n->in_disconnect = GNUNET_YES;
470   while (NULL != (mq = n->messages_head))
471   {
472     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
473     if (NULL != mq->cont)
474       mq->cont (mq->cont_cls, GNUNET_SYSERR);
475     GNUNET_free (mq);
476   }
477   if (NULL != n->is_active)
478   {
479     n->is_active->n = NULL;
480     n->is_active = NULL;
481   }
482   if (GNUNET_YES == n->is_connected)
483   {
484     n->is_connected = GNUNET_NO;
485     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
486     GNUNET_SCHEDULER_cancel (n->keepalive_task);
487     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;  
488     GNUNET_assert (neighbours_connected > 0);
489     neighbours_connected--;
490     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
491                               GNUNET_NO);
492     disconnect_notify_cb (callback_cls, &n->id);
493   }
494   GNUNET_assert (GNUNET_YES ==
495                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
496                                                        &n->id.hashPubKey, n));
497   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
498   {
499     GNUNET_SCHEDULER_cancel (n->timeout_task);
500     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
501   }
502   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
503   {
504     GNUNET_SCHEDULER_cancel (n->transmission_task);
505     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
506   }
507   if (NULL != n->asc)
508   {
509     GNUNET_ATS_suggest_address_cancel (n->asc);
510     n->asc = NULL;
511   }
512   GNUNET_array_grow (n->ats, n->ats_count, 0);
513   if (NULL != n->plugin_name)
514   {
515     GNUNET_free (n->plugin_name);
516     n->plugin_name = NULL;
517   }
518   if (NULL != n->addr)
519   {
520     GNUNET_free (n->addr);
521     n->addr = NULL;
522     n->addrlen = 0;
523   }
524   n->session = NULL;
525   GNUNET_free (n);
526 }
527
528
529 /**
530  * Peer has been idle for too long. Disconnect.
531  *
532  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
533  * @param tc scheduler context
534  */
535 static void
536 neighbour_timeout_task (void *cls,
537                         const struct GNUNET_SCHEDULER_TaskContext *tc)
538 {
539   struct NeighbourMapEntry *n = cls;
540
541   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
542   if (GNUNET_YES == n->is_connected)
543     GNUNET_STATISTICS_update (GST_stats,
544                               gettext_noop ("# peers disconnected due to timeout"), 1,
545                               GNUNET_NO);
546   disconnect_neighbour (n);
547 }
548
549
550 /**
551  * Send another keepalive message.
552  *
553  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
554  * @param tc scheduler context
555  */
556 static void
557 neighbour_keepalive_task (void *cls,
558                           const struct GNUNET_SCHEDULER_TaskContext *tc)
559 {
560   struct NeighbourMapEntry *n = cls;
561   struct GNUNET_MessageHeader m;
562   struct GNUNET_TRANSPORT_PluginFunctions *papi;
563
564   n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
565   GNUNET_assert (GNUNET_YES == n->is_connected);
566   GNUNET_STATISTICS_update (GST_stats,
567                             gettext_noop ("# keepalives sent"), 1,
568                             GNUNET_NO);
569   m.size = htons (sizeof (struct GNUNET_MessageHeader));
570   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
571   papi = GST_plugins_find (n->plugin_name);
572   if (papi != NULL)
573     papi->send (papi->cls, 
574                 &n->id, (const void *) &m,
575                 sizeof (m),
576                 UINT32_MAX /* priority */ ,
577                 GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
578                 GNUNET_YES, NULL, NULL);
579   n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
580                                                     &neighbour_keepalive_task,
581                                                     n);
582 }
583
584
585 /**
586  * Disconnect from the given neighbour.
587  *
588  * @param cls unused
589  * @param key hash of neighbour's public key (not used)
590  * @param value the 'struct NeighbourMapEntry' of the neighbour
591  */
592 static int
593 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
594 {
595   struct NeighbourMapEntry *n = value;
596
597 #if DEBUG_TRANSPORT
598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
599               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
600 #endif
601   if (GNUNET_YES == n->is_connected)
602     GNUNET_STATISTICS_update (GST_stats,
603                               gettext_noop ("# peers disconnected due to global disconnect"), 1,
604                               GNUNET_NO);
605   disconnect_neighbour (n);
606   return GNUNET_OK;
607 }
608
609
610 /**
611  * Cleanup the neighbours subsystem.
612  */
613 void
614 GST_neighbours_stop ()
615 {
616   GNUNET_assert (neighbours != NULL);
617
618   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
619                                          NULL);
620   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
621   GNUNET_assert (neighbours_connected == 0);
622   neighbours = NULL;
623   callback_cls = NULL;
624   connect_notify_cb = NULL;
625   disconnect_notify_cb = NULL;
626 }
627
628
629 /**
630  * For an existing neighbour record, set the active connection to
631  * the given address.
632  *
633  * @param peer identity of the peer to switch the address for
634  * @param plugin_name name of transport that delivered the PONG
635  * @param address address of the other peer, NULL if other peer
636  *                       connected to us
637  * @param address_len number of bytes in address
638  * @param session session to use (or NULL)
639  * @param ats performance data
640  * @param ats_count number of entries in ats (excluding 0-termination)
641  */
642 void
643 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
644                                   const char *plugin_name, const void *address,
645                                   size_t address_len, struct Session *session,
646                                   const struct GNUNET_TRANSPORT_ATS_Information
647                                   *ats, uint32_t ats_count)
648 {
649   struct NeighbourMapEntry *n;
650   struct SessionConnectMessage connect_msg;
651
652   GNUNET_assert (neighbours != NULL);
653
654   n = lookup_neighbour (peer);
655   if (NULL == n)
656   {
657     GNUNET_break (0);
658     return;
659   }
660
661 #if DEBUG_TRANSPORT
662   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
663               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
664               GNUNET_i2s (peer), plugin_name,
665               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
666                                                                   address,
667                                                                   address_len),
668               session);
669 #endif
670
671   GNUNET_free_non_null (n->addr);
672   n->addr = GNUNET_malloc (address_len);
673   memcpy (n->addr, address, address_len);
674   n->addrlen = address_len;
675   n->session = session;
676   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
677   memcpy (n->ats, ats,
678           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
679   GNUNET_free_non_null (n->plugin_name);
680   n->plugin_name = GNUNET_strdup (plugin_name);
681   GNUNET_SCHEDULER_cancel (n->timeout_task);
682   n->timeout_task =
683       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
684                                     &neighbour_timeout_task, n);
685   connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
686   connect_msg.header.type =
687       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
688   connect_msg.reserved = htonl (0);
689   connect_msg.timestamp =
690       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
691   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
692                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
693 }
694
695
696 /**
697  * Try to connect to the target peer using the given address
698  *
699  * @param cls the 'struct NeighbourMapEntry' of the target
700  * @param target identity of the target peer
701  * @param plugin_name name of the plugin
702  * @param plugin_address binary address
703  * @param plugin_address_len length of address
704  * @param session session to use
705  * @param bandwidth available bandwidth
706  * @param ats performance data for the address (as far as known)
707  * @param ats_count number of performance records in 'ats'
708  */
709 static void
710 try_connect_using_address (void *cls, const struct GNUNET_PeerIdentity *target,
711                            const char *plugin_name, const void *plugin_address,
712                            size_t plugin_address_len, struct Session *session,
713                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
714                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
715                            uint32_t ats_count)
716 {
717   struct NeighbourMapEntry *n = cls;
718   int was_connected;
719
720   n->asc = NULL;
721   was_connected = n->is_connected;
722   n->is_connected = GNUNET_YES;
723   GST_neighbours_switch_to_address (target, plugin_name, plugin_address,
724                                     plugin_address_len, session, ats,
725                                     ats_count);
726   if (GNUNET_YES == was_connected)
727     return;
728   n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task,
729                                                 n);
730   neighbours_connected++;
731   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
732                             GNUNET_NO);
733   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
734 }
735
736
737 /**
738  * Try to create a connection to the given target (eventually).
739  *
740  * @param target peer to try to connect to
741  */
742 void
743 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
744 {
745   struct NeighbourMapEntry *n;
746
747   GNUNET_assert (neighbours != NULL);
748
749 #if DEBUG_TRANSPORT
750   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
751               GNUNET_i2s (target));
752 #endif
753
754   GNUNET_assert (0 !=
755                  memcmp (target, &GST_my_identity,
756                          sizeof (struct GNUNET_PeerIdentity)));
757   n = lookup_neighbour (target);
758   if ((NULL != n) && (GNUNET_YES == n->is_connected))
759     return;                     /* already connected */
760   if (n == NULL)
761   {
762 #if DEBUG_TRANSPORT
763     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764                 "Unknown peer `%s', creating new neighbour\n",
765                 GNUNET_i2s (target));
766 #endif
767     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
768     n->id = *target;
769     GNUNET_array_grow (n->ats, n->ats_count, 1);
770     n->ats[0].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);;
771     n->ats[0].value = htonl (0);
772     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
773                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
774                                    MAX_BANDWIDTH_CARRY_S);
775     n->timeout_task =
776         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
777                                       &neighbour_timeout_task, n);
778     GNUNET_assert (GNUNET_OK ==
779                    GNUNET_CONTAINER_multihashmap_put (neighbours,
780                                                       &n->id.hashPubKey, n,
781                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
782   }
783   if (n->asc != NULL)
784     return;                     /* already trying */
785 #if DEBUG_TRANSPORT
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "Asking ATS for suggested address to connect to peer `%s'\n",
788               GNUNET_i2s (target));
789 #endif
790   n->asc =
791       GNUNET_ATS_suggest_address (GST_ats, target, &try_connect_using_address,
792                                   n);
793 }
794
795
796 /**
797  * Test if we're connected to the given peer.
798  *
799  * @param target peer to test
800  * @return GNUNET_YES if we are connected, GNUNET_NO if not
801  */
802 int
803 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
804 {
805   struct NeighbourMapEntry *n;
806
807   GNUNET_assert (neighbours != NULL);
808
809   n = lookup_neighbour (target);
810   if ((NULL == n) || (n->is_connected != GNUNET_YES))
811     return GNUNET_NO;           /* not connected */
812   return GNUNET_YES;
813 }
814
815
816 /**
817  * A session was terminated. Take note.
818  *
819  * @param peer identity of the peer where the session died
820  * @param session session that is gone
821  */
822 void
823 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
824                                    struct Session *session)
825 {
826   struct NeighbourMapEntry *n;
827
828   GNUNET_assert (neighbours != NULL);
829
830 #if DEBUG_TRANSPORT
831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832               "Session %X to peer `%s' ended \n",
833               session, GNUNET_i2s (peer));
834 #endif
835
836   n = lookup_neighbour (peer);
837   if (NULL == n)
838     return;
839   if (session != n->session)
840     return;                     /* doesn't affect us */
841
842   n->session = NULL;
843   GNUNET_free (n->addr);
844   n->addr = NULL;
845   n->addrlen = 0;
846
847
848   if (GNUNET_YES != n->is_connected)
849     return;                     /* not connected anymore anyway, shouldn't matter */
850
851   //n->is_connected = GNUNET_NO;
852
853   /* fast disconnect unless ATS suggests a new address */
854   GNUNET_SCHEDULER_cancel (n->timeout_task);
855   n->timeout_task =
856       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
857                                     &neighbour_timeout_task, n);
858   /* try QUICKLY to re-establish a connection, reduce timeout! */
859   if (NULL != n->ats)
860   {
861     /* how can this be!? */
862     //GNUNET_break (0);
863     return;
864   }
865   n->asc =
866       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
867 }
868
869
870 /**
871  * Transmit a message to the given target using the active connection.
872  *
873  * @param target destination
874  * @param msg message to send
875  * @param msg_size number of bytes in msg
876  * @param timeout when to fail with timeout
877  * @param cont function to call when done
878  * @param cont_cls closure for 'cont'
879  */
880 void
881 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
882                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
883                      GST_NeighbourSendContinuation cont, void *cont_cls)
884 {
885   struct NeighbourMapEntry *n;
886   struct MessageQueue *mq;
887
888   GNUNET_assert (neighbours != NULL);
889
890   n = lookup_neighbour (target);
891   if ((n == NULL) || (GNUNET_YES != n->is_connected))
892   {
893     GNUNET_STATISTICS_update (GST_stats,
894                               gettext_noop
895                               ("# messages not sent (no such peer or not connected)"),
896                               1, GNUNET_NO);
897 #if DEBUG_TRANSPORT
898     if (n == NULL)
899       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900                   "Could not send message to peer `%s': unknown neighbor",
901                   GNUNET_i2s (target));
902     else if (GNUNET_YES != n->is_connected)
903       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904                   "Could not send message to peer `%s': not connected\n",
905                   GNUNET_i2s (target));
906 #endif
907     if (NULL != cont)
908       cont (cont_cls, GNUNET_SYSERR);
909     return;
910   }
911
912   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
913   {
914     GNUNET_STATISTICS_update (GST_stats,
915                               gettext_noop
916                               ("# messages not sent (no such peer or not connected)"),
917                               1, GNUNET_NO);
918 #if DEBUG_TRANSPORT
919       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920                   "Could not send message to peer `%s': no address available\n",
921                   GNUNET_i2s (target));
922 #endif
923
924     if (NULL != cont)
925       cont (cont_cls, GNUNET_SYSERR);
926     return;
927   }
928
929
930   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
931   GNUNET_STATISTICS_update (GST_stats,
932                             gettext_noop
933                             ("# bytes in message queue for other peers"),
934                             msg_size, GNUNET_NO);
935   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
936   mq->cont = cont;
937   mq->cont_cls = cont_cls;
938   /* FIXME: this memcpy can be up to 7% of our total runtime! */
939   memcpy (&mq[1], msg, msg_size);
940   mq->message_buf = (const char *) &mq[1];
941   mq->message_buf_size = msg_size;
942   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
943   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
944   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
945       (NULL == n->is_active))
946     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
947 }
948
949
950 /**
951  * We have received a message from the given sender.  How long should
952  * we delay before receiving more?  (Also used to keep the peer marked
953  * as live).
954  *
955  * @param sender sender of the message
956  * @param size size of the message
957  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
958  *                   GNUNET_NO if the neighbour is not connected or violates the quota
959  * @return how long to wait before reading more from this sender
960  */
961 struct GNUNET_TIME_Relative
962 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
963                                         *sender, ssize_t size, int *do_forward)
964 {
965   struct NeighbourMapEntry *n;
966   struct GNUNET_TIME_Relative ret;
967
968   GNUNET_assert (neighbours != NULL);
969
970   n = lookup_neighbour (sender);
971   if (n == NULL)
972   {
973     *do_forward = GNUNET_NO;
974     return GNUNET_TIME_UNIT_ZERO;
975   }
976   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
977   {
978     n->quota_violation_count++;
979 #if DEBUG_TRANSPORT
980     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
981                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
982                 n->in_tracker.available_bytes_per_s__,
983                 n->quota_violation_count);
984 #endif
985     /* Discount 32k per violation */
986     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
987   }
988   else
989   {
990     if (n->quota_violation_count > 0)
991     {
992       /* try to add 32k back */
993       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
994       n->quota_violation_count--;
995     }
996   }
997   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
998   {
999     GNUNET_STATISTICS_update (GST_stats,
1000                               gettext_noop
1001                               ("# bandwidth quota violations by other peers"),
1002                               1, GNUNET_NO);
1003     *do_forward = GNUNET_NO;
1004     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1005   }
1006   *do_forward = GNUNET_YES;
1007   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
1008   if (ret.rel_value > 0)
1009   {
1010 #if DEBUG_TRANSPORT
1011     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1013                 (unsigned long long) n->in_tracker.
1014                 consumption_since_last_update__,
1015                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1016                 (unsigned long long) ret.rel_value);
1017 #endif
1018     GNUNET_STATISTICS_update (GST_stats,
1019                               gettext_noop ("# ms throttling suggested"),
1020                               (int64_t) ret.rel_value, GNUNET_NO);
1021   }
1022   return ret;
1023 }
1024
1025
1026 /**
1027  * Keep the connection to the given neighbour alive longer,
1028  * we received a KEEPALIVE (or equivalent).
1029  *
1030  * @param neighbour neighbour to keep alive
1031  */
1032 void
1033 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1034 {
1035   struct NeighbourMapEntry *n;
1036
1037   GNUNET_assert (neighbours != NULL);
1038
1039   n = lookup_neighbour (neighbour);
1040   if (NULL == n)
1041   {
1042     GNUNET_STATISTICS_update (GST_stats,
1043                               gettext_noop
1044                               ("# KEEPALIVE messages discarded (not connected)"),
1045                               1, GNUNET_NO);
1046     return;
1047   }
1048   GNUNET_SCHEDULER_cancel (n->timeout_task);
1049   n->timeout_task =
1050       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1051                                     &neighbour_timeout_task, n);
1052 }
1053
1054
1055 /**
1056  * Change the incoming quota for the given peer.
1057  *
1058  * @param neighbour identity of peer to change qutoa for
1059  * @param quota new quota
1060  */
1061 void
1062 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1063                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1064 {
1065   struct NeighbourMapEntry *n;
1066
1067   GNUNET_assert (neighbours != NULL);
1068
1069   n = lookup_neighbour (neighbour);
1070   if (n == NULL)
1071   {
1072     GNUNET_STATISTICS_update (GST_stats,
1073                               gettext_noop
1074                               ("# SET QUOTA messages ignored (no such peer)"),
1075                               1, GNUNET_NO);
1076     return;
1077   }
1078   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1079   if (0 != ntohl (quota.value__))
1080     return;
1081 #if DEBUG_TRANSPORT
1082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1083               GNUNET_i2s (&n->id), "SET_QUOTA");
1084 #endif
1085   if (GNUNET_YES == n->is_connected)
1086     GNUNET_STATISTICS_update (GST_stats,
1087                               gettext_noop ("# disconnects due to quota of 0"), 1,
1088                               GNUNET_NO);
1089   disconnect_neighbour (n);
1090 }
1091
1092
1093 /**
1094  * Closure for the neighbours_iterate function.
1095  */
1096 struct IteratorContext
1097 {
1098   /**
1099    * Function to call on each connected neighbour.
1100    */
1101   GST_NeighbourIterator cb;
1102
1103   /**
1104    * Closure for 'cb'.
1105    */
1106   void *cb_cls;
1107 };
1108
1109
1110 /**
1111  * Call the callback from the closure for each connected neighbour.
1112  *
1113  * @param cls the 'struct IteratorContext'
1114  * @param key the hash of the public key of the neighbour
1115  * @param value the 'struct NeighbourMapEntry'
1116  * @return GNUNET_OK (continue to iterate)
1117  */
1118 static int
1119 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1120 {
1121   struct IteratorContext *ic = cls;
1122   struct NeighbourMapEntry *n = value;
1123
1124   if (GNUNET_YES != n->is_connected)
1125     return GNUNET_OK;
1126
1127   GNUNET_assert (n->ats_count > 0);
1128   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count, n->plugin_name, n->addr, n->addrlen);
1129   return GNUNET_OK;
1130 }
1131
1132
1133 /**
1134  * Iterate over all connected neighbours.
1135  *
1136  * @param cb function to call
1137  * @param cb_cls closure for cb
1138  */
1139 void
1140 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1141 {
1142   struct IteratorContext ic;
1143
1144   GNUNET_assert (neighbours != NULL);
1145
1146   ic.cb = cb;
1147   ic.cb_cls = cb_cls;
1148   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1149 }
1150
1151
1152 /**
1153  * If we have an active connection to the given target, it must be shutdown.
1154  *
1155  * @param target peer to disconnect from
1156  */
1157 void
1158 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1159 {
1160   struct NeighbourMapEntry *n;
1161   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1162   struct SessionDisconnectMessage disconnect_msg;
1163
1164   GNUNET_assert (neighbours != NULL);
1165
1166   n = lookup_neighbour (target);
1167   if (NULL == n)
1168     return;                     /* not active */
1169   if (GNUNET_YES == n->is_connected)
1170   {
1171     /* we're actually connected, send DISCONNECT message */
1172     disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
1173     disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1174     disconnect_msg.reserved = htonl (0);
1175     disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1176                                          sizeof (struct GNUNET_TIME_AbsoluteNBO));
1177     disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
1178     disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1179     GNUNET_assert (GNUNET_OK ==
1180                    GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
1181                                            &disconnect_msg.purpose,
1182                                            &disconnect_msg.signature));
1183     papi = GST_plugins_find (n->plugin_name);
1184     if (papi != NULL)
1185       papi->send (papi->cls, target, (const void *) &disconnect_msg,
1186                   sizeof (disconnect_msg),
1187                   UINT32_MAX /* priority */ ,
1188                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
1189                   GNUNET_YES, NULL, NULL);
1190     GNUNET_STATISTICS_update (GST_stats,
1191                               gettext_noop ("# peers disconnected due to external request"), 1,
1192                               GNUNET_NO);
1193   }
1194   disconnect_neighbour (n);
1195 }
1196
1197
1198 /* end of file gnunet-service-transport_neighbours.c */