indentation
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48
49 /**
50  * Entry in neighbours. 
51  */
52 struct NeighbourMapEntry;
53
54
55 /**
56  * For each neighbour we keep a list of messages
57  * that we still want to transmit to the neighbour.
58  */
59 struct MessageQueue
60 {
61
62   /**
63    * This is a doubly linked list.
64    */
65   struct MessageQueue *next;
66
67   /**
68    * This is a doubly linked list.
69    */
70   struct MessageQueue *prev;
71
72   /**
73    * Once this message is actively being transmitted, which
74    * neighbour is it associated with?
75    */
76   struct NeighbourMapEntry *n;
77
78   /**
79    * Function to call once we're done.
80    */
81   GST_NeighbourSendContinuation cont;
82
83   /**
84    * Closure for 'cont'
85    */
86   void *cont_cls;
87
88   /**
89    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
90    * stuck together in memory.  Allocated at the end of this struct.
91    */
92   const char *message_buf;
93
94   /**
95    * Size of the message buf
96    */
97   size_t message_buf_size;
98
99   /**
100    * At what time should we fail?
101    */
102   struct GNUNET_TIME_Absolute timeout;
103
104 };
105
106
107 /**
108  * Entry in neighbours. 
109  */
110 struct NeighbourMapEntry
111 {
112
113   /**
114    * Head of list of messages we would like to send to this peer;
115    * must contain at most one message per client.
116    */
117   struct MessageQueue *messages_head;
118
119   /**
120    * Tail of list of messages we would like to send to this peer; must
121    * contain at most one message per client.
122    */
123   struct MessageQueue *messages_tail;
124
125   /**
126    * Context for address suggestion.
127    * NULL after we are connected.
128    */
129   struct GNUNET_ATS_SuggestionContext *asc;
130
131   /**
132    * Performance data for the peer.
133    */
134   struct GNUNET_TRANSPORT_ATS_Information *ats;
135
136   /**
137    * Are we currently trying to send a message? If so, which one?
138    */
139   struct MessageQueue *is_active;
140
141   /**
142    * Active session for communicating with the peer.
143    */
144   struct Session *session;
145
146   /**
147    * Name of the plugin we currently use.
148    */
149   char *plugin_name;
150
151   /**
152    * Address used for communicating with the peer, NULL for inbound connections.
153    */
154   void *addr;
155
156   /**
157    * Number of bytes in 'addr'.
158    */
159   size_t addrlen;
160
161   /**
162    * Identity of this neighbour.
163    */
164   struct GNUNET_PeerIdentity id;
165
166   /**
167    * ID of task scheduled to run when this peer is about to
168    * time out (will free resources associated with the peer).
169    */
170   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
171
172   /**
173    * ID of task scheduled to run when we should try transmitting
174    * the head of the message queue.  
175    */
176   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
177
178   /**
179    * Tracker for inbound bandwidth.
180    */
181   struct GNUNET_BANDWIDTH_Tracker in_tracker;
182
183   /**
184    * How often has the other peer (recently) violated the inbound
185    * traffic limit?  Incremented by 10 per violation, decremented by 1
186    * per non-violation (for each time interval).
187    */
188   unsigned int quota_violation_count;
189
190   /**
191    * Number of values in 'ats' array.
192    */
193   unsigned int ats_count;
194
195   /**
196    * Are we already in the process of disconnecting this neighbour?
197    */
198   int in_disconnect;
199
200   /**
201    * Do we currently consider this neighbour connected? (as far as
202    * the connect/disconnect callbacks are concerned)?
203    */
204   int is_connected;
205
206 };
207
208
209 /**
210  * All known neighbours and their HELLOs.
211  */
212 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
213
214 /**
215  * Closure for connect_notify_cb and disconnect_notify_cb
216  */
217 static void *callback_cls;
218
219 /**
220  * Function to call when we connected to a neighbour.
221  */
222 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
223
224 /**
225  * Function to call when we disconnected from a neighbour.
226  */
227 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
228
229
230 /**
231  * Lookup a neighbour entry in the neighbours hash map.
232  *
233  * @param pid identity of the peer to look up
234  * @return the entry, NULL if there is no existing record
235  */
236 static struct NeighbourMapEntry *
237 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
238 {
239   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
240 }
241
242
243 /**
244  * Task invoked to start a transmission to another peer.
245  *
246  * @param cls the 'struct NeighbourMapEntry'
247  * @param tc scheduler context
248  */
249 static void
250 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
251
252
253 /**
254  * We're done with our transmission attempt, continue processing.
255  *
256  * @param cls the 'struct MessageQueue' of the message
257  * @param receiver intended receiver
258  * @param success whether it worked or not
259  */
260 static void
261 transmit_send_continuation (void *cls,
262                             const struct GNUNET_PeerIdentity *receiver,
263                             int success)
264 {
265   struct MessageQueue *mq;
266   struct NeighbourMapEntry *n;
267
268   mq = cls;
269   n = mq->n;
270   if (NULL != n)
271   {
272     GNUNET_assert (n->is_active == mq);
273     n->is_active = NULL;
274     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
275     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
276   }
277   if (NULL != mq->cont)
278     mq->cont (mq->cont_cls, success);
279   GNUNET_free (mq);
280 }
281
282
283 /**
284  * Check the ready list for the given neighbour and if a plugin is
285  * ready for transmission (and if we have a message), do so!
286  *
287  * @param neighbour target peer for which to transmit
288  */
289 static void
290 try_transmission_to_peer (struct NeighbourMapEntry *n)
291 {
292   struct MessageQueue *mq;
293   struct GNUNET_TIME_Relative timeout;
294   ssize_t ret;
295   struct GNUNET_TRANSPORT_PluginFunctions *papi;
296
297   if (n->is_active != NULL)
298     return;                     /* transmission already pending */
299   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
300     return;                     /* currently waiting for bandwidth */
301   mq = n->messages_head;
302   while (NULL != (mq = n->messages_head))
303   {
304     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
305     if (timeout.rel_value > 0)
306       break;
307     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
308   }
309   if (NULL == mq)
310     return;                     /* no more messages */
311
312   papi = GST_plugins_find (n->plugin_name);
313   if (papi == NULL)
314   {
315     GNUNET_break (0);
316     return;
317   }
318   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
319   n->is_active = mq;
320   mq->n = n;
321   ret = papi->send (papi->cls,
322                     &n->id,
323                     mq->message_buf,
324                     mq->message_buf_size,
325                     0 /* priority -- remove from plugin API? */ ,
326                     timeout,
327                     n->session,
328                     n->addr,
329                     n->addrlen, GNUNET_YES, &transmit_send_continuation, mq);
330   if (ret == -1)
331   {
332     /* failure, but 'send' would not call continuation in this case,
333      * so we need to do it here! */
334     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
335     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
336   }
337 }
338
339
340 /**
341  * Task invoked to start a transmission to another peer.
342  *
343  * @param cls the 'struct NeighbourMapEntry'
344  * @param tc scheduler context
345  */
346 static void
347 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
348 {
349   struct NeighbourMapEntry *n = cls;
350
351   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
352   try_transmission_to_peer (n);
353 }
354
355
356 /**
357  * Initialize the neighbours subsystem.
358  *
359  * @param cls closure for callbacks
360  * @param connect_cb function to call if we connect to a peer
361  * @param disconnect_cb function to call if we disconnect from a peer
362  */
363 void
364 GST_neighbours_start (void *cls,
365                       GNUNET_TRANSPORT_NotifyConnect connect_cb,
366                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
367 {
368   callback_cls = cls;
369   connect_notify_cb = connect_cb;
370   disconnect_notify_cb = disconnect_cb;
371   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
372 }
373
374
375 /**
376  * Disconnect from the given neighbour, clean up the record.
377  *
378  * @param n neighbour to disconnect from
379  */
380 static void
381 disconnect_neighbour (struct NeighbourMapEntry *n)
382 {
383   struct MessageQueue *mq;
384
385   if (GNUNET_YES == n->in_disconnect)
386     return;
387   n->in_disconnect = GNUNET_YES;
388   while (NULL != (mq = n->messages_head))
389   {
390     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
391     mq->cont (mq->cont_cls, GNUNET_SYSERR);
392     GNUNET_free (mq);
393   }
394   if (NULL != n->is_active)
395   {
396     n->is_active->n = NULL;
397     n->is_active = NULL;
398   }
399   if (GNUNET_YES == n->is_connected)
400   {
401     n->is_connected = GNUNET_NO;
402     disconnect_notify_cb (callback_cls, &n->id);
403   }
404   GNUNET_assert (GNUNET_YES ==
405                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
406                                                        &n->id.hashPubKey, n));
407   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
408   {
409     GNUNET_SCHEDULER_cancel (n->timeout_task);
410     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
411   }
412   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
413   {
414     GNUNET_SCHEDULER_cancel (n->timeout_task);
415     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
416   }
417   if (NULL != n->asc)
418   {
419     GNUNET_ATS_suggest_address_cancel (n->asc);
420     n->asc = NULL;
421   }
422   GNUNET_array_grow (n->ats, n->ats_count, 0);
423   if (NULL != n->plugin_name)
424   {
425     GNUNET_free (n->plugin_name);
426     n->plugin_name = NULL;
427   }
428   if (NULL != n->addr)
429   {
430     GNUNET_free (n->addr);
431     n->addr = NULL;
432     n->addrlen = 0;
433   }
434   n->session = NULL;
435   GNUNET_free (n);
436 }
437
438
439 /**
440  * Peer has been idle for too long. Disconnect.
441  *
442  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
443  * @param tc scheduler context
444  */
445 static void
446 neighbour_timeout_task (void *cls,
447                         const struct GNUNET_SCHEDULER_TaskContext *tc)
448 {
449   struct NeighbourMapEntry *n = cls;
450
451   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
452   disconnect_neighbour (n);
453 }
454
455
456 /**
457  * Disconnect from the given neighbour.
458  *
459  * @param cls unused
460  * @param key hash of neighbour's public key (not used)
461  * @param value the 'struct NeighbourMapEntry' of the neighbour
462  */
463 static int
464 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
465 {
466   struct NeighbourMapEntry *n = value;
467
468 #if DEBUG_TRANSPORT
469   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470               "Disconnecting peer `%4s', %s\n",
471               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
472 #endif
473   disconnect_neighbour (n);
474   return GNUNET_OK;
475 }
476
477
478 /**
479  * Cleanup the neighbours subsystem.
480  */
481 void
482 GST_neighbours_stop ()
483 {
484   GNUNET_CONTAINER_multihashmap_iterate (neighbours,
485                                          &disconnect_all_neighbours, NULL);
486   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
487   neighbours = NULL;
488   callback_cls = NULL;
489   connect_notify_cb = NULL;
490   disconnect_notify_cb = NULL;
491 }
492
493
494 /**
495  * For an existing neighbour record, set the active connection to
496  * the given address.
497  *
498  * @param peer identity of the peer to switch the address for
499  * @param plugin_name name of transport that delivered the PONG
500  * @param address address of the other peer, NULL if other peer
501  *                       connected to us
502  * @param address_len number of bytes in address
503  * @param session session to use (or NULL)
504  * @param ats performance data
505  * @param ats_count number of entries in ats (excluding 0-termination)
506  */
507 void
508 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
509                                   const char *plugin_name,
510                                   const void *address,
511                                   size_t address_len,
512                                   struct Session *session,
513                                   const struct GNUNET_TRANSPORT_ATS_Information
514                                   *ats, uint32_t ats_count)
515 {
516   struct NeighbourMapEntry *n;
517   struct GNUNET_MessageHeader connect_msg;
518
519   n = lookup_neighbour (peer);
520   if (NULL == n)
521   {
522     GNUNET_break (0);
523     return;
524   }
525   GNUNET_free_non_null (n->addr);
526   n->addr = GNUNET_malloc (address_len);
527   memcpy (n->addr, address, address_len);
528   n->addrlen = address_len;
529   n->session = session;
530   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
531   memcpy (n->ats,
532           ats, ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
533   GNUNET_free_non_null (n->plugin_name);
534   n->plugin_name = GNUNET_strdup (plugin_name);
535   GNUNET_SCHEDULER_cancel (n->timeout_task);
536   n->timeout_task =
537       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
538                                     &neighbour_timeout_task, n);
539   connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
540   connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
541   GST_neighbours_send (peer,
542                        &connect_msg,
543                        sizeof (connect_msg),
544                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
545 }
546
547
548 /**
549  * Try to connect to the target peer using the given address
550  *
551  * @param cls the 'struct NeighbourMapEntry' of the target
552  * @param target identity of the target peer
553  * @param plugin_name name of the plugin
554  * @param plugin_address binary address
555  * @param plugin_address_len length of address
556  * @param bandwidth available bandwidth
557  * @param ats performance data for the address (as far as known)
558  * @param ats_count number of performance records in 'ats'
559  */
560 static void
561 try_connect_using_address (void *cls,
562                            const struct GNUNET_PeerIdentity *target,
563                            const char *plugin_name,
564                            const void *plugin_address,
565                            size_t plugin_address_len,
566                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
567                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
568                            uint32_t ats_count)
569 {
570   struct NeighbourMapEntry *n = cls;
571
572   n->asc = NULL;
573   GST_neighbours_switch_to_address (target,
574                                     plugin_name,
575                                     plugin_address,
576                                     plugin_address_len, NULL, ats, ats_count);
577   if (GNUNET_YES == n->is_connected)
578     return;
579   n->is_connected = GNUNET_YES;
580   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
581 }
582
583
584 /**
585  * Try to create a connection to the given target (eventually).
586  *
587  * @param target peer to try to connect to
588  */
589 void
590 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
591 {
592   struct NeighbourMapEntry *n;
593
594   GNUNET_assert (0 != memcmp (target,
595                               &GST_my_identity,
596                               sizeof (struct GNUNET_PeerIdentity)));
597   n = lookup_neighbour (target);
598   if ((NULL != n) && (GNUNET_YES == n->is_connected))
599     return;                     /* already connected */
600   if (n == NULL)
601   {
602     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
603     n->id = *target;
604     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
605                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
606                                    MAX_BANDWIDTH_CARRY_S);
607     n->timeout_task =
608         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
609                                       &neighbour_timeout_task, n);
610     GNUNET_assert (GNUNET_OK ==
611                    GNUNET_CONTAINER_multihashmap_put (neighbours,
612                                                       &n->id.hashPubKey, n,
613                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
614   }
615   if (n->asc != NULL)
616     return;                     /* already trying */
617   n->asc = GNUNET_ATS_suggest_address (GST_ats,
618                                        target, &try_connect_using_address, n);
619 }
620
621
622 /**
623  * Test if we're connected to the given peer.
624  * 
625  * @param target peer to test
626  * @return GNUNET_YES if we are connected, GNUNET_NO if not
627  */
628 int
629 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
630 {
631   struct NeighbourMapEntry *n;
632
633   n = lookup_neighbour (target);
634   if ((NULL == n) || (n->is_connected == GNUNET_YES))
635     return GNUNET_NO;           /* not connected */
636   return GNUNET_YES;
637 }
638
639
640 /**
641  * A session was terminated. Take note.
642  *
643  * @param peer identity of the peer where the session died
644  * @param session session that is gone
645  */
646 void
647 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
648                                    struct Session *session)
649 {
650   struct NeighbourMapEntry *n;
651
652   n = lookup_neighbour (peer);
653   if (NULL == n)
654     return;
655   if (session != n->session)
656     return;                     /* doesn't affect us */
657   n->session = NULL;
658   if (GNUNET_YES != n->is_connected)
659     return;                     /* not connected anymore anyway, shouldn't matter */
660   /* try QUICKLY to re-establish a connection, reduce timeout! */
661   if (NULL != n->ats)
662   {
663     /* how can this be!? */
664     GNUNET_break (0);
665     return;
666   }
667   GNUNET_SCHEDULER_cancel (n->timeout_task);
668   n->timeout_task =
669       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
670                                     &neighbour_timeout_task, n);
671   n->asc =
672       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
673 }
674
675
676 /**
677  * Transmit a message to the given target using the active connection.
678  *
679  * @param target destination
680  * @param msg message to send
681  * @param msg_size number of bytes in msg
682  * @param timeout when to fail with timeout
683  * @param cont function to call when done
684  * @param cont_cls closure for 'cont'
685  */
686 void
687 GST_neighbours_send (const struct GNUNET_PeerIdentity *target,
688                      const void *msg,
689                      size_t msg_size,
690                      struct GNUNET_TIME_Relative timeout,
691                      GST_NeighbourSendContinuation cont, void *cont_cls)
692 {
693   struct NeighbourMapEntry *n;
694   struct MessageQueue *mq;
695
696   n = lookup_neighbour (target);
697   if ((n == NULL) || (GNUNET_YES != n->is_connected))
698   {
699     GNUNET_STATISTICS_update (GST_stats,
700                               gettext_noop
701                               ("# SET QUOTA messages ignored (no such peer)"),
702                               1, GNUNET_NO);
703     if (NULL != cont)
704       cont (cont_cls, GNUNET_SYSERR);
705     return;
706   }
707   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
708   GNUNET_STATISTICS_update (GST_stats,
709                             gettext_noop
710                             ("# bytes in message queue for other peers"),
711                             msg_size, GNUNET_NO);
712   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
713   mq->cont = cont;
714   mq->cont_cls = cont_cls;
715   /* FIXME: this memcpy can be up to 7% of our total runtime! */
716   memcpy (&mq[1], msg, msg_size);
717   mq->message_buf = (const char *) &mq[1];
718   mq->message_buf_size = msg_size;
719   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
720   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
721   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
722       (NULL == n->is_active))
723     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
724 }
725
726
727 /**
728  * We have received a message from the given sender.  How long should
729  * we delay before receiving more?  (Also used to keep the peer marked
730  * as live).
731  *
732  * @param sender sender of the message
733  * @param size size of the message
734  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
735  *                   GNUNET_NO if the neighbour is not connected or violates the quota
736  * @return how long to wait before reading more from this sender
737  */
738 struct GNUNET_TIME_Relative
739 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
740                                         *sender, ssize_t size, int *do_forward)
741 {
742   struct NeighbourMapEntry *n;
743   struct GNUNET_TIME_Relative ret;
744
745   n = lookup_neighbour (sender);
746   if (n == NULL)
747   {
748     *do_forward = GNUNET_NO;
749     return GNUNET_TIME_UNIT_ZERO;
750   }
751   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
752   {
753     n->quota_violation_count++;
754 #if DEBUG_TRANSPORT
755     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
756                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
757                 n->in_tracker.available_bytes_per_s__,
758                 n->quota_violation_count);
759 #endif
760     /* Discount 32k per violation */
761     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
762   }
763   else
764   {
765     if (n->quota_violation_count > 0)
766     {
767       /* try to add 32k back */
768       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
769       n->quota_violation_count--;
770     }
771   }
772   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
773   {
774     GNUNET_STATISTICS_update (GST_stats,
775                               gettext_noop
776                               ("# bandwidth quota violations by other peers"),
777                               1, GNUNET_NO);
778     *do_forward = GNUNET_NO;
779     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
780   }
781   *do_forward = GNUNET_YES;
782   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
783   if (ret.rel_value > 0)
784   {
785 #if DEBUG_TRANSPORT
786     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
788                 (unsigned long long) n->
789                 in_tracker.consumption_since_last_update__,
790                 (unsigned int) n->in_tracker.available_bytes_per_s__,
791                 (unsigned long long) ret.rel_value);
792 #endif
793     GNUNET_STATISTICS_update (GST_stats,
794                               gettext_noop ("# ms throttling suggested"),
795                               (int64_t) ret.rel_value, GNUNET_NO);
796   }
797   return ret;
798 }
799
800
801 /**
802  * Keep the connection to the given neighbour alive longer,
803  * we received a KEEPALIVE (or equivalent).
804  *
805  * @param neighbour neighbour to keep alive
806  */
807 void
808 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
809 {
810   struct NeighbourMapEntry *n;
811
812   n = lookup_neighbour (neighbour);
813   if (NULL == n)
814   {
815     GNUNET_STATISTICS_update (GST_stats,
816                               gettext_noop
817                               ("# KEEPALIVE messages discarded (not connected)"),
818                               1, GNUNET_NO);
819     return;
820   }
821   GNUNET_SCHEDULER_cancel (n->timeout_task);
822   n->timeout_task =
823       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
824                                     &neighbour_timeout_task, n);
825 }
826
827
828 /**
829  * Change the incoming quota for the given peer.
830  *
831  * @param neighbour identity of peer to change qutoa for
832  * @param quota new quota 
833  */
834 void
835 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
836                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
837 {
838   struct NeighbourMapEntry *n;
839
840   n = lookup_neighbour (neighbour);
841   if (n == NULL)
842   {
843     GNUNET_STATISTICS_update (GST_stats,
844                               gettext_noop
845                               ("# SET QUOTA messages ignored (no such peer)"),
846                               1, GNUNET_NO);
847     return;
848   }
849   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
850   if (0 != ntohl (quota.value__))
851     return;
852 #if DEBUG_TRANSPORT
853   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
854               "Disconnecting peer `%4s' due to `%s'\n",
855               GNUNET_i2s (&n->id), "SET_QUOTA");
856 #endif
857   GNUNET_STATISTICS_update (GST_stats,
858                             gettext_noop ("# disconnects due to quota of 0"),
859                             1, GNUNET_NO);
860   disconnect_neighbour (n);
861 }
862
863
864 /**
865  * Closure for the neighbours_iterate function.
866  */
867 struct IteratorContext
868 {
869   /**
870    * Function to call on each connected neighbour.
871    */
872   GST_NeighbourIterator cb;
873
874   /**
875    * Closure for 'cb'.
876    */
877   void *cb_cls;
878 };
879
880
881 /**
882  * Call the callback from the closure for each connected neighbour.
883  *
884  * @param cls the 'struct IteratorContext'
885  * @param key the hash of the public key of the neighbour
886  * @param value the 'struct NeighbourMapEntry'
887  * @return GNUNET_OK (continue to iterate)
888  */
889 static int
890 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
891 {
892   struct IteratorContext *ic = cls;
893   struct NeighbourMapEntry *n = value;
894
895   if (GNUNET_YES != n->is_connected)
896     return GNUNET_OK;
897   GNUNET_assert (n->ats_count > 0);
898   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count - 1);
899   return GNUNET_OK;
900 }
901
902
903 /**
904  * Iterate over all connected neighbours.
905  *
906  * @param cb function to call 
907  * @param cb_cls closure for cb
908  */
909 void
910 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
911 {
912   struct IteratorContext ic;
913
914   ic.cb = cb;
915   ic.cb_cls = cb_cls;
916   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
917 }
918
919
920 /**
921  * If we have an active connection to the given target, it must be shutdown.
922  *
923  * @param target peer to disconnect from
924  */
925 void
926 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
927 {
928   struct NeighbourMapEntry *n;
929   struct GNUNET_TRANSPORT_PluginFunctions *papi;
930   struct GNUNET_MessageHeader disconnect_msg;
931
932   n = lookup_neighbour (target);
933   if (NULL == n)
934     return;                     /* not active */
935   if (GNUNET_YES == n->is_connected)
936   {
937     /* we're actually connected, send DISCONNECT message */
938     disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
939     disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
940     papi = GST_plugins_find (n->plugin_name);
941     if (papi != NULL)
942       papi->send (papi->cls,
943                   target,
944                   (const void *) &disconnect_msg,
945                   sizeof (struct GNUNET_MessageHeader),
946                   UINT32_MAX /* priority */ ,
947                   GNUNET_TIME_UNIT_FOREVER_REL,
948                   n->session, n->addr, n->addrlen, GNUNET_YES, NULL, NULL);
949   }
950   disconnect_neighbour (n);
951 }
952
953
954 /* end of file gnunet-service-transport_neighbours.c */