Extended ATS and transport service to store session IDs to support inbound sessions
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48
49 /**
50  * Entry in neighbours. 
51  */
52 struct NeighbourMapEntry;
53
54
55 /**
56  * For each neighbour we keep a list of messages
57  * that we still want to transmit to the neighbour.
58  */
59 struct MessageQueue
60 {
61
62   /**
63    * This is a doubly linked list.
64    */
65   struct MessageQueue *next;
66
67   /**
68    * This is a doubly linked list.
69    */
70   struct MessageQueue *prev;
71
72   /**
73    * Once this message is actively being transmitted, which
74    * neighbour is it associated with?
75    */
76   struct NeighbourMapEntry *n;
77
78   /**
79    * Function to call once we're done.
80    */
81   GST_NeighbourSendContinuation cont;
82
83   /**
84    * Closure for 'cont'
85    */
86   void *cont_cls;
87
88   /**
89    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
90    * stuck together in memory.  Allocated at the end of this struct.
91    */
92   const char *message_buf;
93
94   /**
95    * Size of the message buf
96    */
97   size_t message_buf_size;
98
99   /**
100    * At what time should we fail?
101    */
102   struct GNUNET_TIME_Absolute timeout;
103
104 };
105
106
107 /**
108  * Entry in neighbours. 
109  */
110 struct NeighbourMapEntry
111 {
112
113   /**
114    * Head of list of messages we would like to send to this peer;
115    * must contain at most one message per client.
116    */
117   struct MessageQueue *messages_head;
118
119   /**
120    * Tail of list of messages we would like to send to this peer; must
121    * contain at most one message per client.
122    */
123   struct MessageQueue *messages_tail;
124
125   /**
126    * Context for address suggestion.
127    * NULL after we are connected.
128    */
129   struct GNUNET_ATS_SuggestionContext *asc;
130
131   /**
132    * Performance data for the peer.
133    */
134   struct GNUNET_TRANSPORT_ATS_Information *ats;
135
136   /**
137    * Are we currently trying to send a message? If so, which one?
138    */
139   struct MessageQueue *is_active;
140
141   /**
142    * Active session for communicating with the peer.
143    */
144   struct Session *session;
145
146   /**
147    * Name of the plugin we currently use.
148    */
149   char *plugin_name;
150
151   /**
152    * Address used for communicating with the peer, NULL for inbound connections.
153    */
154   void *addr;
155
156   /**
157    * Number of bytes in 'addr'.
158    */
159   size_t addrlen;
160
161   /**
162    * Identity of this neighbour.
163    */
164   struct GNUNET_PeerIdentity id;
165
166   /**
167    * ID of task scheduled to run when this peer is about to
168    * time out (will free resources associated with the peer).
169    */
170   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
171
172   /**
173    * ID of task scheduled to run when we should try transmitting
174    * the head of the message queue.  
175    */
176   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
177
178   /**
179    * Tracker for inbound bandwidth.
180    */
181   struct GNUNET_BANDWIDTH_Tracker in_tracker;
182
183   /**
184    * How often has the other peer (recently) violated the inbound
185    * traffic limit?  Incremented by 10 per violation, decremented by 1
186    * per non-violation (for each time interval).
187    */
188   unsigned int quota_violation_count;
189
190   /**
191    * Number of values in 'ats' array.
192    */
193   unsigned int ats_count;
194
195   /**
196    * Are we already in the process of disconnecting this neighbour?
197    */
198   int in_disconnect;
199
200   /**
201    * Do we currently consider this neighbour connected? (as far as
202    * the connect/disconnect callbacks are concerned)?
203    */
204   int is_connected;
205
206 };
207
208
209 /**
210  * All known neighbours and their HELLOs.
211  */
212 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
213
214 /**
215  * Closure for connect_notify_cb and disconnect_notify_cb
216  */
217 static void *callback_cls;
218
219 /**
220  * Function to call when we connected to a neighbour.
221  */
222 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
223
224 /**
225  * Function to call when we disconnected from a neighbour.
226  */
227 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
228
229
230 /**
231  * Lookup a neighbour entry in the neighbours hash map.
232  *
233  * @param pid identity of the peer to look up
234  * @return the entry, NULL if there is no existing record
235  */
236 static struct NeighbourMapEntry *
237 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
238 {
239   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
240 }
241
242
243 /**
244  * Task invoked to start a transmission to another peer.
245  *
246  * @param cls the 'struct NeighbourMapEntry'
247  * @param tc scheduler context
248  */
249 static void
250 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
251
252
253 /**
254  * We're done with our transmission attempt, continue processing.
255  *
256  * @param cls the 'struct MessageQueue' of the message
257  * @param receiver intended receiver
258  * @param success whether it worked or not
259  */
260 static void
261 transmit_send_continuation (void *cls,
262                             const struct GNUNET_PeerIdentity *receiver,
263                             int success)
264 {
265   struct MessageQueue *mq;
266   struct NeighbourMapEntry *n;
267
268   mq = cls;
269   n = mq->n;
270   if (NULL != n)
271   {
272     GNUNET_assert (n->is_active == mq);
273     n->is_active = NULL;
274     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
275     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
276   }
277   if (NULL != mq->cont)
278     mq->cont (mq->cont_cls, success);
279   GNUNET_free (mq);
280 }
281
282
283 /**
284  * Check the ready list for the given neighbour and if a plugin is
285  * ready for transmission (and if we have a message), do so!
286  *
287  * @param neighbour target peer for which to transmit
288  */
289 static void
290 try_transmission_to_peer (struct NeighbourMapEntry *n)
291 {
292   struct MessageQueue *mq;
293   struct GNUNET_TIME_Relative timeout;
294   ssize_t ret;
295   struct GNUNET_TRANSPORT_PluginFunctions *papi;
296
297   if (n->is_active != NULL)
298     return;                     /* transmission already pending */
299   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
300     return;                     /* currently waiting for bandwidth */
301   mq = n->messages_head;
302   while (NULL != (mq = n->messages_head))
303   {
304     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
305     if (timeout.rel_value > 0)
306       break;
307     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
308   }
309   if (NULL == mq)
310     return;                     /* no more messages */
311
312   papi = GST_plugins_find (n->plugin_name);
313   if (papi == NULL)
314   {
315     GNUNET_break (0);
316     return;
317   }
318   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
319   n->is_active = mq;
320   mq->n = n;
321
322
323   ret =
324       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
325                   0 /* priority -- remove from plugin API? */ ,
326                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
327                   &transmit_send_continuation, mq);
328   if (ret == -1)
329   {
330     /* failure, but 'send' would not call continuation in this case,
331      * so we need to do it here! */
332     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
333     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
334   }
335 }
336
337
338 /**
339  * Task invoked to start a transmission to another peer.
340  *
341  * @param cls the 'struct NeighbourMapEntry'
342  * @param tc scheduler context
343  */
344 static void
345 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct NeighbourMapEntry *n = cls;
348
349   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
350   try_transmission_to_peer (n);
351 }
352
353
354 /**
355  * Initialize the neighbours subsystem.
356  *
357  * @param cls closure for callbacks
358  * @param connect_cb function to call if we connect to a peer
359  * @param disconnect_cb function to call if we disconnect from a peer
360  */
361 void
362 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
363                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
364 {
365   callback_cls = cls;
366   connect_notify_cb = connect_cb;
367   disconnect_notify_cb = disconnect_cb;
368   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
369 }
370
371
372 /**
373  * Disconnect from the given neighbour, clean up the record.
374  *
375  * @param n neighbour to disconnect from
376  */
377 static void
378 disconnect_neighbour (struct NeighbourMapEntry *n)
379 {
380   struct MessageQueue *mq;
381
382   if (GNUNET_YES == n->in_disconnect)
383     return;
384   n->in_disconnect = GNUNET_YES;
385   while (NULL != (mq = n->messages_head))
386   {
387     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
388     mq->cont (mq->cont_cls, GNUNET_SYSERR);
389     GNUNET_free (mq);
390   }
391   if (NULL != n->is_active)
392   {
393     n->is_active->n = NULL;
394     n->is_active = NULL;
395   }
396   if (GNUNET_YES == n->is_connected)
397   {
398     n->is_connected = GNUNET_NO;
399     disconnect_notify_cb (callback_cls, &n->id);
400   }
401   GNUNET_assert (GNUNET_YES ==
402                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
403                                                        &n->id.hashPubKey, n));
404   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
405   {
406     GNUNET_SCHEDULER_cancel (n->timeout_task);
407     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
408   }
409   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
410   {
411     GNUNET_SCHEDULER_cancel (n->timeout_task);
412     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
413   }
414   if (NULL != n->asc)
415   {
416     GNUNET_ATS_suggest_address_cancel (n->asc);
417     n->asc = NULL;
418   }
419   GNUNET_array_grow (n->ats, n->ats_count, 0);
420   if (NULL != n->plugin_name)
421   {
422     GNUNET_free (n->plugin_name);
423     n->plugin_name = NULL;
424   }
425   if (NULL != n->addr)
426   {
427     GNUNET_free (n->addr);
428     n->addr = NULL;
429     n->addrlen = 0;
430   }
431   n->session = NULL;
432   GNUNET_free (n);
433 }
434
435
436 /**
437  * Peer has been idle for too long. Disconnect.
438  *
439  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
440  * @param tc scheduler context
441  */
442 static void
443 neighbour_timeout_task (void *cls,
444                         const struct GNUNET_SCHEDULER_TaskContext *tc)
445 {
446   struct NeighbourMapEntry *n = cls;
447
448   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
449   disconnect_neighbour (n);
450 }
451
452
453 /**
454  * Disconnect from the given neighbour.
455  *
456  * @param cls unused
457  * @param key hash of neighbour's public key (not used)
458  * @param value the 'struct NeighbourMapEntry' of the neighbour
459  */
460 static int
461 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
462 {
463   struct NeighbourMapEntry *n = value;
464
465 #if DEBUG_TRANSPORT
466   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
467               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
468 #endif
469   disconnect_neighbour (n);
470   return GNUNET_OK;
471 }
472
473
474 /**
475  * Cleanup the neighbours subsystem.
476  */
477 void
478 GST_neighbours_stop ()
479 {
480   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
481                                          NULL);
482   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
483   neighbours = NULL;
484   callback_cls = NULL;
485   connect_notify_cb = NULL;
486   disconnect_notify_cb = NULL;
487 }
488
489
490 /**
491  * For an existing neighbour record, set the active connection to
492  * the given address.
493  *
494  * @param peer identity of the peer to switch the address for
495  * @param plugin_name name of transport that delivered the PONG
496  * @param address address of the other peer, NULL if other peer
497  *                       connected to us
498  * @param address_len number of bytes in address
499  * @param session session to use (or NULL)
500  * @param ats performance data
501  * @param ats_count number of entries in ats (excluding 0-termination)
502  */
503 void
504 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
505                                   const char *plugin_name, const void *address,
506                                   size_t address_len, struct Session *session,
507                                   const struct GNUNET_TRANSPORT_ATS_Information
508                                   *ats, uint32_t ats_count)
509 {
510   struct NeighbourMapEntry *n;
511   struct GNUNET_MessageHeader connect_msg;
512
513   n = lookup_neighbour (peer);
514   if (NULL == n)
515   {
516     GNUNET_break (0);
517     return;
518   }
519
520 #if DEBUG_TRANSPORT
521   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
522               GNUNET_i2s (peer),
523               plugin_name,
524               (address_len == 0) ? "<inbound>" :
525               GST_plugins_a2s(plugin_name,address,address_len),
526               session);
527 #endif
528
529   GNUNET_free_non_null (n->addr);
530   n->addr = GNUNET_malloc (address_len);
531   memcpy (n->addr, address, address_len);
532   n->addrlen = address_len;
533   n->session = session;
534   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
535   memcpy (n->ats, ats,
536           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
537   GNUNET_free_non_null (n->plugin_name);
538   n->plugin_name = GNUNET_strdup (plugin_name);
539   GNUNET_SCHEDULER_cancel (n->timeout_task);
540   n->timeout_task =
541       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
542                                     &neighbour_timeout_task, n);
543   connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
544   connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
545   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
546                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
547 }
548
549
550 /**
551  * Try to connect to the target peer using the given address
552  *
553  * @param cls the 'struct NeighbourMapEntry' of the target
554  * @param target identity of the target peer
555  * @param plugin_name name of the plugin
556  * @param plugin_address binary address
557  * @param plugin_address_len length of address
558  * @param bandwidth available bandwidth
559  * @param ats performance data for the address (as far as known)
560  * @param ats_count number of performance records in 'ats'
561  */
562 static void
563 try_connect_using_address (void *cls, const struct GNUNET_PeerIdentity *target,
564                            const char *plugin_name, const void *plugin_address,
565                            size_t plugin_address_len,
566                            struct Session *session,
567                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
568                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
569                            uint32_t ats_count)
570 {
571   struct NeighbourMapEntry *n = cls;
572
573   n->asc = NULL;
574   GST_neighbours_switch_to_address (target, plugin_name, plugin_address,
575                                     plugin_address_len, session, ats, ats_count);
576   if (GNUNET_YES == n->is_connected)
577     return;
578   n->is_connected = GNUNET_YES;
579   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
580 }
581
582
583 /**
584  * Try to create a connection to the given target (eventually).
585  *
586  * @param target peer to try to connect to
587  */
588 void
589 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
590 {
591   struct NeighbourMapEntry *n;
592
593   GNUNET_assert (0 !=
594                  memcmp (target, &GST_my_identity,
595                          sizeof (struct GNUNET_PeerIdentity)));
596   n = lookup_neighbour (target);
597   if ((NULL != n) && (GNUNET_YES == n->is_connected))
598     return;                     /* already connected */
599   if (n == NULL)
600   {
601     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
602     n->id = *target;
603     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
604                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
605                                    MAX_BANDWIDTH_CARRY_S);
606     n->timeout_task =
607         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
608                                       &neighbour_timeout_task, n);
609     GNUNET_assert (GNUNET_OK ==
610                    GNUNET_CONTAINER_multihashmap_put (neighbours,
611                                                       &n->id.hashPubKey, n,
612                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
613   }
614   if (n->asc != NULL)
615     return;                     /* already trying */
616   n->asc =
617       GNUNET_ATS_suggest_address (GST_ats, target, &try_connect_using_address,
618                                   n);
619 }
620
621
622 /**
623  * Test if we're connected to the given peer.
624  * 
625  * @param target peer to test
626  * @return GNUNET_YES if we are connected, GNUNET_NO if not
627  */
628 int
629 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
630 {
631   struct NeighbourMapEntry *n;
632
633   n = lookup_neighbour (target);
634   if ((NULL == n) || (n->is_connected != GNUNET_YES))
635     return GNUNET_NO;           /* not connected */
636   return GNUNET_YES;
637 }
638
639
640 /**
641  * A session was terminated. Take note.
642  *
643  * @param peer identity of the peer where the session died
644  * @param session session that is gone
645  */
646 void
647 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
648                                    struct Session *session)
649 {
650   struct NeighbourMapEntry *n;
651
652   n = lookup_neighbour (peer);
653   if (NULL == n)
654     return;
655   if (session != n->session)
656     return;                     /* doesn't affect us */
657   n->session = NULL;
658   if (GNUNET_YES != n->is_connected)
659     return;                     /* not connected anymore anyway, shouldn't matter */
660   /* try QUICKLY to re-establish a connection, reduce timeout! */
661   if (NULL != n->ats)
662   {
663     /* how can this be!? */
664     GNUNET_break (0);
665     return;
666   }
667   GNUNET_SCHEDULER_cancel (n->timeout_task);
668   n->timeout_task =
669       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
670                                     &neighbour_timeout_task, n);
671   n->asc =
672       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
673 }
674
675
676 /**
677  * Transmit a message to the given target using the active connection.
678  *
679  * @param target destination
680  * @param msg message to send
681  * @param msg_size number of bytes in msg
682  * @param timeout when to fail with timeout
683  * @param cont function to call when done
684  * @param cont_cls closure for 'cont'
685  */
686 void
687 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
688                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
689                      GST_NeighbourSendContinuation cont, void *cont_cls)
690 {
691   struct NeighbourMapEntry *n;
692   struct MessageQueue *mq;
693
694   n = lookup_neighbour (target);
695   if ((n == NULL) || (GNUNET_YES != n->is_connected))
696   {
697     GNUNET_STATISTICS_update (GST_stats,
698                               gettext_noop
699                               ("# SET QUOTA messages ignored (no such peer)"),
700                               1, GNUNET_NO);
701 #if DEBUG_TRANSPORT
702     if (n == NULL)
703     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
704                 "Could not send message to peer `%s': unknown neighbor", GNUNET_i2s (target));
705     if (GNUNET_YES != n->is_connected)
706     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707                 "Could not send message to peer `%s': not connected\n", GNUNET_i2s (target));
708 #endif
709     if (NULL != cont)
710       cont (cont_cls, GNUNET_SYSERR);
711     return;
712   }
713   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
714   GNUNET_STATISTICS_update (GST_stats,
715                             gettext_noop
716                             ("# bytes in message queue for other peers"),
717                             msg_size, GNUNET_NO);
718   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
719   mq->cont = cont;
720   mq->cont_cls = cont_cls;
721   /* FIXME: this memcpy can be up to 7% of our total runtime! */
722   memcpy (&mq[1], msg, msg_size);
723   mq->message_buf = (const char *) &mq[1];
724   mq->message_buf_size = msg_size;
725   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
726   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
727   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
728       (NULL == n->is_active))
729     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
730 }
731
732
733 /**
734  * We have received a message from the given sender.  How long should
735  * we delay before receiving more?  (Also used to keep the peer marked
736  * as live).
737  *
738  * @param sender sender of the message
739  * @param size size of the message
740  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
741  *                   GNUNET_NO if the neighbour is not connected or violates the quota
742  * @return how long to wait before reading more from this sender
743  */
744 struct GNUNET_TIME_Relative
745 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
746                                         *sender, ssize_t size, int *do_forward)
747 {
748   struct NeighbourMapEntry *n;
749   struct GNUNET_TIME_Relative ret;
750
751   n = lookup_neighbour (sender);
752   if (n == NULL)
753   {
754     *do_forward = GNUNET_NO;
755     return GNUNET_TIME_UNIT_ZERO;
756   }
757   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
758   {
759     n->quota_violation_count++;
760 #if DEBUG_TRANSPORT
761     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
763                 n->in_tracker.available_bytes_per_s__,
764                 n->quota_violation_count);
765 #endif
766     /* Discount 32k per violation */
767     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
768   }
769   else
770   {
771     if (n->quota_violation_count > 0)
772     {
773       /* try to add 32k back */
774       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
775       n->quota_violation_count--;
776     }
777   }
778   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
779   {
780     GNUNET_STATISTICS_update (GST_stats,
781                               gettext_noop
782                               ("# bandwidth quota violations by other peers"),
783                               1, GNUNET_NO);
784     *do_forward = GNUNET_NO;
785     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
786   }
787   *do_forward = GNUNET_YES;
788   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
789   if (ret.rel_value > 0)
790   {
791 #if DEBUG_TRANSPORT
792     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
794                 (unsigned long long) n->
795                 in_tracker.consumption_since_last_update__,
796                 (unsigned int) n->in_tracker.available_bytes_per_s__,
797                 (unsigned long long) ret.rel_value);
798 #endif
799     GNUNET_STATISTICS_update (GST_stats,
800                               gettext_noop ("# ms throttling suggested"),
801                               (int64_t) ret.rel_value, GNUNET_NO);
802   }
803   return ret;
804 }
805
806
807 /**
808  * Keep the connection to the given neighbour alive longer,
809  * we received a KEEPALIVE (or equivalent).
810  *
811  * @param neighbour neighbour to keep alive
812  */
813 void
814 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
815 {
816   struct NeighbourMapEntry *n;
817
818   n = lookup_neighbour (neighbour);
819   if (NULL == n)
820   {
821     GNUNET_STATISTICS_update (GST_stats,
822                               gettext_noop
823                               ("# KEEPALIVE messages discarded (not connected)"),
824                               1, GNUNET_NO);
825     return;
826   }
827   GNUNET_SCHEDULER_cancel (n->timeout_task);
828   n->timeout_task =
829       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
830                                     &neighbour_timeout_task, n);
831 }
832
833
834 /**
835  * Change the incoming quota for the given peer.
836  *
837  * @param neighbour identity of peer to change qutoa for
838  * @param quota new quota 
839  */
840 void
841 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
842                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
843 {
844   struct NeighbourMapEntry *n;
845
846   n = lookup_neighbour (neighbour);
847   if (n == NULL)
848   {
849     GNUNET_STATISTICS_update (GST_stats,
850                               gettext_noop
851                               ("# SET QUOTA messages ignored (no such peer)"),
852                               1, GNUNET_NO);
853     return;
854   }
855   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
856   if (0 != ntohl (quota.value__))
857     return;
858 #if DEBUG_TRANSPORT
859   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
860               GNUNET_i2s (&n->id), "SET_QUOTA");
861 #endif
862   GNUNET_STATISTICS_update (GST_stats,
863                             gettext_noop ("# disconnects due to quota of 0"), 1,
864                             GNUNET_NO);
865   disconnect_neighbour (n);
866 }
867
868
869 /**
870  * Closure for the neighbours_iterate function.
871  */
872 struct IteratorContext
873 {
874   /**
875    * Function to call on each connected neighbour.
876    */
877   GST_NeighbourIterator cb;
878
879   /**
880    * Closure for 'cb'.
881    */
882   void *cb_cls;
883 };
884
885
886 /**
887  * Call the callback from the closure for each connected neighbour.
888  *
889  * @param cls the 'struct IteratorContext'
890  * @param key the hash of the public key of the neighbour
891  * @param value the 'struct NeighbourMapEntry'
892  * @return GNUNET_OK (continue to iterate)
893  */
894 static int
895 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
896 {
897   struct IteratorContext *ic = cls;
898   struct NeighbourMapEntry *n = value;
899
900   if (GNUNET_YES != n->is_connected)
901     return GNUNET_OK;
902   GNUNET_assert (n->ats_count > 0);
903   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count - 1);
904   return GNUNET_OK;
905 }
906
907
908 /**
909  * Iterate over all connected neighbours.
910  *
911  * @param cb function to call 
912  * @param cb_cls closure for cb
913  */
914 void
915 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
916 {
917   struct IteratorContext ic;
918
919   ic.cb = cb;
920   ic.cb_cls = cb_cls;
921   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
922 }
923
924
925 /**
926  * If we have an active connection to the given target, it must be shutdown.
927  *
928  * @param target peer to disconnect from
929  */
930 void
931 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
932 {
933   struct NeighbourMapEntry *n;
934   struct GNUNET_TRANSPORT_PluginFunctions *papi;
935   struct GNUNET_MessageHeader disconnect_msg;
936
937   n = lookup_neighbour (target);
938   if (NULL == n)
939     return;                     /* not active */
940   if (GNUNET_YES == n->is_connected)
941   {
942     /* we're actually connected, send DISCONNECT message */
943     disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
944     disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
945     papi = GST_plugins_find (n->plugin_name);
946     if (papi != NULL)
947       papi->send (papi->cls, target, (const void *) &disconnect_msg,
948                   sizeof (struct GNUNET_MessageHeader),
949                   UINT32_MAX /* priority */ ,
950                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
951                   GNUNET_YES, NULL, NULL);
952   }
953   disconnect_neighbour (n);
954 }
955
956
957 /* end of file gnunet-service-transport_neighbours.c */