fix disconnect message transmission, send keepalives
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48 /**
49  * How often do we send KEEPALIVE messages to each of our neighbours?
50  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
51  * send 3 keepalives in each interval, so 3 messages would need to be
52  * lost in a row for a disconnect).
53  */
54 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
55
56
57 /**
58  * Entry in neighbours.
59  */
60 struct NeighbourMapEntry;
61
62 /**
63  * Message a peer sends to another to indicate its
64  * preference for communicating via a particular
65  * session (and the desire to establish a real
66  * connection).
67  */
68 struct SessionConnectMessage
69 {
70   /**
71    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
72    */
73   struct GNUNET_MessageHeader header;
74
75   /**
76    * Always zero.
77    */
78   uint32_t reserved GNUNET_PACKED;
79
80   /**
81    * Absolute time at the sender.  Only the most recent connect
82    * message implies which session is preferred by the sender.
83    */
84   struct GNUNET_TIME_AbsoluteNBO timestamp;
85
86 };
87
88
89 struct SessionDisconnectMessage
90 {
91   /**
92    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
93    */
94   struct GNUNET_MessageHeader header;
95
96   /**
97    * Always zero.
98    */
99   uint32_t reserved GNUNET_PACKED;
100
101   /**
102    * Purpose of the signature.  Extends over the timestamp.
103    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
104    */
105   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
106
107   /**
108    * Absolute time at the sender.  Only the most recent connect
109    * message implies which session is preferred by the sender.
110    */
111   struct GNUNET_TIME_AbsoluteNBO timestamp;
112
113   /**
114    * Signature of the peer that sends us the disconnect.  Only
115    * valid if the timestamp is AFTER the timestamp from the
116    * corresponding 'CONNECT' message.
117    */
118   struct GNUNET_CRYPTO_RsaSignature signature;
119 };
120
121
122 /**
123  * For each neighbour we keep a list of messages
124  * that we still want to transmit to the neighbour.
125  */
126 struct MessageQueue
127 {
128
129   /**
130    * This is a doubly linked list.
131    */
132   struct MessageQueue *next;
133
134   /**
135    * This is a doubly linked list.
136    */
137   struct MessageQueue *prev;
138
139   /**
140    * Once this message is actively being transmitted, which
141    * neighbour is it associated with?
142    */
143   struct NeighbourMapEntry *n;
144
145   /**
146    * Function to call once we're done.
147    */
148   GST_NeighbourSendContinuation cont;
149
150   /**
151    * Closure for 'cont'
152    */
153   void *cont_cls;
154
155   /**
156    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
157    * stuck together in memory.  Allocated at the end of this struct.
158    */
159   const char *message_buf;
160
161   /**
162    * Size of the message buf
163    */
164   size_t message_buf_size;
165
166   /**
167    * At what time should we fail?
168    */
169   struct GNUNET_TIME_Absolute timeout;
170
171 };
172
173
174 /**
175  * Entry in neighbours.
176  */
177 struct NeighbourMapEntry
178 {
179
180   /**
181    * Head of list of messages we would like to send to this peer;
182    * must contain at most one message per client.
183    */
184   struct MessageQueue *messages_head;
185
186   /**
187    * Tail of list of messages we would like to send to this peer; must
188    * contain at most one message per client.
189    */
190   struct MessageQueue *messages_tail;
191
192   /**
193    * Context for address suggestion.
194    * NULL after we are connected.
195    */
196   struct GNUNET_ATS_SuggestionContext *asc;
197
198   /**
199    * Performance data for the peer.
200    */
201   struct GNUNET_TRANSPORT_ATS_Information *ats;
202
203   /**
204    * Are we currently trying to send a message? If so, which one?
205    */
206   struct MessageQueue *is_active;
207
208   /**
209    * Active session for communicating with the peer.
210    */
211   struct Session *session;
212
213   /**
214    * Name of the plugin we currently use.
215    */
216   char *plugin_name;
217
218   /**
219    * Address used for communicating with the peer, NULL for inbound connections.
220    */
221   void *addr;
222
223   /**
224    * Number of bytes in 'addr'.
225    */
226   size_t addrlen;
227
228   /**
229    * Identity of this neighbour.
230    */
231   struct GNUNET_PeerIdentity id;
232
233   /**
234    * ID of task scheduled to run when this peer is about to
235    * time out (will free resources associated with the peer).
236    */
237   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
238
239   /**
240    * ID of task scheduled to send keepalives.
241    */
242   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
243
244   /**
245    * ID of task scheduled to run when we should try transmitting
246    * the head of the message queue.
247    */
248   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
249
250   /**
251    * Tracker for inbound bandwidth.
252    */
253   struct GNUNET_BANDWIDTH_Tracker in_tracker;
254
255   /**
256    * How often has the other peer (recently) violated the inbound
257    * traffic limit?  Incremented by 10 per violation, decremented by 1
258    * per non-violation (for each time interval).
259    */
260   unsigned int quota_violation_count;
261
262   /**
263    * Number of values in 'ats' array.
264    */
265   unsigned int ats_count;
266
267   /**
268    * Are we already in the process of disconnecting this neighbour?
269    */
270   int in_disconnect;
271
272   /**
273    * Do we currently consider this neighbour connected? (as far as
274    * the connect/disconnect callbacks are concerned)?
275    */
276   int is_connected;
277
278 };
279
280
281 /**
282  * All known neighbours and their HELLOs.
283  */
284 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
285
286 /**
287  * Closure for connect_notify_cb and disconnect_notify_cb
288  */
289 static void *callback_cls;
290
291 /**
292  * Function to call when we connected to a neighbour.
293  */
294 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
295
296 /**
297  * Function to call when we disconnected from a neighbour.
298  */
299 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
300
301 /**
302  * counter for connected neighbours
303  */
304 static int neighbours_connected;
305
306 /**
307  * Lookup a neighbour entry in the neighbours hash map.
308  *
309  * @param pid identity of the peer to look up
310  * @return the entry, NULL if there is no existing record
311  */
312 static struct NeighbourMapEntry *
313 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
314 {
315   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
316 }
317
318
319 /**
320  * Task invoked to start a transmission to another peer.
321  *
322  * @param cls the 'struct NeighbourMapEntry'
323  * @param tc scheduler context
324  */
325 static void
326 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
327
328
329 /**
330  * We're done with our transmission attempt, continue processing.
331  *
332  * @param cls the 'struct MessageQueue' of the message
333  * @param receiver intended receiver
334  * @param success whether it worked or not
335  */
336 static void
337 transmit_send_continuation (void *cls,
338                             const struct GNUNET_PeerIdentity *receiver,
339                             int success)
340 {
341   struct MessageQueue *mq;
342   struct NeighbourMapEntry *n;
343
344   mq = cls;
345   n = mq->n;
346   if (NULL != n)
347   {
348     GNUNET_assert (n->is_active == mq);
349     n->is_active = NULL;
350     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
351     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
352   }
353   if (NULL != mq->cont)
354     mq->cont (mq->cont_cls, success);
355   GNUNET_free (mq);
356 }
357
358
359 /**
360  * Check the ready list for the given neighbour and if a plugin is
361  * ready for transmission (and if we have a message), do so!
362  *
363  * @param n target peer for which to transmit
364  */
365 static void
366 try_transmission_to_peer (struct NeighbourMapEntry *n)
367 {
368   struct MessageQueue *mq;
369   struct GNUNET_TIME_Relative timeout;
370   ssize_t ret;
371   struct GNUNET_TRANSPORT_PluginFunctions *papi;
372
373   if (n->is_active != NULL)
374     return;                     /* transmission already pending */
375   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
376     return;                     /* currently waiting for bandwidth */
377   while (NULL != (mq = n->messages_head))
378   {
379     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
380     if (timeout.rel_value > 0)
381       break;
382     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
383     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
384   }
385   if (NULL == mq)
386     return;                     /* no more messages */
387
388   papi = GST_plugins_find (n->plugin_name);
389   if (papi == NULL)
390   {
391     GNUNET_break (0);
392     return;
393   }
394   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
395   n->is_active = mq;
396   mq->n = n;
397
398   if  (((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0)))
399   {
400     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No address peer for peer `%s'\n",
401                 GNUNET_i2s (&n->id));
402     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
403     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
404     return;
405   }
406
407   ret =
408       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
409                   0 /* priority -- remove from plugin API? */ ,
410                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
411                   &transmit_send_continuation, mq);
412   if (ret == -1)
413   {
414     /* failure, but 'send' would not call continuation in this case,
415      * so we need to do it here! */
416     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
417     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
418   }
419 }
420
421
422 /**
423  * Task invoked to start a transmission to another peer.
424  *
425  * @param cls the 'struct NeighbourMapEntry'
426  * @param tc scheduler context
427  */
428 static void
429 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
430 {
431   struct NeighbourMapEntry *n = cls;
432
433   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
434   try_transmission_to_peer (n);
435 }
436
437
438 /**
439  * Initialize the neighbours subsystem.
440  *
441  * @param cls closure for callbacks
442  * @param connect_cb function to call if we connect to a peer
443  * @param disconnect_cb function to call if we disconnect from a peer
444  */
445 void
446 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
447                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
448 {
449   callback_cls = cls;
450   connect_notify_cb = connect_cb;
451   disconnect_notify_cb = disconnect_cb;
452   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
453 }
454
455
456 /**
457  * Disconnect from the given neighbour, clean up the record.
458  *
459  * @param n neighbour to disconnect from
460  */
461 static void
462 disconnect_neighbour (struct NeighbourMapEntry *n)
463 {
464   struct MessageQueue *mq;
465
466   if (GNUNET_YES == n->in_disconnect)
467     return;
468   n->in_disconnect = GNUNET_YES;
469   while (NULL != (mq = n->messages_head))
470   {
471     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
472     mq->cont (mq->cont_cls, GNUNET_SYSERR);
473     GNUNET_free (mq);
474   }
475   if (NULL != n->is_active)
476   {
477     n->is_active->n = NULL;
478     n->is_active = NULL;
479   }
480   if (GNUNET_YES == n->is_connected)
481   {
482     n->is_connected = GNUNET_NO;
483     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
484     GNUNET_SCHEDULER_cancel (n->keepalive_task);
485     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;  
486     GNUNET_assert (neighbours_connected > 0);
487     neighbours_connected--;
488     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
489                               GNUNET_NO);
490     disconnect_notify_cb (callback_cls, &n->id);
491   }
492   GNUNET_assert (GNUNET_YES ==
493                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
494                                                        &n->id.hashPubKey, n));
495   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
496   {
497     GNUNET_SCHEDULER_cancel (n->timeout_task);
498     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
499   }
500   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
501   {
502     GNUNET_SCHEDULER_cancel (n->transmission_task);
503     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
504   }
505   if (NULL != n->asc)
506   {
507     GNUNET_ATS_suggest_address_cancel (n->asc);
508     n->asc = NULL;
509   }
510   GNUNET_array_grow (n->ats, n->ats_count, 0);
511   if (NULL != n->plugin_name)
512   {
513     GNUNET_free (n->plugin_name);
514     n->plugin_name = NULL;
515   }
516   if (NULL != n->addr)
517   {
518     GNUNET_free (n->addr);
519     n->addr = NULL;
520     n->addrlen = 0;
521   }
522   n->session = NULL;
523   GNUNET_free (n);
524 }
525
526
527 /**
528  * Peer has been idle for too long. Disconnect.
529  *
530  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
531  * @param tc scheduler context
532  */
533 static void
534 neighbour_timeout_task (void *cls,
535                         const struct GNUNET_SCHEDULER_TaskContext *tc)
536 {
537   struct NeighbourMapEntry *n = cls;
538
539   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
540   if (GNUNET_YES == n->is_connected)
541     GNUNET_STATISTICS_update (GST_stats,
542                               gettext_noop ("# peers disconnected due to timeout"), 1,
543                               GNUNET_NO);
544   disconnect_neighbour (n);
545 }
546
547
548 /**
549  * Send another keepalive message.
550  *
551  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
552  * @param tc scheduler context
553  */
554 static void
555 neighbour_keepalive_task (void *cls,
556                           const struct GNUNET_SCHEDULER_TaskContext *tc)
557 {
558   struct NeighbourMapEntry *n = cls;
559   struct GNUNET_MessageHeader m;
560   struct GNUNET_TRANSPORT_PluginFunctions *papi;
561
562   n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;
563   GNUNET_assert (GNUNET_YES == n->is_connected);
564   GNUNET_STATISTICS_update (GST_stats,
565                             gettext_noop ("# keepalives sent"), 1,
566                             GNUNET_NO);
567   m.size = htons (sizeof (struct GNUNET_MessageHeader));
568   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
569   papi = GST_plugins_find (n->plugin_name);
570   if (papi != NULL)
571     papi->send (papi->cls, 
572                 &n->id, (const void *) &m,
573                 sizeof (m),
574                 UINT32_MAX /* priority */ ,
575                 GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
576                 GNUNET_YES, NULL, NULL);
577   n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
578                                                     &neighbour_keepalive_task,
579                                                     n);
580 }
581
582
583 /**
584  * Disconnect from the given neighbour.
585  *
586  * @param cls unused
587  * @param key hash of neighbour's public key (not used)
588  * @param value the 'struct NeighbourMapEntry' of the neighbour
589  */
590 static int
591 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
592 {
593   struct NeighbourMapEntry *n = value;
594
595 #if DEBUG_TRANSPORT
596   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
597               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
598 #endif
599   if (GNUNET_YES == n->is_connected)
600     GNUNET_STATISTICS_update (GST_stats,
601                               gettext_noop ("# peers disconnected due to global disconnect"), 1,
602                               GNUNET_NO);
603   disconnect_neighbour (n);
604   return GNUNET_OK;
605 }
606
607
608 /**
609  * Cleanup the neighbours subsystem.
610  */
611 void
612 GST_neighbours_stop ()
613 {
614   GNUNET_assert (neighbours != NULL);
615
616   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
617                                          NULL);
618   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
619   GNUNET_assert (neighbours_connected == 0);
620   neighbours = NULL;
621   callback_cls = NULL;
622   connect_notify_cb = NULL;
623   disconnect_notify_cb = NULL;
624 }
625
626
627 /**
628  * For an existing neighbour record, set the active connection to
629  * the given address.
630  *
631  * @param peer identity of the peer to switch the address for
632  * @param plugin_name name of transport that delivered the PONG
633  * @param address address of the other peer, NULL if other peer
634  *                       connected to us
635  * @param address_len number of bytes in address
636  * @param session session to use (or NULL)
637  * @param ats performance data
638  * @param ats_count number of entries in ats (excluding 0-termination)
639  */
640 void
641 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
642                                   const char *plugin_name, const void *address,
643                                   size_t address_len, struct Session *session,
644                                   const struct GNUNET_TRANSPORT_ATS_Information
645                                   *ats, uint32_t ats_count)
646 {
647   struct NeighbourMapEntry *n;
648   struct SessionConnectMessage connect_msg;
649
650   GNUNET_assert (neighbours != NULL);
651
652   n = lookup_neighbour (peer);
653   if (NULL == n)
654   {
655     GNUNET_break (0);
656     return;
657   }
658
659 #if DEBUG_TRANSPORT
660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
661               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
662               GNUNET_i2s (peer), plugin_name,
663               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
664                                                                   address,
665                                                                   address_len),
666               session);
667 #endif
668
669   GNUNET_free_non_null (n->addr);
670   n->addr = GNUNET_malloc (address_len);
671   memcpy (n->addr, address, address_len);
672   n->addrlen = address_len;
673   n->session = session;
674   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
675   memcpy (n->ats, ats,
676           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
677   GNUNET_free_non_null (n->plugin_name);
678   n->plugin_name = GNUNET_strdup (plugin_name);
679   GNUNET_SCHEDULER_cancel (n->timeout_task);
680   n->timeout_task =
681       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
682                                     &neighbour_timeout_task, n);
683   connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
684   connect_msg.header.type =
685       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
686   connect_msg.reserved = htonl (0);
687   connect_msg.timestamp =
688       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
689   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
690                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
691 }
692
693
694 /**
695  * Try to connect to the target peer using the given address
696  *
697  * @param cls the 'struct NeighbourMapEntry' of the target
698  * @param target identity of the target peer
699  * @param plugin_name name of the plugin
700  * @param plugin_address binary address
701  * @param plugin_address_len length of address
702  * @param session session to use
703  * @param bandwidth available bandwidth
704  * @param ats performance data for the address (as far as known)
705  * @param ats_count number of performance records in 'ats'
706  */
707 static void
708 try_connect_using_address (void *cls, const struct GNUNET_PeerIdentity *target,
709                            const char *plugin_name, const void *plugin_address,
710                            size_t plugin_address_len, struct Session *session,
711                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
712                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
713                            uint32_t ats_count)
714 {
715   struct NeighbourMapEntry *n = cls;
716   int was_connected;
717
718   n->asc = NULL;
719   was_connected = n->is_connected;
720   n->is_connected = GNUNET_YES;
721   GST_neighbours_switch_to_address (target, plugin_name, plugin_address,
722                                     plugin_address_len, session, ats,
723                                     ats_count);
724   if (GNUNET_YES == was_connected)
725     return;
726   n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task,
727                                                 n);
728   neighbours_connected++;
729   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
730                             GNUNET_NO);
731   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
732 }
733
734
735 /**
736  * Try to create a connection to the given target (eventually).
737  *
738  * @param target peer to try to connect to
739  */
740 void
741 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
742 {
743   struct NeighbourMapEntry *n;
744
745   GNUNET_assert (neighbours != NULL);
746
747 #if DEBUG_TRANSPORT
748   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
749               GNUNET_i2s (target));
750 #endif
751
752   GNUNET_assert (0 !=
753                  memcmp (target, &GST_my_identity,
754                          sizeof (struct GNUNET_PeerIdentity)));
755   n = lookup_neighbour (target);
756   if ((NULL != n) && (GNUNET_YES == n->is_connected))
757     return;                     /* already connected */
758   if (n == NULL)
759   {
760 #if DEBUG_TRANSPORT
761     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762                 "Unknown peer `%s', creating new neighbour\n",
763                 GNUNET_i2s (target));
764 #endif
765     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
766     n->id = *target;
767     GNUNET_array_grow (n->ats, n->ats_count, 1);
768     n->ats[0].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);;
769     n->ats[0].value = htonl (0);
770     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
771                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
772                                    MAX_BANDWIDTH_CARRY_S);
773     n->timeout_task =
774         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
775                                       &neighbour_timeout_task, n);
776     GNUNET_assert (GNUNET_OK ==
777                    GNUNET_CONTAINER_multihashmap_put (neighbours,
778                                                       &n->id.hashPubKey, n,
779                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
780   }
781   if (n->asc != NULL)
782     return;                     /* already trying */
783 #if DEBUG_TRANSPORT
784   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
785               "Asking ATS for suggested address to connect to peer `%s'\n",
786               GNUNET_i2s (target));
787 #endif
788   n->asc =
789       GNUNET_ATS_suggest_address (GST_ats, target, &try_connect_using_address,
790                                   n);
791 }
792
793
794 /**
795  * Test if we're connected to the given peer.
796  *
797  * @param target peer to test
798  * @return GNUNET_YES if we are connected, GNUNET_NO if not
799  */
800 int
801 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
802 {
803   struct NeighbourMapEntry *n;
804
805   GNUNET_assert (neighbours != NULL);
806
807   n = lookup_neighbour (target);
808   if ((NULL == n) || (n->is_connected != GNUNET_YES))
809     return GNUNET_NO;           /* not connected */
810   return GNUNET_YES;
811 }
812
813
814 /**
815  * A session was terminated. Take note.
816  *
817  * @param peer identity of the peer where the session died
818  * @param session session that is gone
819  */
820 void
821 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
822                                    struct Session *session)
823 {
824   struct NeighbourMapEntry *n;
825
826   GNUNET_assert (neighbours != NULL);
827
828 #if DEBUG_TRANSPORT
829   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
830               "Session %X to peer `%s' ended \n",
831               session, GNUNET_i2s (peer));
832 #endif
833
834   n = lookup_neighbour (peer);
835   if (NULL == n)
836     return;
837   if (session != n->session)
838     return;                     /* doesn't affect us */
839
840   n->session = NULL;
841   GNUNET_free (n->addr);
842   n->addr = NULL;
843   n->addrlen = 0;
844
845
846   if (GNUNET_YES != n->is_connected)
847     return;                     /* not connected anymore anyway, shouldn't matter */
848
849   //n->is_connected = GNUNET_NO;
850
851   /* fast disconnect unless ATS suggests a new address */
852   GNUNET_SCHEDULER_cancel (n->timeout_task);
853   n->timeout_task =
854       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
855                                     &neighbour_timeout_task, n);
856   /* try QUICKLY to re-establish a connection, reduce timeout! */
857   if (NULL != n->ats)
858   {
859     /* how can this be!? */
860     //GNUNET_break (0);
861     return;
862   }
863   n->asc =
864       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
865 }
866
867
868 /**
869  * Transmit a message to the given target using the active connection.
870  *
871  * @param target destination
872  * @param msg message to send
873  * @param msg_size number of bytes in msg
874  * @param timeout when to fail with timeout
875  * @param cont function to call when done
876  * @param cont_cls closure for 'cont'
877  */
878 void
879 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
880                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
881                      GST_NeighbourSendContinuation cont, void *cont_cls)
882 {
883   struct NeighbourMapEntry *n;
884   struct MessageQueue *mq;
885
886   GNUNET_assert (neighbours != NULL);
887
888   n = lookup_neighbour (target);
889   if ((n == NULL) || (GNUNET_YES != n->is_connected))
890   {
891     GNUNET_STATISTICS_update (GST_stats,
892                               gettext_noop
893                               ("# messages not sent (no such peer or not connected)"),
894                               1, GNUNET_NO);
895 #if DEBUG_TRANSPORT
896     if (n == NULL)
897       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
898                   "Could not send message to peer `%s': unknown neighbor",
899                   GNUNET_i2s (target));
900     else if (GNUNET_YES != n->is_connected)
901       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
902                   "Could not send message to peer `%s': not connected\n",
903                   GNUNET_i2s (target));
904 #endif
905     if (NULL != cont)
906       cont (cont_cls, GNUNET_SYSERR);
907     return;
908   }
909
910   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
911   {
912     GNUNET_STATISTICS_update (GST_stats,
913                               gettext_noop
914                               ("# messages not sent (no such peer or not connected)"),
915                               1, GNUNET_NO);
916 #if DEBUG_TRANSPORT
917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
918                   "Could not send message to peer `%s': no address available\n",
919                   GNUNET_i2s (target));
920 #endif
921
922     if (NULL != cont)
923       cont (cont_cls, GNUNET_SYSERR);
924     return;
925   }
926
927
928   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
929   GNUNET_STATISTICS_update (GST_stats,
930                             gettext_noop
931                             ("# bytes in message queue for other peers"),
932                             msg_size, GNUNET_NO);
933   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
934   mq->cont = cont;
935   mq->cont_cls = cont_cls;
936   /* FIXME: this memcpy can be up to 7% of our total runtime! */
937   memcpy (&mq[1], msg, msg_size);
938   mq->message_buf = (const char *) &mq[1];
939   mq->message_buf_size = msg_size;
940   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
941   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
942   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
943       (NULL == n->is_active))
944     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
945 }
946
947
948 /**
949  * We have received a message from the given sender.  How long should
950  * we delay before receiving more?  (Also used to keep the peer marked
951  * as live).
952  *
953  * @param sender sender of the message
954  * @param size size of the message
955  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
956  *                   GNUNET_NO if the neighbour is not connected or violates the quota
957  * @return how long to wait before reading more from this sender
958  */
959 struct GNUNET_TIME_Relative
960 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
961                                         *sender, ssize_t size, int *do_forward)
962 {
963   struct NeighbourMapEntry *n;
964   struct GNUNET_TIME_Relative ret;
965
966   GNUNET_assert (neighbours != NULL);
967
968   n = lookup_neighbour (sender);
969   if (n == NULL)
970   {
971     *do_forward = GNUNET_NO;
972     return GNUNET_TIME_UNIT_ZERO;
973   }
974   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
975   {
976     n->quota_violation_count++;
977 #if DEBUG_TRANSPORT
978     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
979                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
980                 n->in_tracker.available_bytes_per_s__,
981                 n->quota_violation_count);
982 #endif
983     /* Discount 32k per violation */
984     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
985   }
986   else
987   {
988     if (n->quota_violation_count > 0)
989     {
990       /* try to add 32k back */
991       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
992       n->quota_violation_count--;
993     }
994   }
995   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
996   {
997     GNUNET_STATISTICS_update (GST_stats,
998                               gettext_noop
999                               ("# bandwidth quota violations by other peers"),
1000                               1, GNUNET_NO);
1001     *do_forward = GNUNET_NO;
1002     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
1003   }
1004   *do_forward = GNUNET_YES;
1005   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
1006   if (ret.rel_value > 0)
1007   {
1008 #if DEBUG_TRANSPORT
1009     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
1011                 (unsigned long long) n->in_tracker.
1012                 consumption_since_last_update__,
1013                 (unsigned int) n->in_tracker.available_bytes_per_s__,
1014                 (unsigned long long) ret.rel_value);
1015 #endif
1016     GNUNET_STATISTICS_update (GST_stats,
1017                               gettext_noop ("# ms throttling suggested"),
1018                               (int64_t) ret.rel_value, GNUNET_NO);
1019   }
1020   return ret;
1021 }
1022
1023
1024 /**
1025  * Keep the connection to the given neighbour alive longer,
1026  * we received a KEEPALIVE (or equivalent).
1027  *
1028  * @param neighbour neighbour to keep alive
1029  */
1030 void
1031 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1032 {
1033   struct NeighbourMapEntry *n;
1034
1035   GNUNET_assert (neighbours != NULL);
1036
1037   n = lookup_neighbour (neighbour);
1038   if (NULL == n)
1039   {
1040     GNUNET_STATISTICS_update (GST_stats,
1041                               gettext_noop
1042                               ("# KEEPALIVE messages discarded (not connected)"),
1043                               1, GNUNET_NO);
1044     return;
1045   }
1046   GNUNET_SCHEDULER_cancel (n->timeout_task);
1047   n->timeout_task =
1048       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1049                                     &neighbour_timeout_task, n);
1050 }
1051
1052
1053 /**
1054  * Change the incoming quota for the given peer.
1055  *
1056  * @param neighbour identity of peer to change qutoa for
1057  * @param quota new quota
1058  */
1059 void
1060 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1061                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1062 {
1063   struct NeighbourMapEntry *n;
1064
1065   GNUNET_assert (neighbours != NULL);
1066
1067   n = lookup_neighbour (neighbour);
1068   if (n == NULL)
1069   {
1070     GNUNET_STATISTICS_update (GST_stats,
1071                               gettext_noop
1072                               ("# SET QUOTA messages ignored (no such peer)"),
1073                               1, GNUNET_NO);
1074     return;
1075   }
1076   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1077   if (0 != ntohl (quota.value__))
1078     return;
1079 #if DEBUG_TRANSPORT
1080   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1081               GNUNET_i2s (&n->id), "SET_QUOTA");
1082 #endif
1083   if (GNUNET_YES == n->is_connected)
1084     GNUNET_STATISTICS_update (GST_stats,
1085                               gettext_noop ("# disconnects due to quota of 0"), 1,
1086                               GNUNET_NO);
1087   disconnect_neighbour (n);
1088 }
1089
1090
1091 /**
1092  * Closure for the neighbours_iterate function.
1093  */
1094 struct IteratorContext
1095 {
1096   /**
1097    * Function to call on each connected neighbour.
1098    */
1099   GST_NeighbourIterator cb;
1100
1101   /**
1102    * Closure for 'cb'.
1103    */
1104   void *cb_cls;
1105 };
1106
1107
1108 /**
1109  * Call the callback from the closure for each connected neighbour.
1110  *
1111  * @param cls the 'struct IteratorContext'
1112  * @param key the hash of the public key of the neighbour
1113  * @param value the 'struct NeighbourMapEntry'
1114  * @return GNUNET_OK (continue to iterate)
1115  */
1116 static int
1117 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1118 {
1119   struct IteratorContext *ic = cls;
1120   struct NeighbourMapEntry *n = value;
1121
1122   if (GNUNET_YES != n->is_connected)
1123     return GNUNET_OK;
1124
1125   GNUNET_assert (n->ats_count > 0);
1126   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count, n->plugin_name, n->addr, n->addrlen);
1127   return GNUNET_OK;
1128 }
1129
1130
1131 /**
1132  * Iterate over all connected neighbours.
1133  *
1134  * @param cb function to call
1135  * @param cb_cls closure for cb
1136  */
1137 void
1138 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1139 {
1140   struct IteratorContext ic;
1141
1142   GNUNET_assert (neighbours != NULL);
1143
1144   ic.cb = cb;
1145   ic.cb_cls = cb_cls;
1146   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1147 }
1148
1149
1150 /**
1151  * If we have an active connection to the given target, it must be shutdown.
1152  *
1153  * @param target peer to disconnect from
1154  */
1155 void
1156 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1157 {
1158   struct NeighbourMapEntry *n;
1159   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1160   struct SessionDisconnectMessage disconnect_msg;
1161
1162   GNUNET_assert (neighbours != NULL);
1163
1164   n = lookup_neighbour (target);
1165   if (NULL == n)
1166     return;                     /* not active */
1167   if (GNUNET_YES == n->is_connected)
1168   {
1169     /* we're actually connected, send DISCONNECT message */
1170     disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
1171     disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1172     disconnect_msg.reserved = htonl (0);
1173     disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1174                                          sizeof (struct GNUNET_TIME_AbsoluteNBO));
1175     disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
1176     disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1177     GNUNET_assert (GNUNET_OK ==
1178                    GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
1179                                            &disconnect_msg.purpose,
1180                                            &disconnect_msg.signature));
1181     papi = GST_plugins_find (n->plugin_name);
1182     if (papi != NULL)
1183       papi->send (papi->cls, target, (const void *) &disconnect_msg,
1184                   sizeof (disconnect_msg),
1185                   UINT32_MAX /* priority */ ,
1186                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
1187                   GNUNET_YES, NULL, NULL);
1188     GNUNET_STATISTICS_update (GST_stats,
1189                               gettext_noop ("# peers disconnected due to external request"), 1,
1190                               GNUNET_NO);
1191   }
1192   disconnect_neighbour (n);
1193 }
1194
1195
1196 /* end of file gnunet-service-transport_neighbours.c */