report disconnect reasons in stats
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48
49 /**
50  * Entry in neighbours.
51  */
52 struct NeighbourMapEntry;
53
54 /**
55  * Message a peer sends to another to indicate its
56  * preference for communicating via a particular
57  * session (and the desire to establish a real
58  * connection).
59  */
60 struct SessionConnectMessage
61 {
62   /**
63    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
64    */
65   struct GNUNET_MessageHeader header;
66
67   /**
68    * Always zero.
69    */
70   uint32_t reserved GNUNET_PACKED;
71
72   /**
73    * Absolute time at the sender.  Only the most recent connect
74    * message implies which session is preferred by the sender.
75    */
76   struct GNUNET_TIME_AbsoluteNBO timestamp;
77
78 };
79
80
81 struct SessionDisconnectMessage
82 {
83   /**
84    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
85    */
86   struct GNUNET_MessageHeader header;
87
88   /**
89    * Always zero.
90    */
91   uint32_t reserved GNUNET_PACKED;
92
93   /**
94    * Purpose of the signature.  Extends over the timestamp.
95    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
96    */
97   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
98
99   /**
100    * Absolute time at the sender.  Only the most recent connect
101    * message implies which session is preferred by the sender.
102    */
103   struct GNUNET_TIME_AbsoluteNBO timestamp;
104
105   /**
106    * Signature of the peer that sends us the disconnect.  Only
107    * valid if the timestamp is AFTER the timestamp from the
108    * corresponding 'CONNECT' message.
109    */
110   struct GNUNET_CRYPTO_RsaSignature signature;
111 };
112
113
114 /**
115  * For each neighbour we keep a list of messages
116  * that we still want to transmit to the neighbour.
117  */
118 struct MessageQueue
119 {
120
121   /**
122    * This is a doubly linked list.
123    */
124   struct MessageQueue *next;
125
126   /**
127    * This is a doubly linked list.
128    */
129   struct MessageQueue *prev;
130
131   /**
132    * Once this message is actively being transmitted, which
133    * neighbour is it associated with?
134    */
135   struct NeighbourMapEntry *n;
136
137   /**
138    * Function to call once we're done.
139    */
140   GST_NeighbourSendContinuation cont;
141
142   /**
143    * Closure for 'cont'
144    */
145   void *cont_cls;
146
147   /**
148    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
149    * stuck together in memory.  Allocated at the end of this struct.
150    */
151   const char *message_buf;
152
153   /**
154    * Size of the message buf
155    */
156   size_t message_buf_size;
157
158   /**
159    * At what time should we fail?
160    */
161   struct GNUNET_TIME_Absolute timeout;
162
163 };
164
165
166 /**
167  * Entry in neighbours.
168  */
169 struct NeighbourMapEntry
170 {
171
172   /**
173    * Head of list of messages we would like to send to this peer;
174    * must contain at most one message per client.
175    */
176   struct MessageQueue *messages_head;
177
178   /**
179    * Tail of list of messages we would like to send to this peer; must
180    * contain at most one message per client.
181    */
182   struct MessageQueue *messages_tail;
183
184   /**
185    * Context for address suggestion.
186    * NULL after we are connected.
187    */
188   struct GNUNET_ATS_SuggestionContext *asc;
189
190   /**
191    * Performance data for the peer.
192    */
193   struct GNUNET_TRANSPORT_ATS_Information *ats;
194
195   /**
196    * Are we currently trying to send a message? If so, which one?
197    */
198   struct MessageQueue *is_active;
199
200   /**
201    * Active session for communicating with the peer.
202    */
203   struct Session *session;
204
205   /**
206    * Name of the plugin we currently use.
207    */
208   char *plugin_name;
209
210   /**
211    * Address used for communicating with the peer, NULL for inbound connections.
212    */
213   void *addr;
214
215   /**
216    * Number of bytes in 'addr'.
217    */
218   size_t addrlen;
219
220   /**
221    * Identity of this neighbour.
222    */
223   struct GNUNET_PeerIdentity id;
224
225   /**
226    * ID of task scheduled to run when this peer is about to
227    * time out (will free resources associated with the peer).
228    */
229   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
230
231   /**
232    * ID of task scheduled to run when we should try transmitting
233    * the head of the message queue.
234    */
235   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
236
237   /**
238    * Tracker for inbound bandwidth.
239    */
240   struct GNUNET_BANDWIDTH_Tracker in_tracker;
241
242   /**
243    * How often has the other peer (recently) violated the inbound
244    * traffic limit?  Incremented by 10 per violation, decremented by 1
245    * per non-violation (for each time interval).
246    */
247   unsigned int quota_violation_count;
248
249   /**
250    * Number of values in 'ats' array.
251    */
252   unsigned int ats_count;
253
254   /**
255    * Are we already in the process of disconnecting this neighbour?
256    */
257   int in_disconnect;
258
259   /**
260    * Do we currently consider this neighbour connected? (as far as
261    * the connect/disconnect callbacks are concerned)?
262    */
263   int is_connected;
264
265 };
266
267
268 /**
269  * All known neighbours and their HELLOs.
270  */
271 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
272
273 /**
274  * Closure for connect_notify_cb and disconnect_notify_cb
275  */
276 static void *callback_cls;
277
278 /**
279  * Function to call when we connected to a neighbour.
280  */
281 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
282
283 /**
284  * Function to call when we disconnected from a neighbour.
285  */
286 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
287
288 /**
289  * counter for connected neighbours
290  */
291 static int neighbours_connected;
292
293 /**
294  * Lookup a neighbour entry in the neighbours hash map.
295  *
296  * @param pid identity of the peer to look up
297  * @return the entry, NULL if there is no existing record
298  */
299 static struct NeighbourMapEntry *
300 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
301 {
302   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
303 }
304
305
306 /**
307  * Task invoked to start a transmission to another peer.
308  *
309  * @param cls the 'struct NeighbourMapEntry'
310  * @param tc scheduler context
311  */
312 static void
313 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
314
315
316 /**
317  * We're done with our transmission attempt, continue processing.
318  *
319  * @param cls the 'struct MessageQueue' of the message
320  * @param receiver intended receiver
321  * @param success whether it worked or not
322  */
323 static void
324 transmit_send_continuation (void *cls,
325                             const struct GNUNET_PeerIdentity *receiver,
326                             int success)
327 {
328   struct MessageQueue *mq;
329   struct NeighbourMapEntry *n;
330
331   mq = cls;
332   n = mq->n;
333   if (NULL != n)
334   {
335     GNUNET_assert (n->is_active == mq);
336     n->is_active = NULL;
337     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
338     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
339   }
340   if (NULL != mq->cont)
341     mq->cont (mq->cont_cls, success);
342   GNUNET_free (mq);
343 }
344
345
346 /**
347  * Check the ready list for the given neighbour and if a plugin is
348  * ready for transmission (and if we have a message), do so!
349  *
350  * @param n target peer for which to transmit
351  */
352 static void
353 try_transmission_to_peer (struct NeighbourMapEntry *n)
354 {
355   struct MessageQueue *mq;
356   struct GNUNET_TIME_Relative timeout;
357   ssize_t ret;
358   struct GNUNET_TRANSPORT_PluginFunctions *papi;
359
360   if (n->is_active != NULL)
361     return;                     /* transmission already pending */
362   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
363     return;                     /* currently waiting for bandwidth */
364   while (NULL != (mq = n->messages_head))
365   {
366     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
367     if (timeout.rel_value > 0)
368       break;
369     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
370     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
371   }
372   if (NULL == mq)
373     return;                     /* no more messages */
374
375   papi = GST_plugins_find (n->plugin_name);
376   if (papi == NULL)
377   {
378     GNUNET_break (0);
379     return;
380   }
381   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
382   n->is_active = mq;
383   mq->n = n;
384
385   if  (((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0)))
386   {
387     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No address peer for peer `%s'\n",
388                 GNUNET_i2s (&n->id));
389     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
390     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
391     return;
392   }
393
394   ret =
395       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
396                   0 /* priority -- remove from plugin API? */ ,
397                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
398                   &transmit_send_continuation, mq);
399   if (ret == -1)
400   {
401     /* failure, but 'send' would not call continuation in this case,
402      * so we need to do it here! */
403     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
404     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
405   }
406 }
407
408
409 /**
410  * Task invoked to start a transmission to another peer.
411  *
412  * @param cls the 'struct NeighbourMapEntry'
413  * @param tc scheduler context
414  */
415 static void
416 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
417 {
418   struct NeighbourMapEntry *n = cls;
419
420   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
421   try_transmission_to_peer (n);
422 }
423
424
425 /**
426  * Initialize the neighbours subsystem.
427  *
428  * @param cls closure for callbacks
429  * @param connect_cb function to call if we connect to a peer
430  * @param disconnect_cb function to call if we disconnect from a peer
431  */
432 void
433 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
434                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
435 {
436   callback_cls = cls;
437   connect_notify_cb = connect_cb;
438   disconnect_notify_cb = disconnect_cb;
439   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
440 }
441
442
443 /**
444  * Disconnect from the given neighbour, clean up the record.
445  *
446  * @param n neighbour to disconnect from
447  */
448 static void
449 disconnect_neighbour (struct NeighbourMapEntry *n)
450 {
451   struct MessageQueue *mq;
452
453   if (GNUNET_YES == n->in_disconnect)
454     return;
455   n->in_disconnect = GNUNET_YES;
456   while (NULL != (mq = n->messages_head))
457   {
458     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
459     mq->cont (mq->cont_cls, GNUNET_SYSERR);
460     GNUNET_free (mq);
461   }
462   if (NULL != n->is_active)
463   {
464     n->is_active->n = NULL;
465     n->is_active = NULL;
466   }
467   if (GNUNET_YES == n->is_connected)
468   {
469     n->is_connected = GNUNET_NO;
470
471     GNUNET_assert (neighbours_connected > 0);
472     neighbours_connected--;
473
474     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
475                               GNUNET_NO);
476     disconnect_notify_cb (callback_cls, &n->id);
477   }
478   GNUNET_assert (GNUNET_YES ==
479                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
480                                                        &n->id.hashPubKey, n));
481   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
482   {
483     GNUNET_SCHEDULER_cancel (n->timeout_task);
484     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
485   }
486   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
487   {
488     GNUNET_SCHEDULER_cancel (n->transmission_task);
489     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
490   }
491   if (NULL != n->asc)
492   {
493     GNUNET_ATS_suggest_address_cancel (n->asc);
494     n->asc = NULL;
495   }
496   GNUNET_array_grow (n->ats, n->ats_count, 0);
497   if (NULL != n->plugin_name)
498   {
499     GNUNET_free (n->plugin_name);
500     n->plugin_name = NULL;
501   }
502   if (NULL != n->addr)
503   {
504     GNUNET_free (n->addr);
505     n->addr = NULL;
506     n->addrlen = 0;
507   }
508   n->session = NULL;
509   GNUNET_free (n);
510 }
511
512
513 /**
514  * Peer has been idle for too long. Disconnect.
515  *
516  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
517  * @param tc scheduler context
518  */
519 static void
520 neighbour_timeout_task (void *cls,
521                         const struct GNUNET_SCHEDULER_TaskContext *tc)
522 {
523   struct NeighbourMapEntry *n = cls;
524
525   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
526   if (GNUNET_YES == n->is_connected)
527     GNUNET_STATISTICS_update (GST_stats,
528                               gettext_noop ("# peers disconnected due to timeout"), 1,
529                               GNUNET_NO);
530   disconnect_neighbour (n);
531 }
532
533
534 /**
535  * Disconnect from the given neighbour.
536  *
537  * @param cls unused
538  * @param key hash of neighbour's public key (not used)
539  * @param value the 'struct NeighbourMapEntry' of the neighbour
540  */
541 static int
542 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
543 {
544   struct NeighbourMapEntry *n = value;
545
546 #if DEBUG_TRANSPORT
547   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
548               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
549 #endif
550   if (GNUNET_YES == n->is_connected)
551     GNUNET_STATISTICS_update (GST_stats,
552                               gettext_noop ("# peers disconnected due to global disconnect"), 1,
553                               GNUNET_NO);
554   disconnect_neighbour (n);
555   return GNUNET_OK;
556 }
557
558
559 /**
560  * Cleanup the neighbours subsystem.
561  */
562 void
563 GST_neighbours_stop ()
564 {
565   GNUNET_assert (neighbours != NULL);
566
567   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
568                                          NULL);
569   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
570   GNUNET_assert (neighbours_connected == 0);
571   neighbours = NULL;
572   callback_cls = NULL;
573   connect_notify_cb = NULL;
574   disconnect_notify_cb = NULL;
575 }
576
577
578 /**
579  * For an existing neighbour record, set the active connection to
580  * the given address.
581  *
582  * @param peer identity of the peer to switch the address for
583  * @param plugin_name name of transport that delivered the PONG
584  * @param address address of the other peer, NULL if other peer
585  *                       connected to us
586  * @param address_len number of bytes in address
587  * @param session session to use (or NULL)
588  * @param ats performance data
589  * @param ats_count number of entries in ats (excluding 0-termination)
590  */
591 void
592 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
593                                   const char *plugin_name, const void *address,
594                                   size_t address_len, struct Session *session,
595                                   const struct GNUNET_TRANSPORT_ATS_Information
596                                   *ats, uint32_t ats_count)
597 {
598   struct NeighbourMapEntry *n;
599   struct SessionConnectMessage connect_msg;
600
601   GNUNET_assert (neighbours != NULL);
602
603   n = lookup_neighbour (peer);
604   if (NULL == n)
605   {
606     GNUNET_break (0);
607     return;
608   }
609
610 #if DEBUG_TRANSPORT
611   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
612               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
613               GNUNET_i2s (peer), plugin_name,
614               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
615                                                                   address,
616                                                                   address_len),
617               session);
618 #endif
619
620   GNUNET_free_non_null (n->addr);
621   n->addr = GNUNET_malloc (address_len);
622   memcpy (n->addr, address, address_len);
623   n->addrlen = address_len;
624   n->session = session;
625   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
626   memcpy (n->ats, ats,
627           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
628   GNUNET_free_non_null (n->plugin_name);
629   n->plugin_name = GNUNET_strdup (plugin_name);
630   GNUNET_SCHEDULER_cancel (n->timeout_task);
631   n->timeout_task =
632       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
633                                     &neighbour_timeout_task, n);
634   connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
635   connect_msg.header.type =
636       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
637   connect_msg.reserved = htonl (0);
638   connect_msg.timestamp =
639       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
640   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
641                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
642 }
643
644
645 /**
646  * Try to connect to the target peer using the given address
647  *
648  * @param cls the 'struct NeighbourMapEntry' of the target
649  * @param target identity of the target peer
650  * @param plugin_name name of the plugin
651  * @param plugin_address binary address
652  * @param plugin_address_len length of address
653  * @param session session to use
654  * @param bandwidth available bandwidth
655  * @param ats performance data for the address (as far as known)
656  * @param ats_count number of performance records in 'ats'
657  */
658 static void
659 try_connect_using_address (void *cls, const struct GNUNET_PeerIdentity *target,
660                            const char *plugin_name, const void *plugin_address,
661                            size_t plugin_address_len, struct Session *session,
662                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
663                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
664                            uint32_t ats_count)
665 {
666   struct NeighbourMapEntry *n = cls;
667   int was_connected;
668
669   n->asc = NULL;
670   was_connected = n->is_connected;
671   n->is_connected = GNUNET_YES;
672   GST_neighbours_switch_to_address (target, plugin_name, plugin_address,
673                                     plugin_address_len, session, ats,
674                                     ats_count);
675   if (GNUNET_YES == was_connected)
676     return;
677
678   neighbours_connected++;
679   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
680                             GNUNET_NO);
681   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
682 }
683
684
685 /**
686  * Try to create a connection to the given target (eventually).
687  *
688  * @param target peer to try to connect to
689  */
690 void
691 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
692 {
693   struct NeighbourMapEntry *n;
694
695   GNUNET_assert (neighbours != NULL);
696
697 #if DEBUG_TRANSPORT
698   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
699               GNUNET_i2s (target));
700 #endif
701
702   GNUNET_assert (0 !=
703                  memcmp (target, &GST_my_identity,
704                          sizeof (struct GNUNET_PeerIdentity)));
705   n = lookup_neighbour (target);
706   if ((NULL != n) && (GNUNET_YES == n->is_connected))
707     return;                     /* already connected */
708   if (n == NULL)
709   {
710 #if DEBUG_TRANSPORT
711     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
712                 "Unknown peer `%s', creating new neighbour\n",
713                 GNUNET_i2s (target));
714 #endif
715     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
716     n->id = *target;
717     GNUNET_array_grow (n->ats, n->ats_count, 1);
718     n->ats[0].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);;
719     n->ats[0].value = htonl (0);
720     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
721                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
722                                    MAX_BANDWIDTH_CARRY_S);
723     n->timeout_task =
724         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
725                                       &neighbour_timeout_task, n);
726     GNUNET_assert (GNUNET_OK ==
727                    GNUNET_CONTAINER_multihashmap_put (neighbours,
728                                                       &n->id.hashPubKey, n,
729                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
730   }
731   if (n->asc != NULL)
732     return;                     /* already trying */
733 #if DEBUG_TRANSPORT
734   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735               "Asking ATS for suggested address to connect to peer `%s'\n",
736               GNUNET_i2s (target));
737 #endif
738   n->asc =
739       GNUNET_ATS_suggest_address (GST_ats, target, &try_connect_using_address,
740                                   n);
741 }
742
743
744 /**
745  * Test if we're connected to the given peer.
746  *
747  * @param target peer to test
748  * @return GNUNET_YES if we are connected, GNUNET_NO if not
749  */
750 int
751 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
752 {
753   struct NeighbourMapEntry *n;
754
755   GNUNET_assert (neighbours != NULL);
756
757   n = lookup_neighbour (target);
758   if ((NULL == n) || (n->is_connected != GNUNET_YES))
759     return GNUNET_NO;           /* not connected */
760   return GNUNET_YES;
761 }
762
763
764 /**
765  * A session was terminated. Take note.
766  *
767  * @param peer identity of the peer where the session died
768  * @param session session that is gone
769  */
770 void
771 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
772                                    struct Session *session)
773 {
774   struct NeighbourMapEntry *n;
775
776   GNUNET_assert (neighbours != NULL);
777
778 #if DEBUG_TRANSPORT
779   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
780               "Session %X to peer `%s' ended \n",
781               session, GNUNET_i2s (peer));
782 #endif
783
784   n = lookup_neighbour (peer);
785   if (NULL == n)
786     return;
787   if (session != n->session)
788     return;                     /* doesn't affect us */
789
790   n->session = NULL;
791   GNUNET_free (n->addr);
792   n->addr = NULL;
793   n->addrlen = 0;
794
795
796   if (GNUNET_YES != n->is_connected)
797     return;                     /* not connected anymore anyway, shouldn't matter */
798
799   /* we are not connected until ATS suggests a new address */
800   //n->is_connected = GNUNET_NO;
801
802   GNUNET_SCHEDULER_cancel (n->timeout_task);
803   n->timeout_task =
804       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
805                                     &neighbour_timeout_task, n);
806   /* try QUICKLY to re-establish a connection, reduce timeout! */
807   if (NULL != n->ats)
808   {
809     /* how can this be!? */
810     //GNUNET_break (0);
811     return;
812   }
813   n->asc =
814       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
815 }
816
817
818 /**
819  * Transmit a message to the given target using the active connection.
820  *
821  * @param target destination
822  * @param msg message to send
823  * @param msg_size number of bytes in msg
824  * @param timeout when to fail with timeout
825  * @param cont function to call when done
826  * @param cont_cls closure for 'cont'
827  */
828 void
829 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
830                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
831                      GST_NeighbourSendContinuation cont, void *cont_cls)
832 {
833   struct NeighbourMapEntry *n;
834   struct MessageQueue *mq;
835
836   GNUNET_assert (neighbours != NULL);
837
838   n = lookup_neighbour (target);
839   if ((n == NULL) || (GNUNET_YES != n->is_connected))
840   {
841     GNUNET_STATISTICS_update (GST_stats,
842                               gettext_noop
843                               ("# messages not sent (no such peer or not connected)"),
844                               1, GNUNET_NO);
845 #if DEBUG_TRANSPORT
846     if (n == NULL)
847       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
848                   "Could not send message to peer `%s': unknown neighbor",
849                   GNUNET_i2s (target));
850     else if (GNUNET_YES != n->is_connected)
851       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852                   "Could not send message to peer `%s': not connected\n",
853                   GNUNET_i2s (target));
854 #endif
855     if (NULL != cont)
856       cont (cont_cls, GNUNET_SYSERR);
857     return;
858   }
859
860   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
861   {
862     GNUNET_STATISTICS_update (GST_stats,
863                               gettext_noop
864                               ("# messages not sent (no such peer or not connected)"),
865                               1, GNUNET_NO);
866 #if DEBUG_TRANSPORT
867       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
868                   "Could not send message to peer `%s': no address available\n",
869                   GNUNET_i2s (target));
870 #endif
871
872     if (NULL != cont)
873       cont (cont_cls, GNUNET_SYSERR);
874     return;
875   }
876
877
878   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
879   GNUNET_STATISTICS_update (GST_stats,
880                             gettext_noop
881                             ("# bytes in message queue for other peers"),
882                             msg_size, GNUNET_NO);
883   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
884   mq->cont = cont;
885   mq->cont_cls = cont_cls;
886   /* FIXME: this memcpy can be up to 7% of our total runtime! */
887   memcpy (&mq[1], msg, msg_size);
888   mq->message_buf = (const char *) &mq[1];
889   mq->message_buf_size = msg_size;
890   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
891   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
892   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
893       (NULL == n->is_active))
894     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
895 }
896
897
898 /**
899  * We have received a message from the given sender.  How long should
900  * we delay before receiving more?  (Also used to keep the peer marked
901  * as live).
902  *
903  * @param sender sender of the message
904  * @param size size of the message
905  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
906  *                   GNUNET_NO if the neighbour is not connected or violates the quota
907  * @return how long to wait before reading more from this sender
908  */
909 struct GNUNET_TIME_Relative
910 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
911                                         *sender, ssize_t size, int *do_forward)
912 {
913   struct NeighbourMapEntry *n;
914   struct GNUNET_TIME_Relative ret;
915
916   GNUNET_assert (neighbours != NULL);
917
918   n = lookup_neighbour (sender);
919   if (n == NULL)
920   {
921     *do_forward = GNUNET_NO;
922     return GNUNET_TIME_UNIT_ZERO;
923   }
924   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
925   {
926     n->quota_violation_count++;
927 #if DEBUG_TRANSPORT
928     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
929                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
930                 n->in_tracker.available_bytes_per_s__,
931                 n->quota_violation_count);
932 #endif
933     /* Discount 32k per violation */
934     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
935   }
936   else
937   {
938     if (n->quota_violation_count > 0)
939     {
940       /* try to add 32k back */
941       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
942       n->quota_violation_count--;
943     }
944   }
945   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
946   {
947     GNUNET_STATISTICS_update (GST_stats,
948                               gettext_noop
949                               ("# bandwidth quota violations by other peers"),
950                               1, GNUNET_NO);
951     *do_forward = GNUNET_NO;
952     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
953   }
954   *do_forward = GNUNET_YES;
955   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
956   if (ret.rel_value > 0)
957   {
958 #if DEBUG_TRANSPORT
959     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
960                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
961                 (unsigned long long) n->in_tracker.
962                 consumption_since_last_update__,
963                 (unsigned int) n->in_tracker.available_bytes_per_s__,
964                 (unsigned long long) ret.rel_value);
965 #endif
966     GNUNET_STATISTICS_update (GST_stats,
967                               gettext_noop ("# ms throttling suggested"),
968                               (int64_t) ret.rel_value, GNUNET_NO);
969   }
970   return ret;
971 }
972
973
974 /**
975  * Keep the connection to the given neighbour alive longer,
976  * we received a KEEPALIVE (or equivalent).
977  *
978  * @param neighbour neighbour to keep alive
979  */
980 void
981 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
982 {
983   struct NeighbourMapEntry *n;
984
985   GNUNET_assert (neighbours != NULL);
986
987   n = lookup_neighbour (neighbour);
988   if (NULL == n)
989   {
990     GNUNET_STATISTICS_update (GST_stats,
991                               gettext_noop
992                               ("# KEEPALIVE messages discarded (not connected)"),
993                               1, GNUNET_NO);
994     return;
995   }
996   GNUNET_SCHEDULER_cancel (n->timeout_task);
997   n->timeout_task =
998       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
999                                     &neighbour_timeout_task, n);
1000 }
1001
1002
1003 /**
1004  * Change the incoming quota for the given peer.
1005  *
1006  * @param neighbour identity of peer to change qutoa for
1007  * @param quota new quota
1008  */
1009 void
1010 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1011                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1012 {
1013   struct NeighbourMapEntry *n;
1014
1015   GNUNET_assert (neighbours != NULL);
1016
1017   n = lookup_neighbour (neighbour);
1018   if (n == NULL)
1019   {
1020     GNUNET_STATISTICS_update (GST_stats,
1021                               gettext_noop
1022                               ("# SET QUOTA messages ignored (no such peer)"),
1023                               1, GNUNET_NO);
1024     return;
1025   }
1026   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1027   if (0 != ntohl (quota.value__))
1028     return;
1029 #if DEBUG_TRANSPORT
1030   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1031               GNUNET_i2s (&n->id), "SET_QUOTA");
1032 #endif
1033   if (GNUNET_YES == n->is_connected)
1034     GNUNET_STATISTICS_update (GST_stats,
1035                               gettext_noop ("# disconnects due to quota of 0"), 1,
1036                               GNUNET_NO);
1037   disconnect_neighbour (n);
1038 }
1039
1040
1041 /**
1042  * Closure for the neighbours_iterate function.
1043  */
1044 struct IteratorContext
1045 {
1046   /**
1047    * Function to call on each connected neighbour.
1048    */
1049   GST_NeighbourIterator cb;
1050
1051   /**
1052    * Closure for 'cb'.
1053    */
1054   void *cb_cls;
1055 };
1056
1057
1058 /**
1059  * Call the callback from the closure for each connected neighbour.
1060  *
1061  * @param cls the 'struct IteratorContext'
1062  * @param key the hash of the public key of the neighbour
1063  * @param value the 'struct NeighbourMapEntry'
1064  * @return GNUNET_OK (continue to iterate)
1065  */
1066 static int
1067 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1068 {
1069   struct IteratorContext *ic = cls;
1070   struct NeighbourMapEntry *n = value;
1071
1072   if (GNUNET_YES != n->is_connected)
1073     return GNUNET_OK;
1074
1075   GNUNET_assert (n->ats_count > 0);
1076   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count, n->plugin_name, n->addr, n->addrlen);
1077   return GNUNET_OK;
1078 }
1079
1080
1081 /**
1082  * Iterate over all connected neighbours.
1083  *
1084  * @param cb function to call
1085  * @param cb_cls closure for cb
1086  */
1087 void
1088 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1089 {
1090   struct IteratorContext ic;
1091
1092   GNUNET_assert (neighbours != NULL);
1093
1094   ic.cb = cb;
1095   ic.cb_cls = cb_cls;
1096   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1097 }
1098
1099
1100 /**
1101  * If we have an active connection to the given target, it must be shutdown.
1102  *
1103  * @param target peer to disconnect from
1104  */
1105 void
1106 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1107 {
1108   struct NeighbourMapEntry *n;
1109   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1110   struct SessionDisconnectMessage disconnect_msg;
1111
1112   GNUNET_assert (neighbours != NULL);
1113
1114   n = lookup_neighbour (target);
1115   if (NULL == n)
1116     return;                     /* not active */
1117   if (GNUNET_YES == n->is_connected)
1118   {
1119     /* we're actually connected, send DISCONNECT message */
1120     disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
1121     disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1122     disconnect_msg.reserved = htonl (0);
1123     disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1124                                          sizeof (struct GNUNET_TIME_AbsoluteNBO));
1125     disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
1126     disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1127     GNUNET_assert (GNUNET_OK ==
1128                    GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
1129                                            &disconnect_msg.purpose,
1130                                            &disconnect_msg.signature));
1131     papi = GST_plugins_find (n->plugin_name);
1132     if (papi != NULL)
1133       papi->send (papi->cls, target, (const void *) &disconnect_msg,
1134                   sizeof (struct GNUNET_MessageHeader),
1135                   UINT32_MAX /* priority */ ,
1136                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
1137                   GNUNET_YES, NULL, NULL);
1138     GNUNET_STATISTICS_update (GST_stats,
1139                               gettext_noop ("# peers disconnected due to external request"), 1,
1140                               GNUNET_NO);
1141   }
1142   disconnect_neighbour (n);
1143 }
1144
1145
1146 /* end of file gnunet-service-transport_neighbours.c */