process keepalive, simplify PONGs
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48
49 /**
50  * Entry in neighbours. 
51  */
52 struct NeighbourMapEntry;
53
54
55 /**
56  * For each neighbour we keep a list of messages
57  * that we still want to transmit to the neighbour.
58  */
59 struct MessageQueue
60 {
61
62   /**
63    * This is a doubly linked list.
64    */
65   struct MessageQueue *next;
66
67   /**
68    * This is a doubly linked list.
69    */
70   struct MessageQueue *prev;
71
72   /**
73    * Once this message is actively being transmitted, which
74    * neighbour is it associated with?
75    */
76   struct NeighbourMapEntry *n;
77
78   /**
79    * Function to call once we're done.
80    */
81   GST_NeighbourSendContinuation cont;
82
83   /**
84    * Closure for 'cont'
85    */
86   void *cont_cls;
87
88   /**
89    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
90    * stuck together in memory.  Allocated at the end of this struct.
91    */
92   const char *message_buf;
93
94   /**
95    * Size of the message buf
96    */
97   size_t message_buf_size;
98
99   /**
100    * At what time should we fail?
101    */
102   struct GNUNET_TIME_Absolute timeout;
103
104 };
105
106
107 /**
108  * Entry in neighbours. 
109  */
110 struct NeighbourMapEntry
111 {
112
113   /**
114    * Head of list of messages we would like to send to this peer;
115    * must contain at most one message per client.
116    */
117   struct MessageQueue *messages_head;
118
119   /**
120    * Tail of list of messages we would like to send to this peer; must
121    * contain at most one message per client.
122    */
123   struct MessageQueue *messages_tail;
124
125   /**
126    * Context for address suggestion.
127    * NULL after we are connected.
128    */
129   struct GNUNET_ATS_SuggestionContext *asc;
130
131   /**
132    * Performance data for the peer.
133    */
134   struct GNUNET_TRANSPORT_ATS_Information *ats;
135
136   /**
137    * Are we currently trying to send a message? If so, which one?
138    */
139   struct MessageQueue *is_active;
140
141   /**
142    * Active session for communicating with the peer.
143    */
144   struct Session *session;
145
146   /**
147    * Name of the plugin we currently use.
148    */
149   char *plugin_name;
150
151   /**
152    * Address used for communicating with the peer, NULL for inbound connections.
153    */
154   void *addr;
155   
156   /**
157    * Number of bytes in 'addr'.
158    */
159   size_t addrlen;
160
161   /**
162    * Identity of this neighbour.
163    */
164   struct GNUNET_PeerIdentity id;
165
166   /**
167    * ID of task scheduled to run when this peer is about to
168    * time out (will free resources associated with the peer).
169    */
170   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
171
172   /**
173    * ID of task scheduled to run when we should try transmitting
174    * the head of the message queue.  
175    */
176   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
177
178   /**
179    * Tracker for inbound bandwidth.
180    */
181   struct GNUNET_BANDWIDTH_Tracker in_tracker;
182
183   /**
184    * How often has the other peer (recently) violated the inbound
185    * traffic limit?  Incremented by 10 per violation, decremented by 1
186    * per non-violation (for each time interval).
187    */
188   unsigned int quota_violation_count;
189
190   /**
191    * Number of values in 'ats' array.
192    */
193   unsigned int ats_count;
194
195   /**
196    * Are we already in the process of disconnecting this neighbour?
197    */
198   int in_disconnect;
199
200   /**
201    * Do we currently consider this neighbour connected? (as far as
202    * the connect/disconnect callbacks are concerned)?
203    */
204   int is_connected;
205
206 };
207
208
209 /**
210  * All known neighbours and their HELLOs.
211  */
212 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
213
214 /**
215  * Closure for connect_notify_cb and disconnect_notify_cb
216  */
217 static void *callback_cls;
218
219 /**
220  * Function to call when we connected to a neighbour.
221  */
222 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
223
224 /**
225  * Function to call when we disconnected from a neighbour.
226  */
227 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
228
229
230 /**
231  * Lookup a neighbour entry in the neighbours hash map.
232  *
233  * @param pid identity of the peer to look up
234  * @return the entry, NULL if there is no existing record
235  */
236 static struct NeighbourMapEntry *
237 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
238 {
239   return GNUNET_CONTAINER_multihashmap_get (neighbours,
240                                             &pid->hashPubKey);
241 }
242
243
244 /**
245  * Task invoked to start a transmission to another peer.
246  *
247  * @param cls the 'struct NeighbourMapEntry'
248  * @param tc scheduler context
249  */
250 static void
251 transmission_task (void *cls,
252                    const struct GNUNET_SCHEDULER_TaskContext *tc);
253
254
255 /**
256  * We're done with our transmission attempt, continue processing.
257  *
258  * @param cls the 'struct MessageQueue' of the message
259  * @param receiver intended receiver
260  * @param success whether it worked or not
261  */
262 static void
263 transmit_send_continuation (void *cls,
264                             const struct GNUNET_PeerIdentity *receiver,
265                             int success)
266 {
267   struct MessageQueue *mq;
268   struct NeighbourMapEntry *n;
269   
270   mq = cls;
271   n = mq->n;
272   if (NULL != n) 
273     {
274       GNUNET_assert (n->is_active == mq);
275       n->is_active = NULL;
276       GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
277       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
278                                                        n);
279     }
280   if (NULL != mq->cont)
281     mq->cont (mq->cont_cls,
282               success);
283   GNUNET_free (mq);
284 }
285
286
287 /**
288  * Check the ready list for the given neighbour and if a plugin is
289  * ready for transmission (and if we have a message), do so!
290  *
291  * @param neighbour target peer for which to transmit
292  */
293 static void
294 try_transmission_to_peer (struct NeighbourMapEntry *n)
295 {
296   struct MessageQueue *mq;
297   struct GNUNET_TIME_Relative timeout;
298   ssize_t ret;
299   struct GNUNET_TRANSPORT_PluginFunctions *papi;
300
301   if (n->is_active != NULL)
302     return; /* transmission already pending */
303   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
304     return; /* currently waiting for bandwidth */
305   mq = n->messages_head;
306   while (NULL != (mq = n->messages_head))
307     {
308       timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
309       if (timeout.rel_value > 0)
310         break;
311       transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */
312     }
313   if (NULL == mq)
314     return; /* no more messages */
315
316   papi = GST_plugins_find (n->plugin_name);
317   if (papi == NULL)
318     {
319       GNUNET_break (0);
320       return;
321     }
322   GNUNET_CONTAINER_DLL_remove (n->messages_head,
323                                n->messages_tail,
324                                mq);
325   n->is_active = mq;
326   mq->n = n;
327   ret = papi->send (papi->cls,
328                     &n->id,
329                     mq->message_buf,
330                     mq->message_buf_size,
331                     0 /* priority -- remove from plugin API? */,
332                     timeout,
333                     n->session,
334                     n->addr,
335                     n->addrlen,
336                     GNUNET_YES,
337                     &transmit_send_continuation, mq);
338   if (ret == -1)
339     {
340       /* failure, but 'send' would not call continuation in this case,
341          so we need to do it here! */
342       transmit_send_continuation (mq,
343                                   &n->id,
344                                   GNUNET_SYSERR);
345       n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
346                                                        n);
347     }
348 }
349
350
351 /**
352  * Task invoked to start a transmission to another peer.
353  *
354  * @param cls the 'struct NeighbourMapEntry'
355  * @param tc scheduler context
356  */
357 static void
358 transmission_task (void *cls,
359                    const struct GNUNET_SCHEDULER_TaskContext *tc)
360 {
361   struct NeighbourMapEntry *n = cls;
362
363   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
364   try_transmission_to_peer (n);
365 }
366
367
368 /**
369  * Initialize the neighbours subsystem.
370  *
371  * @param cls closure for callbacks
372  * @param connect_cb function to call if we connect to a peer
373  * @param disconnect_cb function to call if we disconnect from a peer
374  */
375 void 
376 GST_neighbours_start (void *cls,
377                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
378                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
379 {
380   callback_cls = cls;
381   connect_notify_cb = connect_cb;
382   disconnect_notify_cb = disconnect_cb;
383   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
384 }
385
386
387 /**
388  * Disconnect from the given neighbour, clean up the record.
389  *
390  * @param n neighbour to disconnect from
391  */
392 static void
393 disconnect_neighbour (struct NeighbourMapEntry *n)
394 {
395   struct MessageQueue *mq;
396
397   if (GNUNET_YES == n->in_disconnect)
398     return;
399   n->in_disconnect = GNUNET_YES;
400   while (NULL != (mq = n->messages_head))
401     {
402       GNUNET_CONTAINER_DLL_remove (n->messages_head,
403                                    n->messages_tail,
404                                    mq);
405       mq->cont (mq->cont_cls, GNUNET_SYSERR);
406       GNUNET_free (mq);
407     }
408   if (NULL != n->is_active)
409     {
410       n->is_active->n = NULL;
411       n->is_active = NULL;
412     }
413   if (GNUNET_YES == n->is_connected)
414     {
415       n->is_connected = GNUNET_NO;
416       disconnect_notify_cb (callback_cls,
417                             &n->id);
418     }
419   GNUNET_assert (GNUNET_YES ==
420                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
421                                                        &n->id.hashPubKey,
422                                                        n));
423   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
424     {
425       GNUNET_SCHEDULER_cancel (n->timeout_task);
426       n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
427     }
428   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
429     {
430       GNUNET_SCHEDULER_cancel (n->timeout_task);
431       n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
432     }
433   if (NULL != n->asc)
434     {
435       GNUNET_ATS_suggest_address_cancel (n->asc);
436       n->asc = NULL;
437     }
438   GNUNET_array_grow (n->ats,
439                      n->ats_count,
440                      0);
441   if (NULL != n->plugin_name)
442     {
443       GNUNET_free (n->plugin_name);
444       n->plugin_name = NULL;
445     }
446   if (NULL != n->addr)
447     {
448       GNUNET_free (n->addr);
449       n->addr = NULL;
450       n->addrlen = 0;
451     }
452   n->session = NULL;
453   GNUNET_free (n);
454 }
455
456
457 /**
458  * Peer has been idle for too long. Disconnect.
459  *
460  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
461  * @param tc scheduler context
462  */
463 static void
464 neighbour_timeout_task (void *cls,
465                         const struct GNUNET_SCHEDULER_TaskContext *tc)
466 {
467   struct NeighbourMapEntry *n = cls;
468
469   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
470   disconnect_neighbour (n);
471 }
472
473
474 /**
475  * Disconnect from the given neighbour.
476  *
477  * @param cls unused
478  * @param key hash of neighbour's public key (not used)
479  * @param value the 'struct NeighbourMapEntry' of the neighbour
480  */
481 static int
482 disconnect_all_neighbours (void *cls,
483                            const GNUNET_HashCode *key,
484                            void *value)
485 {
486   struct NeighbourMapEntry *n = value;
487
488 #if DEBUG_TRANSPORT
489   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
490               "Disconnecting peer `%4s', %s\n",
491               GNUNET_i2s(&n->id),
492               "SHUTDOWN_TASK");
493 #endif
494   disconnect_neighbour (n);
495   return GNUNET_OK;
496 }
497
498
499 /**
500  * Cleanup the neighbours subsystem.
501  */
502 void
503 GST_neighbours_stop ()
504 {
505   GNUNET_CONTAINER_multihashmap_iterate (neighbours,
506                                          &disconnect_all_neighbours,
507                                          NULL);
508   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
509   neighbours = NULL;
510   callback_cls = NULL;
511   connect_notify_cb = NULL;
512   disconnect_notify_cb = NULL;
513 }
514
515
516 /**
517  * For an existing neighbour record, set the active connection to
518  * the given address.
519  *
520  * @param peer identity of the peer to switch the address for
521  * @param plugin_name name of transport that delivered the PONG
522  * @param address address of the other peer, NULL if other peer
523  *                       connected to us
524  * @param address_len number of bytes in address
525  * @param session session to use (or NULL)
526  * @param ats performance data
527  * @param ats_count number of entries in ats (excluding 0-termination)
528  */
529 void
530 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
531                                   const char *plugin_name,
532                                   const void *address,
533                                   size_t address_len,
534                                   struct Session *session,
535                                   const struct GNUNET_TRANSPORT_ATS_Information *ats,
536                                   uint32_t ats_count)
537 {
538   struct NeighbourMapEntry *n;
539   struct GNUNET_MessageHeader connect_msg;
540
541   n = lookup_neighbour (peer);
542   if (NULL == n)
543     {
544       GNUNET_break (0);
545       return;
546     }
547   GNUNET_free_non_null (n->addr);
548   n->addr = GNUNET_malloc (address_len);
549   memcpy (n->addr, address, address_len);
550   n->addrlen = address_len;
551   n->session = session;
552   GNUNET_array_grow (n->ats,
553                      n->ats_count,
554                      ats_count);
555   memcpy (n->ats,
556           ats,
557           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
558   GNUNET_free_non_null (n->plugin_name);
559   n->plugin_name = GNUNET_strdup (plugin_name);
560   GNUNET_SCHEDULER_cancel (n->timeout_task);
561   n->timeout_task =
562     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
563                                   &neighbour_timeout_task, n);
564   connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
565   connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
566   GST_neighbours_send (peer,
567                        &connect_msg,
568                        sizeof (connect_msg),
569                        GNUNET_TIME_UNIT_FOREVER_REL,
570                        NULL, NULL);
571 }
572
573
574 /**
575  * Try to connect to the target peer using the given address
576  *
577  * @param cls the 'struct NeighbourMapEntry' of the target
578  * @param target identity of the target peer
579  * @param plugin_name name of the plugin
580  * @param plugin_address binary address
581  * @param plugin_address_len length of address
582  * @param bandwidth available bandwidth
583  * @param ats performance data for the address (as far as known)
584  * @param ats_count number of performance records in 'ats'
585  */
586 static void
587 try_connect_using_address (void *cls,
588                            const struct GNUNET_PeerIdentity *target,
589                            const char *plugin_name,
590                            const void *plugin_address,
591                            size_t plugin_address_len,
592                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
593                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
594                            uint32_t ats_count)
595 {
596   struct NeighbourMapEntry *n = cls;
597
598   n->asc = NULL;
599   GST_neighbours_switch_to_address (target,
600                                     plugin_name,
601                                     plugin_address,
602                                     plugin_address_len,
603                                     NULL,
604                                     ats, ats_count);
605   if (GNUNET_YES == n->is_connected)
606     return;
607   n->is_connected = GNUNET_YES;  
608   connect_notify_cb (callback_cls,
609                      target,
610                      n->ats,
611                      n->ats_count);
612 }
613
614
615 /**
616  * Try to create a connection to the given target (eventually).
617  *
618  * @param target peer to try to connect to
619  */
620 void
621 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
622 {
623   struct NeighbourMapEntry *n;
624
625   GNUNET_assert (0 != memcmp (target,
626                               &GST_my_identity,
627                               sizeof (struct GNUNET_PeerIdentity)));
628   n = lookup_neighbour (target);
629   if ( (NULL != n) &&
630        (GNUNET_YES == n->is_connected) )
631     return; /* already connected */
632   if (n == NULL)
633     {
634       n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
635       n->id = *target;
636       GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
637                                      GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
638                                      MAX_BANDWIDTH_CARRY_S);
639       n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
640                                                       &neighbour_timeout_task, n);
641       GNUNET_assert (GNUNET_OK ==
642                      GNUNET_CONTAINER_multihashmap_put (neighbours,
643                                                         &n->id.hashPubKey,
644                                                         n,
645                                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
646     }
647   if (n->asc != NULL)
648     return; /* already trying */
649   n->asc = GNUNET_ATS_suggest_address (GST_ats,
650                                        target,
651                                        &try_connect_using_address,
652                                        n); 
653 }
654
655
656 /**
657  * Test if we're connected to the given peer.
658  * 
659  * @param target peer to test
660  * @return GNUNET_YES if we are connected, GNUNET_NO if not
661  */
662 int
663 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
664 {
665   struct NeighbourMapEntry *n;
666
667   n = lookup_neighbour (target);
668   if ( (NULL == n) ||
669        (n->is_connected == GNUNET_YES) )
670        return GNUNET_NO; /* not connected */
671   return GNUNET_YES;
672 }
673
674
675 /**
676  * A session was terminated. Take note.
677  *
678  * @param peer identity of the peer where the session died
679  * @param session session that is gone
680  */
681 void
682 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
683                                    struct Session *session)
684 {
685   struct NeighbourMapEntry *n;
686
687   n = lookup_neighbour (peer);
688   if (NULL == n)
689     return;
690   if (session != n->session)
691     return; /* doesn't affect us */
692   n->session = NULL;
693   if (GNUNET_YES != n->is_connected)
694     return; /* not connected anymore anyway, shouldn't matter */
695   /* try QUICKLY to re-establish a connection, reduce timeout! */
696   if (NULL != n->ats)
697     {
698       /* how can this be!? */
699       GNUNET_break (0);
700       return;
701     }
702   GNUNET_SCHEDULER_cancel (n->timeout_task);
703   n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
704                                                   &neighbour_timeout_task, n);
705   n->asc = GNUNET_ATS_suggest_address (GST_ats,
706                                        peer,
707                                        &try_connect_using_address,
708                                        n); 
709 }
710
711
712 /**
713  * Transmit a message to the given target using the active connection.
714  *
715  * @param target destination
716  * @param msg message to send
717  * @param msg_size number of bytes in msg
718  * @param timeout when to fail with timeout
719  * @param cont function to call when done
720  * @param cont_cls closure for 'cont'
721  */
722 void
723 GST_neighbours_send (const struct GNUNET_PeerIdentity *target,
724                      const void *msg,
725                      size_t msg_size,
726                      struct GNUNET_TIME_Relative timeout,
727                      GST_NeighbourSendContinuation cont,
728                      void *cont_cls)
729 {
730   struct NeighbourMapEntry *n;
731   struct MessageQueue *mq;
732
733   n = lookup_neighbour (target);
734   if ( (n == NULL) ||
735        (GNUNET_YES != n->is_connected) )
736     {
737       GNUNET_STATISTICS_update (GST_stats,
738                                 gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
739                                 1,
740                                 GNUNET_NO);
741       if (NULL != cont)
742         cont (cont_cls,
743               GNUNET_SYSERR);
744       return;
745     }
746   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
747   GNUNET_STATISTICS_update (GST_stats,
748                             gettext_noop ("# bytes in message queue for other peers"),
749                             msg_size,
750                             GNUNET_NO);
751   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
752   mq->cont = cont;
753   mq->cont_cls = cont_cls;
754   /* FIXME: this memcpy can be up to 7% of our total runtime! */
755   memcpy (&mq[1], msg, msg_size);
756   mq->message_buf = (const char*) &mq[1];
757   mq->message_buf_size = msg_size;
758   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
759   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head,
760                                     n->messages_tail,
761                                     mq);
762   if ( (GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
763        (NULL == n->is_active) )
764     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task,
765                                                      n);
766 }
767
768
769 /**
770  * We have received a message from the given sender.  How long should
771  * we delay before receiving more?  (Also used to keep the peer marked
772  * as live).
773  *
774  * @param sender sender of the message
775  * @param size size of the message
776  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
777  *                   GNUNET_NO if the neighbour is not connected or violates the quota
778  * @return how long to wait before reading more from this sender
779  */
780 struct GNUNET_TIME_Relative
781 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity *sender,
782                                         ssize_t size,
783                                         int *do_forward)
784 {
785   struct NeighbourMapEntry *n;
786   struct GNUNET_TIME_Relative ret;
787
788   n = lookup_neighbour (sender);
789   if (n == NULL)
790     {
791       *do_forward = GNUNET_NO;
792       return GNUNET_TIME_UNIT_ZERO;
793     }
794   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
795                                                       size))
796     {
797       n->quota_violation_count++;
798 #if DEBUG_TRANSPORT
799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800                   "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
801                   n->in_tracker.available_bytes_per_s__,
802                   n->quota_violation_count);
803 #endif
804       /* Discount 32k per violation */
805       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
806                                         - 32 * 1024);
807     }
808   else
809     {
810       if (n->quota_violation_count > 0)
811         {
812           /* try to add 32k back */
813           GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker,
814                                             32 * 1024);
815           n->quota_violation_count--;
816         }
817     }
818   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
819     {
820       GNUNET_STATISTICS_update (GST_stats,
821                                 gettext_noop ("# bandwidth quota violations by other peers"),
822                                 1,
823                                 GNUNET_NO);
824       *do_forward = GNUNET_NO;
825       return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
826     }
827   *do_forward = GNUNET_YES;
828   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
829   if (ret.rel_value > 0)
830     {
831 #if DEBUG_TRANSPORT 
832       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
833                   "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
834                   (unsigned long long) n->in_tracker.consumption_since_last_update__,
835                   (unsigned int) n->in_tracker.available_bytes_per_s__,
836                   (unsigned long long) ret.rel_value);
837 #endif
838       GNUNET_STATISTICS_update (GST_stats,
839                                 gettext_noop ("# ms throttling suggested"),
840                                 (int64_t) ret.rel_value,
841                                 GNUNET_NO);
842     }
843   return ret;
844 }
845
846
847 /**
848  * Keep the connection to the given neighbour alive longer,
849  * we received a KEEPALIVE (or equivalent).
850  *
851  * @param neighbour neighbour to keep alive
852  */
853 void
854 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
855 {
856   struct NeighbourMapEntry *n;
857
858   n = lookup_neighbour (neighbour);
859   if (NULL == n)
860     {
861       GNUNET_STATISTICS_update (GST_stats,
862                                 gettext_noop ("# KEEPALIVE messages discarded (not connected)"),
863                                 1,
864                                 GNUNET_NO);
865       return;
866     }
867   GNUNET_SCHEDULER_cancel (n->timeout_task);
868   n->timeout_task =
869     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
870                                   &neighbour_timeout_task, n);  
871 }
872
873
874 /**
875  * Change the incoming quota for the given peer.
876  *
877  * @param neighbour identity of peer to change qutoa for
878  * @param quota new quota 
879  */
880 void
881 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
882                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
883 {
884   struct NeighbourMapEntry *n;
885
886   n = lookup_neighbour (neighbour);
887   if (n == NULL)
888     {
889       GNUNET_STATISTICS_update (GST_stats,
890                                 gettext_noop ("# SET QUOTA messages ignored (no such peer)"),
891                                 1,
892                                 GNUNET_NO);
893       return;
894     }
895   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker,
896                                          quota);
897   if (0 != ntohl (quota.value__))
898     return;
899 #if DEBUG_TRANSPORT
900   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
901               "Disconnecting peer `%4s' due to `%s'\n",
902               GNUNET_i2s(&n->id),
903               "SET_QUOTA");
904 #endif
905   GNUNET_STATISTICS_update (GST_stats,
906                             gettext_noop ("# disconnects due to quota of 0"),
907                             1,
908                             GNUNET_NO);
909   disconnect_neighbour (n);
910 }
911
912
913 /**
914  * Closure for the neighbours_iterate function.
915  */
916 struct IteratorContext
917 {
918   /**
919    * Function to call on each connected neighbour.
920    */
921   GST_NeighbourIterator cb;
922
923   /**
924    * Closure for 'cb'.
925    */
926   void *cb_cls;
927 };
928
929
930 /**
931  * Call the callback from the closure for each connected neighbour.
932  *
933  * @param cls the 'struct IteratorContext'
934  * @param key the hash of the public key of the neighbour
935  * @param value the 'struct NeighbourMapEntry'
936  * @return GNUNET_OK (continue to iterate)
937  */
938 static int
939 neighbours_iterate (void *cls,
940                     const GNUNET_HashCode *key,
941                     void *value)
942 {
943   struct IteratorContext *ic = cls;
944   struct NeighbourMapEntry *n = value;
945
946   if (GNUNET_YES != n->is_connected)
947     return GNUNET_OK; 
948   GNUNET_assert (n->ats_count > 0);
949   ic->cb (ic->cb_cls,
950           &n->id,
951           n->ats,
952           n->ats_count - 1);
953   return GNUNET_OK;
954 }
955
956
957 /**
958  * Iterate over all connected neighbours.
959  *
960  * @param cb function to call 
961  * @param cb_cls closure for cb
962  */
963 void
964 GST_neighbours_iterate (GST_NeighbourIterator cb,
965                         void *cb_cls)
966 {
967   struct IteratorContext ic;
968
969   ic.cb = cb;
970   ic.cb_cls = cb_cls;
971   GNUNET_CONTAINER_multihashmap_iterate (neighbours,
972                                          &neighbours_iterate,
973                                          &ic);
974 }
975
976
977 /**
978  * If we have an active connection to the given target, it must be shutdown.
979  *
980  * @param target peer to disconnect from
981  */
982 void
983 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
984 {
985   struct NeighbourMapEntry *n;
986   struct GNUNET_TRANSPORT_PluginFunctions *papi;
987   struct GNUNET_MessageHeader disconnect_msg;
988
989   n = lookup_neighbour (target);
990   if (NULL == n) 
991     return; /* not active */
992   if (GNUNET_YES == n->is_connected)
993     {
994       /* we're actually connected, send DISCONNECT message */
995       disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
996       disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
997       papi = GST_plugins_find (n->plugin_name);
998       if (papi != NULL)
999         papi->send (papi->cls,
1000                     target,
1001                     (const void*) &disconnect_msg,
1002                     sizeof (struct GNUNET_MessageHeader),
1003                     UINT32_MAX /* priority */,
1004                     GNUNET_TIME_UNIT_FOREVER_REL,
1005                     n->session,
1006                     n->addr,
1007                     n->addrlen,
1008                     GNUNET_YES,
1009                     NULL, NULL);
1010     }
1011   disconnect_neighbour (n);
1012 }
1013
1014
1015 /* end of file gnunet-service-transport_neighbours.c */