fixing bug #1812
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48 /**
49  * How often do we send KEEPALIVE messages to each of our neighbours?
50  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
51  * send 3 keepalives in each interval, so 3 messages would need to be
52  * lost in a row for a disconnect).
53  */
54 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
55
56
57 /**
58  * Entry in neighbours.
59  */
60 struct NeighbourMapEntry;
61
62 /**
63  * Message a peer sends to another to indicate its
64  * preference for communicating via a particular
65  * session (and the desire to establish a real
66  * connection).
67  */
68 struct SessionConnectMessage
69 {
70   /**
71    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
72    */
73   struct GNUNET_MessageHeader header;
74
75   /**
76    * Always zero.
77    */
78   uint32_t reserved GNUNET_PACKED;
79
80   /**
81    * Absolute time at the sender.  Only the most recent connect
82    * message implies which session is preferred by the sender.
83    */
84   struct GNUNET_TIME_AbsoluteNBO timestamp;
85
86 };
87
88
89 struct SessionDisconnectMessage
90 {
91   /**
92    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
93    */
94   struct GNUNET_MessageHeader header;
95
96   /**
97    * Always zero.
98    */
99   uint32_t reserved GNUNET_PACKED;
100
101   /**
102    * Purpose of the signature.  Extends over the timestamp.
103    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
104    */
105   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
106
107   /**
108    * Absolute time at the sender.  Only the most recent connect
109    * message implies which session is preferred by the sender.
110    */
111   struct GNUNET_TIME_AbsoluteNBO timestamp;
112
113   /**
114    * Signature of the peer that sends us the disconnect.  Only
115    * valid if the timestamp is AFTER the timestamp from the
116    * corresponding 'CONNECT' message.
117    */
118   struct GNUNET_CRYPTO_RsaSignature signature;
119 };
120
121
122 /**
123  * For each neighbour we keep a list of messages
124  * that we still want to transmit to the neighbour.
125  */
126 struct MessageQueue
127 {
128
129   /**
130    * This is a doubly linked list.
131    */
132   struct MessageQueue *next;
133
134   /**
135    * This is a doubly linked list.
136    */
137   struct MessageQueue *prev;
138
139   /**
140    * Once this message is actively being transmitted, which
141    * neighbour is it associated with?
142    */
143   struct NeighbourMapEntry *n;
144
145   /**
146    * Function to call once we're done.
147    */
148   GST_NeighbourSendContinuation cont;
149
150   /**
151    * Closure for 'cont'
152    */
153   void *cont_cls;
154
155   /**
156    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
157    * stuck together in memory.  Allocated at the end of this struct.
158    */
159   const char *message_buf;
160
161   /**
162    * Size of the message buf
163    */
164   size_t message_buf_size;
165
166   /**
167    * At what time should we fail?
168    */
169   struct GNUNET_TIME_Absolute timeout;
170
171 };
172
173
174 /**
175  * Entry in neighbours.
176  */
177 struct NeighbourMapEntry
178 {
179
180   /**
181    * Head of list of messages we would like to send to this peer;
182    * must contain at most one message per client.
183    */
184   struct MessageQueue *messages_head;
185
186   /**
187    * Tail of list of messages we would like to send to this peer; must
188    * contain at most one message per client.
189    */
190   struct MessageQueue *messages_tail;
191
192   /**
193    * Context for address suggestion.
194    * NULL after we are connected.
195    */
196   struct GNUNET_ATS_SuggestionContext *asc;
197
198   /**
199    * Performance data for the peer.
200    */
201   struct GNUNET_TRANSPORT_ATS_Information *ats;
202
203   /**
204    * Are we currently trying to send a message? If so, which one?
205    */
206   struct MessageQueue *is_active;
207
208   /**
209    * Active session for communicating with the peer.
210    */
211   struct Session *session;
212
213   /**
214    * Name of the plugin we currently use.
215    */
216   char *plugin_name;
217
218   /**
219    * Address used for communicating with the peer, NULL for inbound connections.
220    */
221   void *addr;
222
223   /**
224    * Number of bytes in 'addr'.
225    */
226   size_t addrlen;
227
228   /**
229    * Identity of this neighbour.
230    */
231   struct GNUNET_PeerIdentity id;
232
233   /**
234    * ID of task scheduled to run when this peer is about to
235    * time out (will free resources associated with the peer).
236    */
237   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
238
239   /**
240    * ID of task scheduled to send keepalives.
241    */
242   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
243
244   /**
245    * ID of task scheduled to run when we should try transmitting
246    * the head of the message queue.
247    */
248   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
249
250   /**
251    * Tracker for inbound bandwidth.
252    */
253   struct GNUNET_BANDWIDTH_Tracker in_tracker;
254
255   /**
256    * How often has the other peer (recently) violated the inbound
257    * traffic limit?  Incremented by 10 per violation, decremented by 1
258    * per non-violation (for each time interval).
259    */
260   unsigned int quota_violation_count;
261
262   /**
263    * Number of values in 'ats' array.
264    */
265   unsigned int ats_count;
266
267   /**
268    * Are we already in the process of disconnecting this neighbour?
269    */
270   int in_disconnect;
271
272   /**
273    * Do we currently consider this neighbour connected? (as far as
274    * the connect/disconnect callbacks are concerned)?
275    */
276   int is_connected;
277
278 };
279
280
281 /**
282  * All known neighbours and their HELLOs.
283  */
284 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
285
286 /**
287  * Closure for connect_notify_cb and disconnect_notify_cb
288  */
289 static void *callback_cls;
290
291 /**
292  * Function to call when we connected to a neighbour.
293  */
294 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
295
296 /**
297  * Function to call when we disconnected from a neighbour.
298  */
299 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
300
301 /**
302  * counter for connected neighbours
303  */
304 static int neighbours_connected;
305
306 /**
307  * Lookup a neighbour entry in the neighbours hash map.
308  *
309  * @param pid identity of the peer to look up
310  * @return the entry, NULL if there is no existing record
311  */
312 static struct NeighbourMapEntry *
313 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
314 {
315   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
316 }
317
318
319 /**
320  * Task invoked to start a transmission to another peer.
321  *
322  * @param cls the 'struct NeighbourMapEntry'
323  * @param tc scheduler context
324  */
325 static void
326 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
327
328
329 /**
330  * We're done with our transmission attempt, continue processing.
331  *
332  * @param cls the 'struct MessageQueue' of the message
333  * @param receiver intended receiver
334  * @param success whether it worked or not
335  */
336 static void
337 transmit_send_continuation (void *cls,
338                             const struct GNUNET_PeerIdentity *receiver,
339                             int success)
340 {
341   struct MessageQueue *mq;
342   struct NeighbourMapEntry *n;
343
344   mq = cls;
345   n = mq->n;
346   if (NULL != n)
347   {
348     GNUNET_assert (n->is_active == mq);
349     n->is_active = NULL;
350     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
351     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
352   }
353   if (NULL != mq->cont)
354     mq->cont (mq->cont_cls, success);
355   GNUNET_free (mq);
356 }
357
358
359 /**
360  * Check the ready list for the given neighbour and if a plugin is
361  * ready for transmission (and if we have a message), do so!
362  *
363  * @param n target peer for which to transmit
364  */
365 static void
366 try_transmission_to_peer (struct NeighbourMapEntry *n)
367 {
368   struct MessageQueue *mq;
369   struct GNUNET_TIME_Relative timeout;
370   ssize_t ret;
371   struct GNUNET_TRANSPORT_PluginFunctions *papi;
372
373   if (n->is_active != NULL)
374     return;                     /* transmission already pending */
375   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
376     return;                     /* currently waiting for bandwidth */
377   while (NULL != (mq = n->messages_head))
378   {
379     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
380     if (timeout.rel_value > 0)
381       break;
382     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
383     n->is_active = mq;
384     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
385   }
386   if (NULL == mq)
387     return;                     /* no more messages */
388
389   papi = GST_plugins_find (n->plugin_name);
390   if (papi == NULL)
391   {
392     GNUNET_break (0);
393     return;
394   }
395   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
396   n->is_active = mq;
397   mq->n = n;
398
399   if  (((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0)))
400   {
401     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No address peer for peer `%s'\n",
402                 GNUNET_i2s (&n->id));
403     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
404     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
405     return;
406   }
407
408   ret =
409       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
410                   0 /* priority -- remove from plugin API? */ ,
411                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
412                   &transmit_send_continuation, mq);
413   if (ret == -1)
414   {
415     /* failure, but 'send' would not call continuation in this case,
416      * so we need to do it here! */
417     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
418     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
419   }
420 }
421
422
423 /**
424  * Task invoked to start a transmission to another peer.
425  *
426  * @param cls the 'struct NeighbourMapEntry'
427  * @param tc scheduler context
428  */
429 static void
430 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
431 {
432   struct NeighbourMapEntry *n = cls;
433
434   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
435   try_transmission_to_peer (n);
436 }
437
438
439 /**
440  * Initialize the neighbours subsystem.
441  *
442  * @param cls closure for callbacks
443  * @param connect_cb function to call if we connect to a peer
444  * @param disconnect_cb function to call if we disconnect from a peer
445  */
446 void
447 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
448                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
449 {
450   callback_cls = cls;
451   connect_notify_cb = connect_cb;
452   disconnect_notify_cb = disconnect_cb;
453   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
454 }
455
456
457 /**
458  * Disconnect from the given neighbour, clean up the record.
459  *
460  * @param n neighbour to disconnect from
461  */
462 static void
463 disconnect_neighbour (struct NeighbourMapEntry *n)
464 {
465   struct MessageQueue *mq;
466
467   if (GNUNET_YES == n->in_disconnect)
468     return;
469   n->in_disconnect = GNUNET_YES;
470   while (NULL != (mq = n->messages_head))
471   {
472     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
473     mq->cont (mq->cont_cls, GNUNET_SYSERR);
474     GNUNET_free (mq);
475   }
476   if (NULL != n->is_active)
477   {
478     n->is_active->n = NULL;
479     n->is_active = NULL;
480   }
481   if (GNUNET_YES == n->is_connected)
482   {
483     n->is_connected = GNUNET_NO;
484     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
485     GNUNET_SCHEDULER_cancel (n->keepalive_task);
486     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;  
487     GNUNET_assert (neighbours_connected > 0);
488     neighbours_connected--;
489     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
490                               GNUNET_NO);
491     disconnect_notify_cb (callback_cls, &n->id);
492   }
493   GNUNET_assert (GNUNET_YES ==
494                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
495                                                        &n->id.hashPubKey, n));
496   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
497   {
498     GNUNET_SCHEDULER_cancel (n->timeout_task);
499     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
500   }
501   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
502   {
503     GNUNET_SCHEDULER_cancel (n->transmission_task);
504     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
505   }
506   if (NULL != n->asc)
507   {
508     GNUNET_ATS_suggest_address_cancel (n->asc);
509     n->asc = NULL;
510   }
511   GNUNET_array_grow (n->ats, n->ats_count, 0);
512   if (NULL != n->plugin_name)
513   {
514     GNUNET_free (n->plugin_name);
515     n->plugin_name = NULL;
516   }
517   if (NULL != n->addr)
518   {
519     GNUNET_free (n->addr);
520     n->addr = NULL;
521     n->addrlen = 0;
522   }
523   n->session = NULL;
524   GNUNET_free (n);
525 }
526
527
528 /**
529  * Peer has been idle for too long. Disconnect.
530  *
531  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
532  * @param tc scheduler context
533  */
534 static void
535 neighbour_timeout_task (void *cls,
536                         const struct GNUNET_SCHEDULER_TaskContext *tc)
537 {
538   struct NeighbourMapEntry *n = cls;
539
540   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
541   if (GNUNET_YES == n->is_connected)
542     GNUNET_STATISTICS_update (GST_stats,
543                               gettext_noop ("# peers disconnected due to timeout"), 1,
544                               GNUNET_NO);
545   disconnect_neighbour (n);
546 }
547
548
549 /**
550  * Send another keepalive message.
551  *
552  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
553  * @param tc scheduler context
554  */
555 static void
556 neighbour_keepalive_task (void *cls,
557                           const struct GNUNET_SCHEDULER_TaskContext *tc)
558 {
559   struct NeighbourMapEntry *n = cls;
560   struct GNUNET_MessageHeader m;
561   struct GNUNET_TRANSPORT_PluginFunctions *papi;
562
563   n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
564   GNUNET_assert (GNUNET_YES == n->is_connected);
565   GNUNET_STATISTICS_update (GST_stats,
566                             gettext_noop ("# keepalives sent"), 1,
567                             GNUNET_NO);
568   m.size = htons (sizeof (struct GNUNET_MessageHeader));
569   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
570   papi = GST_plugins_find (n->plugin_name);
571   if (papi != NULL)
572     papi->send (papi->cls, 
573                 &n->id, (const void *) &m,
574                 sizeof (m),
575                 UINT32_MAX /* priority */ ,
576                 GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
577                 GNUNET_YES, NULL, NULL);
578   n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
579                                                     &neighbour_keepalive_task,
580                                                     n);
581 }
582
583
584 /**
585  * Disconnect from the given neighbour.
586  *
587  * @param cls unused
588  * @param key hash of neighbour's public key (not used)
589  * @param value the 'struct NeighbourMapEntry' of the neighbour
590  */
591 static int
592 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
593 {
594   struct NeighbourMapEntry *n = value;
595
596 #if DEBUG_TRANSPORT
597   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
598               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
599 #endif
600   if (GNUNET_YES == n->is_connected)
601     GNUNET_STATISTICS_update (GST_stats,
602                               gettext_noop ("# peers disconnected due to global disconnect"), 1,
603                               GNUNET_NO);
604   disconnect_neighbour (n);
605   return GNUNET_OK;
606 }
607
608
609 /**
610  * Cleanup the neighbours subsystem.
611  */
612 void
613 GST_neighbours_stop ()
614 {
615   GNUNET_assert (neighbours != NULL);
616
617   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
618                                          NULL);
619   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
620   GNUNET_assert (neighbours_connected == 0);
621   neighbours = NULL;
622   callback_cls = NULL;
623   connect_notify_cb = NULL;
624   disconnect_notify_cb = NULL;
625 }
626
627
628 /**
629  * For an existing neighbour record, set the active connection to
630  * the given address.
631  *
632  * @param peer identity of the peer to switch the address for
633  * @param plugin_name name of transport that delivered the PONG
634  * @param address address of the other peer, NULL if other peer
635  *                       connected to us
636  * @param address_len number of bytes in address
637  * @param session session to use (or NULL)
638  * @param ats performance data
639  * @param ats_count number of entries in ats (excluding 0-termination)
640  */
641 void
642 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
643                                   const char *plugin_name, const void *address,
644                                   size_t address_len, struct Session *session,
645                                   const struct GNUNET_TRANSPORT_ATS_Information
646                                   *ats, uint32_t ats_count)
647 {
648   struct NeighbourMapEntry *n;
649   struct SessionConnectMessage connect_msg;
650
651   GNUNET_assert (neighbours != NULL);
652
653   n = lookup_neighbour (peer);
654   if (NULL == n)
655   {
656     GNUNET_break (0);
657     return;
658   }
659
660 #if DEBUG_TRANSPORT
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
663               GNUNET_i2s (peer), plugin_name,
664               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
665                                                                   address,
666                                                                   address_len),
667               session);
668 #endif
669
670   GNUNET_free_non_null (n->addr);
671   n->addr = GNUNET_malloc (address_len);
672   memcpy (n->addr, address, address_len);
673   n->addrlen = address_len;
674   n->session = session;
675   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
676   memcpy (n->ats, ats,
677           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
678   GNUNET_free_non_null (n->plugin_name);
679   n->plugin_name = GNUNET_strdup (plugin_name);
680   GNUNET_SCHEDULER_cancel (n->timeout_task);
681   n->timeout_task =
682       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
683                                     &neighbour_timeout_task, n);
684   connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
685   connect_msg.header.type =
686       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
687   connect_msg.reserved = htonl (0);
688   connect_msg.timestamp =
689       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
690   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
691                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
692 }
693
694
695 /**
696  * Try to connect to the target peer using the given address
697  *
698  * @param cls the 'struct NeighbourMapEntry' of the target
699  * @param target identity of the target peer
700  * @param plugin_name name of the plugin
701  * @param plugin_address binary address
702  * @param plugin_address_len length of address
703  * @param session session to use
704  * @param bandwidth available bandwidth
705  * @param ats performance data for the address (as far as known)
706  * @param ats_count number of performance records in 'ats'
707  */
708 static void
709 try_connect_using_address (void *cls, const struct GNUNET_PeerIdentity *target,
710                            const char *plugin_name, const void *plugin_address,
711                            size_t plugin_address_len, struct Session *session,
712                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
713                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
714                            uint32_t ats_count)
715 {
716   struct NeighbourMapEntry *n = cls;
717   int was_connected;
718
719   n->asc = NULL;
720   was_connected = n->is_connected;
721   n->is_connected = GNUNET_YES;
722   GST_neighbours_switch_to_address (target, plugin_name, plugin_address,
723                                     plugin_address_len, session, ats,
724                                     ats_count);
725   if (GNUNET_YES == was_connected)
726     return;
727   n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task,
728                                                 n);
729   neighbours_connected++;
730   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
731                             GNUNET_NO);
732   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
733 }
734
735
736 /**
737  * Try to create a connection to the given target (eventually).
738  *
739  * @param target peer to try to connect to
740  */
741 void
742 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
743 {
744   struct NeighbourMapEntry *n;
745
746   GNUNET_assert (neighbours != NULL);
747
748 #if DEBUG_TRANSPORT
749   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
750               GNUNET_i2s (target));
751 #endif
752
753   GNUNET_assert (0 !=
754                  memcmp (target, &GST_my_identity,
755                          sizeof (struct GNUNET_PeerIdentity)));
756   n = lookup_neighbour (target);
757   if ((NULL != n) && (GNUNET_YES == n->is_connected))
758     return;                     /* already connected */
759   if (n == NULL)
760   {
761 #if DEBUG_TRANSPORT
762     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
763                 "Unknown peer `%s', creating new neighbour\n",
764                 GNUNET_i2s (target));
765 #endif
766     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
767     n->id = *target;
768     GNUNET_array_grow (n->ats, n->ats_count, 1);
769     n->ats[0].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);;
770     n->ats[0].value = htonl (0);
771     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
772                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
773                                    MAX_BANDWIDTH_CARRY_S);
774     n->timeout_task =
775         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
776                                       &neighbour_timeout_task, n);
777     GNUNET_assert (GNUNET_OK ==
778                    GNUNET_CONTAINER_multihashmap_put (neighbours,
779                                                       &n->id.hashPubKey, n,
780                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
781   }
782   if (n->asc != NULL)
783     return;                     /* already trying */
784 #if DEBUG_TRANSPORT
785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786               "Asking ATS for suggested address to connect to peer `%s'\n",
787               GNUNET_i2s (target));
788 #endif
789   n->asc =
790       GNUNET_ATS_suggest_address (GST_ats, target, &try_connect_using_address,
791                                   n);
792 }
793
794
795 /**
796  * Test if we're connected to the given peer.
797  *
798  * @param target peer to test
799  * @return GNUNET_YES if we are connected, GNUNET_NO if not
800  */
801 int
802 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
803 {
804   struct NeighbourMapEntry *n;
805
806   GNUNET_assert (neighbours != NULL);
807
808   n = lookup_neighbour (target);
809   if ((NULL == n) || (n->is_connected != GNUNET_YES))
810     return GNUNET_NO;           /* not connected */
811   return GNUNET_YES;
812 }
813
814
815 /**
816  * A session was terminated. Take note.
817  *
818  * @param peer identity of the peer where the session died
819  * @param session session that is gone
820  */
821 void
822 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
823                                    struct Session *session)
824 {
825   struct NeighbourMapEntry *n;
826
827   GNUNET_assert (neighbours != NULL);
828
829 #if DEBUG_TRANSPORT
830   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
831               "Session %X to peer `%s' ended \n",
832               session, GNUNET_i2s (peer));
833 #endif
834
835   n = lookup_neighbour (peer);
836   if (NULL == n)
837     return;
838   if (session != n->session)
839     return;                     /* doesn't affect us */
840
841   n->session = NULL;
842   GNUNET_free (n->addr);
843   n->addr = NULL;
844   n->addrlen = 0;
845
846
847   if (GNUNET_YES != n->is_connected)
848     return;                     /* not connected anymore anyway, shouldn't matter */
849
850   //n->is_connected = GNUNET_NO;
851
852   /* fast disconnect unless ATS suggests a new address */
853   GNUNET_SCHEDULER_cancel (n->timeout_task);
854   n->timeout_task =
855       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
856                                     &neighbour_timeout_task, n);
857   /* try QUICKLY to re-establish a connection, reduce timeout! */
858   if (NULL != n->ats)
859   {
860     /* how can this be!? */
861     //GNUNET_break (0);
862     return;
863   }
864   n->asc =
865       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
866 }
867
868
869 /**
870  * Transmit a message to the given target using the active connection.
871  *
872  * @param target destination
873  * @param msg message to send
874  * @param msg_size number of bytes in msg
875  * @param timeout when to fail with timeout
876  * @param cont function to call when done
877  * @param cont_cls closure for 'cont'
878  */
879 void
880 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
881                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
882                      GST_NeighbourSendContinuation cont, void *cont_cls)
883 {
884   struct NeighbourMapEntry *n;
885   struct MessageQueue *mq;
886
887   GNUNET_assert (neighbours != NULL);
888
889   n = lookup_neighbour (target);
890   if ((n == NULL) || (GNUNET_YES != n->is_connected))
891   {
892     GNUNET_STATISTICS_update (GST_stats,
893                               gettext_noop
894                               ("# messages not sent (no such peer or not connected)"),
895                               1, GNUNET_NO);
896 #if DEBUG_TRANSPORT
897     if (n == NULL)
898       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
899                   "Could not send message to peer `%s': unknown neighbor",
900                   GNUNET_i2s (target));
901     else if (GNUNET_YES != n->is_connected)
902       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
903                   "Could not send message to peer `%s': not connected\n",
904                   GNUNET_i2s (target));
905 #endif
906     if (NULL != cont)
907       cont (cont_cls, GNUNET_SYSERR);
908     return;
909   }
910
911   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
912   {
913     GNUNET_STATISTICS_update (GST_stats,
914                               gettext_noop
915                               ("# messages not sent (no such peer or not connected)"),
916                               1, GNUNET_NO);
917 #if DEBUG_TRANSPORT
918       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919                   "Could not send message to peer `%s': no address available\n",
920                   GNUNET_i2s (target));
921 #endif
922
923     if (NULL != cont)
924       cont (cont_cls, GNUNET_SYSERR);
925     return;
926   }
927
928
929   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
930   GNUNET_STATISTICS_update (GST_stats,
931                             gettext_noop
932                             ("# bytes in message queue for other peers"),
933                             msg_size, GNUNET_NO);
934   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
935   mq->cont = cont;
936   mq->cont_cls = cont_cls;
937   /* FIXME: this memcpy can be up to 7% of our total runtime! */
938   memcpy (&mq[1], msg, msg_size);
939   mq->message_buf = (const char *) &mq[1];
940   mq->message_buf_size = msg_size;
941   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
942   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
943   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
944       (NULL == n->is_active))
945     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
946 }
947
948
949 /**
950  * We have received a message from the given sender.  How long should
951  * we delay before receiving more?  (Also used to keep the peer marked
952  * as live).
953  *
954  * @param sender sender of the message
955  * @param size size of the message
956  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
957  *                   GNUNET_NO if the neighbour is not connected or violates the quota
958  * @return how long to wait before reading more from this sender
959  */
960 struct GNUNET_TIME_Relative
961 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
962                                         *sender, ssize_t size, int *do_forward)
963 {
964   struct NeighbourMapEntry *n;
965   struct GNUNET_TIME_Relative ret;
966
967   GNUNET_assert (neighbours != NULL);
968
969   n = lookup_neighbour (sender);
970   if (n == NULL)
971   {
972     *do_forward = GNUNET_NO;
973     return GNUNET_TIME_UNIT_ZERO;
974   }
975   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
976   {
977     n->quota_violation_count++;
978 #if DEBUG_TRANSPORT
979     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
981                 n->in_tracker.available_bytes_per_s__,
982                 n->quota_violation_count);
983 #endif
984     /* Discount 32k per violation */
985     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
986   }
987   else
988   {
989     if (n->quota_violation_count > 0)
990     {
991       /* try to add 32k back */
992       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
993       n->quota_violation_count--;
994     }
995   }
996   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
997   {
998     GNUNET_STATISTICS_update (GST_stats,
999                               gettext_noop
1000                               ("# bandwidth quota violations by other peers"),
1001                               1, GNUNET_NO);
1002     *do_forward = GNUNET_NO;
1003     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1004   }
1005   *do_forward = GNUNET_YES;
1006   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
1007   if (ret.rel_value > 0)
1008   {
1009 #if DEBUG_TRANSPORT
1010     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1011                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1012                 (unsigned long long) n->in_tracker.
1013                 consumption_since_last_update__,
1014                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1015                 (unsigned long long) ret.rel_value);
1016 #endif
1017     GNUNET_STATISTICS_update (GST_stats,
1018                               gettext_noop ("# ms throttling suggested"),
1019                               (int64_t) ret.rel_value, GNUNET_NO);
1020   }
1021   return ret;
1022 }
1023
1024
1025 /**
1026  * Keep the connection to the given neighbour alive longer,
1027  * we received a KEEPALIVE (or equivalent).
1028  *
1029  * @param neighbour neighbour to keep alive
1030  */
1031 void
1032 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1033 {
1034   struct NeighbourMapEntry *n;
1035
1036   GNUNET_assert (neighbours != NULL);
1037
1038   n = lookup_neighbour (neighbour);
1039   if (NULL == n)
1040   {
1041     GNUNET_STATISTICS_update (GST_stats,
1042                               gettext_noop
1043                               ("# KEEPALIVE messages discarded (not connected)"),
1044                               1, GNUNET_NO);
1045     return;
1046   }
1047   GNUNET_SCHEDULER_cancel (n->timeout_task);
1048   n->timeout_task =
1049       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1050                                     &neighbour_timeout_task, n);
1051 }
1052
1053
1054 /**
1055  * Change the incoming quota for the given peer.
1056  *
1057  * @param neighbour identity of peer to change qutoa for
1058  * @param quota new quota
1059  */
1060 void
1061 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1062                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1063 {
1064   struct NeighbourMapEntry *n;
1065
1066   GNUNET_assert (neighbours != NULL);
1067
1068   n = lookup_neighbour (neighbour);
1069   if (n == NULL)
1070   {
1071     GNUNET_STATISTICS_update (GST_stats,
1072                               gettext_noop
1073                               ("# SET QUOTA messages ignored (no such peer)"),
1074                               1, GNUNET_NO);
1075     return;
1076   }
1077   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1078   if (0 != ntohl (quota.value__))
1079     return;
1080 #if DEBUG_TRANSPORT
1081   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1082               GNUNET_i2s (&n->id), "SET_QUOTA");
1083 #endif
1084   if (GNUNET_YES == n->is_connected)
1085     GNUNET_STATISTICS_update (GST_stats,
1086                               gettext_noop ("# disconnects due to quota of 0"), 1,
1087                               GNUNET_NO);
1088   disconnect_neighbour (n);
1089 }
1090
1091
1092 /**
1093  * Closure for the neighbours_iterate function.
1094  */
1095 struct IteratorContext
1096 {
1097   /**
1098    * Function to call on each connected neighbour.
1099    */
1100   GST_NeighbourIterator cb;
1101
1102   /**
1103    * Closure for 'cb'.
1104    */
1105   void *cb_cls;
1106 };
1107
1108
1109 /**
1110  * Call the callback from the closure for each connected neighbour.
1111  *
1112  * @param cls the 'struct IteratorContext'
1113  * @param key the hash of the public key of the neighbour
1114  * @param value the 'struct NeighbourMapEntry'
1115  * @return GNUNET_OK (continue to iterate)
1116  */
1117 static int
1118 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1119 {
1120   struct IteratorContext *ic = cls;
1121   struct NeighbourMapEntry *n = value;
1122
1123   if (GNUNET_YES != n->is_connected)
1124     return GNUNET_OK;
1125
1126   GNUNET_assert (n->ats_count > 0);
1127   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count, n->plugin_name, n->addr, n->addrlen);
1128   return GNUNET_OK;
1129 }
1130
1131
1132 /**
1133  * Iterate over all connected neighbours.
1134  *
1135  * @param cb function to call
1136  * @param cb_cls closure for cb
1137  */
1138 void
1139 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1140 {
1141   struct IteratorContext ic;
1142
1143   GNUNET_assert (neighbours != NULL);
1144
1145   ic.cb = cb;
1146   ic.cb_cls = cb_cls;
1147   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1148 }
1149
1150
1151 /**
1152  * If we have an active connection to the given target, it must be shutdown.
1153  *
1154  * @param target peer to disconnect from
1155  */
1156 void
1157 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1158 {
1159   struct NeighbourMapEntry *n;
1160   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1161   struct SessionDisconnectMessage disconnect_msg;
1162
1163   GNUNET_assert (neighbours != NULL);
1164
1165   n = lookup_neighbour (target);
1166   if (NULL == n)
1167     return;                     /* not active */
1168   if (GNUNET_YES == n->is_connected)
1169   {
1170     /* we're actually connected, send DISCONNECT message */
1171     disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
1172     disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1173     disconnect_msg.reserved = htonl (0);
1174     disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1175                                          sizeof (struct GNUNET_TIME_AbsoluteNBO));
1176     disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
1177     disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1178     GNUNET_assert (GNUNET_OK ==
1179                    GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
1180                                            &disconnect_msg.purpose,
1181                                            &disconnect_msg.signature));
1182     papi = GST_plugins_find (n->plugin_name);
1183     if (papi != NULL)
1184       papi->send (papi->cls, target, (const void *) &disconnect_msg,
1185                   sizeof (disconnect_msg),
1186                   UINT32_MAX /* priority */ ,
1187                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
1188                   GNUNET_YES, NULL, NULL);
1189     GNUNET_STATISTICS_update (GST_stats,
1190                               gettext_noop ("# peers disconnected due to external request"), 1,
1191                               GNUNET_NO);
1192   }
1193   disconnect_neighbour (n);
1194 }
1195
1196
1197 /* end of file gnunet-service-transport_neighbours.c */