binding to specific address
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48
49 /**
50  * Entry in neighbours.
51  */
52 struct NeighbourMapEntry;
53
54 /**
55  * Message a peer sends to another to indicate its
56  * preference for communicating via a particular
57  * session (and the desire to establish a real
58  * connection).
59  */
60 struct SessionConnectMessage
61 {
62   /**
63    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
64    */
65   struct GNUNET_MessageHeader header;
66
67   /**
68    * Always zero.
69    */
70   uint32_t reserved GNUNET_PACKED;
71
72   /**
73    * Absolute time at the sender.  Only the most recent connect
74    * message implies which session is preferred by the sender.
75    */
76   struct GNUNET_TIME_AbsoluteNBO timestamp;
77
78 };
79
80
81 struct SessionDisconnectMessage
82 {
83   /**
84    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
85    */
86   struct GNUNET_MessageHeader header;
87
88   /**
89    * Always zero.
90    */
91   uint32_t reserved GNUNET_PACKED;
92
93   /**
94    * Purpose of the signature.  Extends over the timestamp.
95    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
96    */
97   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
98
99   /**
100    * Absolute time at the sender.  Only the most recent connect
101    * message implies which session is preferred by the sender.
102    */
103   struct GNUNET_TIME_AbsoluteNBO timestamp;
104
105   /**
106    * Signature of the peer that sends us the disconnect.  Only
107    * valid if the timestamp is AFTER the timestamp from the
108    * corresponding 'CONNECT' message.
109    */
110   struct GNUNET_CRYPTO_RsaSignature signature;
111 };
112
113
114 /**
115  * For each neighbour we keep a list of messages
116  * that we still want to transmit to the neighbour.
117  */
118 struct MessageQueue
119 {
120
121   /**
122    * This is a doubly linked list.
123    */
124   struct MessageQueue *next;
125
126   /**
127    * This is a doubly linked list.
128    */
129   struct MessageQueue *prev;
130
131   /**
132    * Once this message is actively being transmitted, which
133    * neighbour is it associated with?
134    */
135   struct NeighbourMapEntry *n;
136
137   /**
138    * Function to call once we're done.
139    */
140   GST_NeighbourSendContinuation cont;
141
142   /**
143    * Closure for 'cont'
144    */
145   void *cont_cls;
146
147   /**
148    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
149    * stuck together in memory.  Allocated at the end of this struct.
150    */
151   const char *message_buf;
152
153   /**
154    * Size of the message buf
155    */
156   size_t message_buf_size;
157
158   /**
159    * At what time should we fail?
160    */
161   struct GNUNET_TIME_Absolute timeout;
162
163 };
164
165
166 /**
167  * Entry in neighbours.
168  */
169 struct NeighbourMapEntry
170 {
171
172   /**
173    * Head of list of messages we would like to send to this peer;
174    * must contain at most one message per client.
175    */
176   struct MessageQueue *messages_head;
177
178   /**
179    * Tail of list of messages we would like to send to this peer; must
180    * contain at most one message per client.
181    */
182   struct MessageQueue *messages_tail;
183
184   /**
185    * Context for address suggestion.
186    * NULL after we are connected.
187    */
188   struct GNUNET_ATS_SuggestionContext *asc;
189
190   /**
191    * Performance data for the peer.
192    */
193   struct GNUNET_TRANSPORT_ATS_Information *ats;
194
195   /**
196    * Are we currently trying to send a message? If so, which one?
197    */
198   struct MessageQueue *is_active;
199
200   /**
201    * Active session for communicating with the peer.
202    */
203   struct Session *session;
204
205   /**
206    * Name of the plugin we currently use.
207    */
208   char *plugin_name;
209
210   /**
211    * Address used for communicating with the peer, NULL for inbound connections.
212    */
213   void *addr;
214
215   /**
216    * Number of bytes in 'addr'.
217    */
218   size_t addrlen;
219
220   /**
221    * Identity of this neighbour.
222    */
223   struct GNUNET_PeerIdentity id;
224
225   /**
226    * ID of task scheduled to run when this peer is about to
227    * time out (will free resources associated with the peer).
228    */
229   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
230
231   /**
232    * ID of task scheduled to run when we should try transmitting
233    * the head of the message queue.
234    */
235   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
236
237   /**
238    * Tracker for inbound bandwidth.
239    */
240   struct GNUNET_BANDWIDTH_Tracker in_tracker;
241
242   /**
243    * How often has the other peer (recently) violated the inbound
244    * traffic limit?  Incremented by 10 per violation, decremented by 1
245    * per non-violation (for each time interval).
246    */
247   unsigned int quota_violation_count;
248
249   /**
250    * Number of values in 'ats' array.
251    */
252   unsigned int ats_count;
253
254   /**
255    * Are we already in the process of disconnecting this neighbour?
256    */
257   int in_disconnect;
258
259   /**
260    * Do we currently consider this neighbour connected? (as far as
261    * the connect/disconnect callbacks are concerned)?
262    */
263   int is_connected;
264
265 };
266
267
268 /**
269  * All known neighbours and their HELLOs.
270  */
271 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
272
273 /**
274  * Closure for connect_notify_cb and disconnect_notify_cb
275  */
276 static void *callback_cls;
277
278 /**
279  * Function to call when we connected to a neighbour.
280  */
281 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
282
283 /**
284  * Function to call when we disconnected from a neighbour.
285  */
286 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
287
288 /**
289  * counter for connected neighbours
290  */
291 static int neighbours_connected;
292
293 /**
294  * Lookup a neighbour entry in the neighbours hash map.
295  *
296  * @param pid identity of the peer to look up
297  * @return the entry, NULL if there is no existing record
298  */
299 static struct NeighbourMapEntry *
300 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
301 {
302   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
303 }
304
305
306 /**
307  * Task invoked to start a transmission to another peer.
308  *
309  * @param cls the 'struct NeighbourMapEntry'
310  * @param tc scheduler context
311  */
312 static void
313 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
314
315
316 /**
317  * We're done with our transmission attempt, continue processing.
318  *
319  * @param cls the 'struct MessageQueue' of the message
320  * @param receiver intended receiver
321  * @param success whether it worked or not
322  */
323 void
324 transmit_send_continuation (void *cls,
325                             const struct GNUNET_PeerIdentity *receiver,
326                             int success)
327 {
328   struct MessageQueue *mq;
329   struct NeighbourMapEntry *n;
330
331   mq = cls;
332   n = mq->n;
333   if (NULL != n)
334   {
335     GNUNET_assert (n->is_active == mq);
336     n->is_active = NULL;
337     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
338     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
339   }
340   if (NULL != mq->cont)
341     mq->cont (mq->cont_cls, success);
342   GNUNET_free (mq);
343 }
344
345
346 /**
347  * Check the ready list for the given neighbour and if a plugin is
348  * ready for transmission (and if we have a message), do so!
349  *
350  * @param n target peer for which to transmit
351  */
352 static void
353 try_transmission_to_peer (struct NeighbourMapEntry *n)
354 {
355   struct MessageQueue *mq;
356   struct GNUNET_TIME_Relative timeout;
357   ssize_t ret;
358   struct GNUNET_TRANSPORT_PluginFunctions *papi;
359
360   if (n->is_active != NULL)
361     return;                     /* transmission already pending */
362   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
363     return;                     /* currently waiting for bandwidth */
364   mq = n->messages_head;
365   while (NULL != (mq = n->messages_head))
366   {
367     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
368     if (timeout.rel_value > 0)
369       break;
370     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
371   }
372   if (NULL == mq)
373     return;                     /* no more messages */
374
375   papi = GST_plugins_find (n->plugin_name);
376   if (papi == NULL)
377   {
378     GNUNET_break (0);
379     return;
380   }
381   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
382   n->is_active = mq;
383   mq->n = n;
384
385   if  (((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0)))
386   {
387     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No address peer for peer `%s'\n",
388                 GNUNET_i2s (&n->id));
389     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
390     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
391     return;
392   }
393
394   ret =
395       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
396                   0 /* priority -- remove from plugin API? */ ,
397                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
398                   &transmit_send_continuation, mq);
399   if (ret == -1)
400   {
401     /* failure, but 'send' would not call continuation in this case,
402      * so we need to do it here! */
403     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
404     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
405   }
406 }
407
408
409 /**
410  * Task invoked to start a transmission to another peer.
411  *
412  * @param cls the 'struct NeighbourMapEntry'
413  * @param tc scheduler context
414  */
415 static void
416 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
417 {
418   struct NeighbourMapEntry *n = cls;
419
420   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
421   try_transmission_to_peer (n);
422 }
423
424
425 /**
426  * Initialize the neighbours subsystem.
427  *
428  * @param cls closure for callbacks
429  * @param connect_cb function to call if we connect to a peer
430  * @param disconnect_cb function to call if we disconnect from a peer
431  */
432 void
433 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
434                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
435 {
436   callback_cls = cls;
437   connect_notify_cb = connect_cb;
438   disconnect_notify_cb = disconnect_cb;
439   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
440 }
441
442
443 /**
444  * Disconnect from the given neighbour, clean up the record.
445  *
446  * @param n neighbour to disconnect from
447  */
448 static void
449 disconnect_neighbour (struct NeighbourMapEntry *n)
450 {
451   struct MessageQueue *mq;
452
453   if (GNUNET_YES == n->in_disconnect)
454     return;
455   n->in_disconnect = GNUNET_YES;
456   while (NULL != (mq = n->messages_head))
457   {
458     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
459     mq->cont (mq->cont_cls, GNUNET_SYSERR);
460     GNUNET_free (mq);
461   }
462   if (NULL != n->is_active)
463   {
464     n->is_active->n = NULL;
465     n->is_active = NULL;
466   }
467   if (GNUNET_YES == n->is_connected)
468   {
469     n->is_connected = GNUNET_NO;
470
471     GNUNET_assert (neighbours_connected > 0);
472     neighbours_connected--;
473
474     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
475                               GNUNET_NO);
476     disconnect_notify_cb (callback_cls, &n->id);
477   }
478   GNUNET_assert (GNUNET_YES ==
479                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
480                                                        &n->id.hashPubKey, n));
481   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
482   {
483     GNUNET_SCHEDULER_cancel (n->timeout_task);
484     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
485   }
486   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
487   {
488     GNUNET_SCHEDULER_cancel (n->transmission_task);
489     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
490   }
491   if (NULL != n->asc)
492   {
493     GNUNET_ATS_suggest_address_cancel (n->asc);
494     n->asc = NULL;
495   }
496   GNUNET_array_grow (n->ats, n->ats_count, 0);
497   if (NULL != n->plugin_name)
498   {
499     GNUNET_free (n->plugin_name);
500     n->plugin_name = NULL;
501   }
502   if (NULL != n->addr)
503   {
504     GNUNET_free (n->addr);
505     n->addr = NULL;
506     n->addrlen = 0;
507   }
508   n->session = NULL;
509   GNUNET_free (n);
510 }
511
512
513 /**
514  * Peer has been idle for too long. Disconnect.
515  *
516  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
517  * @param tc scheduler context
518  */
519 static void
520 neighbour_timeout_task (void *cls,
521                         const struct GNUNET_SCHEDULER_TaskContext *tc)
522 {
523   struct NeighbourMapEntry *n = cls;
524
525   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
526   disconnect_neighbour (n);
527 }
528
529
530 /**
531  * Disconnect from the given neighbour.
532  *
533  * @param cls unused
534  * @param key hash of neighbour's public key (not used)
535  * @param value the 'struct NeighbourMapEntry' of the neighbour
536  */
537 static int
538 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
539 {
540   struct NeighbourMapEntry *n = value;
541
542 #if DEBUG_TRANSPORT
543   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
544               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
545 #endif
546   disconnect_neighbour (n);
547   return GNUNET_OK;
548 }
549
550
551 /**
552  * Cleanup the neighbours subsystem.
553  */
554 void
555 GST_neighbours_stop ()
556 {
557   GNUNET_assert (neighbours != NULL);
558
559   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
560                                          NULL);
561   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
562   GNUNET_assert (neighbours_connected == 0);
563   neighbours = NULL;
564   callback_cls = NULL;
565   connect_notify_cb = NULL;
566   disconnect_notify_cb = NULL;
567 }
568
569
570 /**
571  * For an existing neighbour record, set the active connection to
572  * the given address.
573  *
574  * @param peer identity of the peer to switch the address for
575  * @param plugin_name name of transport that delivered the PONG
576  * @param address address of the other peer, NULL if other peer
577  *                       connected to us
578  * @param address_len number of bytes in address
579  * @param session session to use (or NULL)
580  * @param ats performance data
581  * @param ats_count number of entries in ats (excluding 0-termination)
582  */
583 void
584 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
585                                   const char *plugin_name, const void *address,
586                                   size_t address_len, struct Session *session,
587                                   const struct GNUNET_TRANSPORT_ATS_Information
588                                   *ats, uint32_t ats_count)
589 {
590   struct NeighbourMapEntry *n;
591   struct SessionConnectMessage connect_msg;
592
593   GNUNET_assert (neighbours != NULL);
594
595   n = lookup_neighbour (peer);
596   if (NULL == n)
597   {
598     GNUNET_break (0);
599     return;
600   }
601
602 #if DEBUG_TRANSPORT
603   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
604               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
605               GNUNET_i2s (peer), plugin_name,
606               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
607                                                                   address,
608                                                                   address_len),
609               session);
610 #endif
611
612   GNUNET_free_non_null (n->addr);
613   n->addr = GNUNET_malloc (address_len);
614   memcpy (n->addr, address, address_len);
615   n->addrlen = address_len;
616   n->session = session;
617   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
618   memcpy (n->ats, ats,
619           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
620   GNUNET_free_non_null (n->plugin_name);
621   n->plugin_name = GNUNET_strdup (plugin_name);
622   GNUNET_SCHEDULER_cancel (n->timeout_task);
623   n->timeout_task =
624       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
625                                     &neighbour_timeout_task, n);
626   connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
627   connect_msg.header.type =
628       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
629   connect_msg.reserved = htonl (0);
630   connect_msg.timestamp =
631       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
632   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
633                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
634 }
635
636
637 /**
638  * Try to connect to the target peer using the given address
639  *
640  * @param cls the 'struct NeighbourMapEntry' of the target
641  * @param target identity of the target peer
642  * @param plugin_name name of the plugin
643  * @param plugin_address binary address
644  * @param plugin_address_len length of address
645  * @param session session to use
646  * @param bandwidth available bandwidth
647  * @param ats performance data for the address (as far as known)
648  * @param ats_count number of performance records in 'ats'
649  */
650 static void
651 try_connect_using_address (void *cls, const struct GNUNET_PeerIdentity *target,
652                            const char *plugin_name, const void *plugin_address,
653                            size_t plugin_address_len, struct Session *session,
654                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
655                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
656                            uint32_t ats_count)
657 {
658   struct NeighbourMapEntry *n = cls;
659   int was_connected;
660
661   n->asc = NULL;
662   was_connected = n->is_connected;
663   n->is_connected = GNUNET_YES;
664   GST_neighbours_switch_to_address (target, plugin_name, plugin_address,
665                                     plugin_address_len, session, ats,
666                                     ats_count);
667   if (GNUNET_YES == was_connected)
668     return;
669
670   neighbours_connected++;
671   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
672                             GNUNET_NO);
673   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
674 }
675
676
677 /**
678  * Try to create a connection to the given target (eventually).
679  *
680  * @param target peer to try to connect to
681  */
682 void
683 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
684 {
685   struct NeighbourMapEntry *n;
686
687   GNUNET_assert (neighbours != NULL);
688
689 #if DEBUG_TRANSPORT
690   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
691               GNUNET_i2s (target));
692 #endif
693
694   GNUNET_assert (0 !=
695                  memcmp (target, &GST_my_identity,
696                          sizeof (struct GNUNET_PeerIdentity)));
697   n = lookup_neighbour (target);
698   if ((NULL != n) && (GNUNET_YES == n->is_connected))
699     return;                     /* already connected */
700   if (n == NULL)
701   {
702 #if DEBUG_TRANSPORT
703     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704                 "Unknown peer `%s', creating new neighbour\n",
705                 GNUNET_i2s (target));
706 #endif
707     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
708     n->id = *target;
709     GNUNET_array_grow (n->ats, n->ats_count, 1);
710     n->ats[0].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);;
711     n->ats[0].value = htonl (0);
712     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
713                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
714                                    MAX_BANDWIDTH_CARRY_S);
715     n->timeout_task =
716         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
717                                       &neighbour_timeout_task, n);
718     GNUNET_assert (GNUNET_OK ==
719                    GNUNET_CONTAINER_multihashmap_put (neighbours,
720                                                       &n->id.hashPubKey, n,
721                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
722   }
723   if (n->asc != NULL)
724     return;                     /* already trying */
725 #if DEBUG_TRANSPORT
726   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
727               "Asking ATS for suggested address to connect to peer `%s'\n",
728               GNUNET_i2s (target));
729 #endif
730   n->asc =
731       GNUNET_ATS_suggest_address (GST_ats, target, &try_connect_using_address,
732                                   n);
733 }
734
735
736 /**
737  * Test if we're connected to the given peer.
738  *
739  * @param target peer to test
740  * @return GNUNET_YES if we are connected, GNUNET_NO if not
741  */
742 int
743 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
744 {
745   struct NeighbourMapEntry *n;
746
747   GNUNET_assert (neighbours != NULL);
748
749   n = lookup_neighbour (target);
750   if ((NULL == n) || (n->is_connected != GNUNET_YES))
751     return GNUNET_NO;           /* not connected */
752   return GNUNET_YES;
753 }
754
755
756 /**
757  * A session was terminated. Take note.
758  *
759  * @param peer identity of the peer where the session died
760  * @param session session that is gone
761  */
762 void
763 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
764                                    struct Session *session)
765 {
766   struct NeighbourMapEntry *n;
767
768   GNUNET_assert (neighbours != NULL);
769
770 #if DEBUG_TRANSPORT
771   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
772               "Session %X to peer `%s' ended \n",
773               session, GNUNET_i2s (peer));
774 #endif
775
776   n = lookup_neighbour (peer);
777   if (NULL == n)
778     return;
779   if (session != n->session)
780     return;                     /* doesn't affect us */
781
782   n->session = NULL;
783   GNUNET_free (n->addr);
784   n->addr = NULL;
785   n->addrlen = 0;
786
787
788   if (GNUNET_YES != n->is_connected)
789     return;                     /* not connected anymore anyway, shouldn't matter */
790
791   /* we are not connected until ATS suggests a new address */
792   //n->is_connected = GNUNET_NO;
793
794   GNUNET_SCHEDULER_cancel (n->timeout_task);
795   n->timeout_task =
796       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
797                                     &neighbour_timeout_task, n);
798   /* try QUICKLY to re-establish a connection, reduce timeout! */
799   if (NULL != n->ats)
800   {
801     /* how can this be!? */
802     //GNUNET_break (0);
803     return;
804   }
805   n->asc =
806       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
807 }
808
809
810 /**
811  * Transmit a message to the given target using the active connection.
812  *
813  * @param target destination
814  * @param msg message to send
815  * @param msg_size number of bytes in msg
816  * @param timeout when to fail with timeout
817  * @param cont function to call when done
818  * @param cont_cls closure for 'cont'
819  */
820 void
821 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
822                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
823                      GST_NeighbourSendContinuation cont, void *cont_cls)
824 {
825   struct NeighbourMapEntry *n;
826   struct MessageQueue *mq;
827
828   GNUNET_assert (neighbours != NULL);
829
830   n = lookup_neighbour (target);
831   if ((n == NULL) || (GNUNET_YES != n->is_connected))
832   {
833     GNUNET_STATISTICS_update (GST_stats,
834                               gettext_noop
835                               ("# messages not sent (no such peer or not connected)"),
836                               1, GNUNET_NO);
837 #if DEBUG_TRANSPORT
838     if (n == NULL)
839       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
840                   "Could not send message to peer `%s': unknown neighbor",
841                   GNUNET_i2s (target));
842     else if (GNUNET_YES != n->is_connected)
843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844                   "Could not send message to peer `%s': not connected\n",
845                   GNUNET_i2s (target));
846 #endif
847     if (NULL != cont)
848       cont (cont_cls, GNUNET_SYSERR);
849     return;
850   }
851
852   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
853   {
854     GNUNET_STATISTICS_update (GST_stats,
855                               gettext_noop
856                               ("# messages not sent (no such peer or not connected)"),
857                               1, GNUNET_NO);
858 #if DEBUG_TRANSPORT
859       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
860                   "Could not send message to peer `%s': no address available\n",
861                   GNUNET_i2s (target));
862 #endif
863
864     if (NULL != cont)
865       cont (cont_cls, GNUNET_SYSERR);
866     return;
867   }
868
869
870   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
871   GNUNET_STATISTICS_update (GST_stats,
872                             gettext_noop
873                             ("# bytes in message queue for other peers"),
874                             msg_size, GNUNET_NO);
875   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
876   mq->cont = cont;
877   mq->cont_cls = cont_cls;
878   /* FIXME: this memcpy can be up to 7% of our total runtime! */
879   memcpy (&mq[1], msg, msg_size);
880   mq->message_buf = (const char *) &mq[1];
881   mq->message_buf_size = msg_size;
882   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
883   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
884   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
885       (NULL == n->is_active))
886     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
887 }
888
889
890 /**
891  * We have received a message from the given sender.  How long should
892  * we delay before receiving more?  (Also used to keep the peer marked
893  * as live).
894  *
895  * @param sender sender of the message
896  * @param size size of the message
897  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
898  *                   GNUNET_NO if the neighbour is not connected or violates the quota
899  * @return how long to wait before reading more from this sender
900  */
901 struct GNUNET_TIME_Relative
902 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
903                                         *sender, ssize_t size, int *do_forward)
904 {
905   struct NeighbourMapEntry *n;
906   struct GNUNET_TIME_Relative ret;
907
908   GNUNET_assert (neighbours != NULL);
909
910   n = lookup_neighbour (sender);
911   if (n == NULL)
912   {
913     *do_forward = GNUNET_NO;
914     return GNUNET_TIME_UNIT_ZERO;
915   }
916   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
917   {
918     n->quota_violation_count++;
919 #if DEBUG_TRANSPORT
920     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
922                 n->in_tracker.available_bytes_per_s__,
923                 n->quota_violation_count);
924 #endif
925     /* Discount 32k per violation */
926     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
927   }
928   else
929   {
930     if (n->quota_violation_count > 0)
931     {
932       /* try to add 32k back */
933       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
934       n->quota_violation_count--;
935     }
936   }
937   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
938   {
939     GNUNET_STATISTICS_update (GST_stats,
940                               gettext_noop
941                               ("# bandwidth quota violations by other peers"),
942                               1, GNUNET_NO);
943     *do_forward = GNUNET_NO;
944     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
945   }
946   *do_forward = GNUNET_YES;
947   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
948   if (ret.rel_value > 0)
949   {
950 #if DEBUG_TRANSPORT
951     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
952                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
953                 (unsigned long long) n->in_tracker.
954                 consumption_since_last_update__,
955                 (unsigned int) n->in_tracker.available_bytes_per_s__,
956                 (unsigned long long) ret.rel_value);
957 #endif
958     GNUNET_STATISTICS_update (GST_stats,
959                               gettext_noop ("# ms throttling suggested"),
960                               (int64_t) ret.rel_value, GNUNET_NO);
961   }
962   return ret;
963 }
964
965
966 /**
967  * Keep the connection to the given neighbour alive longer,
968  * we received a KEEPALIVE (or equivalent).
969  *
970  * @param neighbour neighbour to keep alive
971  */
972 void
973 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
974 {
975   struct NeighbourMapEntry *n;
976
977   GNUNET_assert (neighbours != NULL);
978
979   n = lookup_neighbour (neighbour);
980   if (NULL == n)
981   {
982     GNUNET_STATISTICS_update (GST_stats,
983                               gettext_noop
984                               ("# KEEPALIVE messages discarded (not connected)"),
985                               1, GNUNET_NO);
986     return;
987   }
988   GNUNET_SCHEDULER_cancel (n->timeout_task);
989   n->timeout_task =
990       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
991                                     &neighbour_timeout_task, n);
992 }
993
994
995 /**
996  * Change the incoming quota for the given peer.
997  *
998  * @param neighbour identity of peer to change qutoa for
999  * @param quota new quota
1000  */
1001 void
1002 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1003                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1004 {
1005   struct NeighbourMapEntry *n;
1006
1007   GNUNET_assert (neighbours != NULL);
1008
1009   n = lookup_neighbour (neighbour);
1010   if (n == NULL)
1011   {
1012     GNUNET_STATISTICS_update (GST_stats,
1013                               gettext_noop
1014                               ("# SET QUOTA messages ignored (no such peer)"),
1015                               1, GNUNET_NO);
1016     return;
1017   }
1018   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1019   if (0 != ntohl (quota.value__))
1020     return;
1021 #if DEBUG_TRANSPORT
1022   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1023               GNUNET_i2s (&n->id), "SET_QUOTA");
1024 #endif
1025   GNUNET_STATISTICS_update (GST_stats,
1026                             gettext_noop ("# disconnects due to quota of 0"), 1,
1027                             GNUNET_NO);
1028   disconnect_neighbour (n);
1029 }
1030
1031
1032 /**
1033  * Closure for the neighbours_iterate function.
1034  */
1035 struct IteratorContext
1036 {
1037   /**
1038    * Function to call on each connected neighbour.
1039    */
1040   GST_NeighbourIterator cb;
1041
1042   /**
1043    * Closure for 'cb'.
1044    */
1045   void *cb_cls;
1046 };
1047
1048
1049 /**
1050  * Call the callback from the closure for each connected neighbour.
1051  *
1052  * @param cls the 'struct IteratorContext'
1053  * @param key the hash of the public key of the neighbour
1054  * @param value the 'struct NeighbourMapEntry'
1055  * @return GNUNET_OK (continue to iterate)
1056  */
1057 static int
1058 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1059 {
1060   struct IteratorContext *ic = cls;
1061   struct NeighbourMapEntry *n = value;
1062
1063   if (GNUNET_YES != n->is_connected)
1064     return GNUNET_OK;
1065
1066   GNUNET_assert (n->ats_count > 0);
1067   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count);
1068   return GNUNET_OK;
1069 }
1070
1071
1072 /**
1073  * Iterate over all connected neighbours.
1074  *
1075  * @param cb function to call
1076  * @param cb_cls closure for cb
1077  */
1078 void
1079 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1080 {
1081   struct IteratorContext ic;
1082
1083   GNUNET_assert (neighbours != NULL);
1084
1085   ic.cb = cb;
1086   ic.cb_cls = cb_cls;
1087   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1088 }
1089
1090
1091 /**
1092  * If we have an active connection to the given target, it must be shutdown.
1093  *
1094  * @param target peer to disconnect from
1095  */
1096 void
1097 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1098 {
1099   struct NeighbourMapEntry *n;
1100   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1101   struct SessionDisconnectMessage disconnect_msg;
1102
1103   GNUNET_assert (neighbours != NULL);
1104
1105   n = lookup_neighbour (target);
1106   if (NULL == n)
1107     return;                     /* not active */
1108   if (GNUNET_YES == n->is_connected)
1109   {
1110     /* we're actually connected, send DISCONNECT message */
1111     disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
1112     disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1113     disconnect_msg.reserved = htonl (0);
1114     disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1115                                          sizeof (struct GNUNET_TIME_AbsoluteNBO));
1116     disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
1117     disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1118     GNUNET_assert (GNUNET_OK ==
1119                    GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
1120                                            &disconnect_msg.purpose,
1121                                            &disconnect_msg.signature));
1122     papi = GST_plugins_find (n->plugin_name);
1123     if (papi != NULL)
1124       papi->send (papi->cls, target, (const void *) &disconnect_msg,
1125                   sizeof (struct GNUNET_MessageHeader),
1126                   UINT32_MAX /* priority */ ,
1127                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
1128                   GNUNET_YES, NULL, NULL);
1129   }
1130   disconnect_neighbour (n);
1131 }
1132
1133
1134 /* end of file gnunet-service-transport_neighbours.c */