ensure neighbour map is existing
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48
49 /**
50  * Entry in neighbours.
51  */
52 struct NeighbourMapEntry;
53
54
55 /**
56  * For each neighbour we keep a list of messages
57  * that we still want to transmit to the neighbour.
58  */
59 struct MessageQueue
60 {
61
62   /**
63    * This is a doubly linked list.
64    */
65   struct MessageQueue *next;
66
67   /**
68    * This is a doubly linked list.
69    */
70   struct MessageQueue *prev;
71
72   /**
73    * Once this message is actively being transmitted, which
74    * neighbour is it associated with?
75    */
76   struct NeighbourMapEntry *n;
77
78   /**
79    * Function to call once we're done.
80    */
81   GST_NeighbourSendContinuation cont;
82
83   /**
84    * Closure for 'cont'
85    */
86   void *cont_cls;
87
88   /**
89    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
90    * stuck together in memory.  Allocated at the end of this struct.
91    */
92   const char *message_buf;
93
94   /**
95    * Size of the message buf
96    */
97   size_t message_buf_size;
98
99   /**
100    * At what time should we fail?
101    */
102   struct GNUNET_TIME_Absolute timeout;
103
104 };
105
106
107 /**
108  * Entry in neighbours.
109  */
110 struct NeighbourMapEntry
111 {
112
113   /**
114    * Head of list of messages we would like to send to this peer;
115    * must contain at most one message per client.
116    */
117   struct MessageQueue *messages_head;
118
119   /**
120    * Tail of list of messages we would like to send to this peer; must
121    * contain at most one message per client.
122    */
123   struct MessageQueue *messages_tail;
124
125   /**
126    * Context for address suggestion.
127    * NULL after we are connected.
128    */
129   struct GNUNET_ATS_SuggestionContext *asc;
130
131   /**
132    * Performance data for the peer.
133    */
134   struct GNUNET_TRANSPORT_ATS_Information *ats;
135
136   /**
137    * Are we currently trying to send a message? If so, which one?
138    */
139   struct MessageQueue *is_active;
140
141   /**
142    * Active session for communicating with the peer.
143    */
144   struct Session *session;
145
146   /**
147    * Name of the plugin we currently use.
148    */
149   char *plugin_name;
150
151   /**
152    * Address used for communicating with the peer, NULL for inbound connections.
153    */
154   void *addr;
155
156   /**
157    * Number of bytes in 'addr'.
158    */
159   size_t addrlen;
160
161   /**
162    * Identity of this neighbour.
163    */
164   struct GNUNET_PeerIdentity id;
165
166   /**
167    * ID of task scheduled to run when this peer is about to
168    * time out (will free resources associated with the peer).
169    */
170   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
171
172   /**
173    * ID of task scheduled to run when we should try transmitting
174    * the head of the message queue.
175    */
176   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
177
178   /**
179    * Tracker for inbound bandwidth.
180    */
181   struct GNUNET_BANDWIDTH_Tracker in_tracker;
182
183   /**
184    * How often has the other peer (recently) violated the inbound
185    * traffic limit?  Incremented by 10 per violation, decremented by 1
186    * per non-violation (for each time interval).
187    */
188   unsigned int quota_violation_count;
189
190   /**
191    * Number of values in 'ats' array.
192    */
193   unsigned int ats_count;
194
195   /**
196    * Are we already in the process of disconnecting this neighbour?
197    */
198   int in_disconnect;
199
200   /**
201    * Do we currently consider this neighbour connected? (as far as
202    * the connect/disconnect callbacks are concerned)?
203    */
204   int is_connected;
205
206 };
207
208
209 /**
210  * All known neighbours and their HELLOs.
211  */
212 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
213
214 /**
215  * Closure for connect_notify_cb and disconnect_notify_cb
216  */
217 static void *callback_cls;
218
219 /**
220  * Function to call when we connected to a neighbour.
221  */
222 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
223
224 /**
225  * Function to call when we disconnected from a neighbour.
226  */
227 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
228
229 /**
230  * Lookup a neighbour entry in the neighbours hash map.
231  *
232  * @param pid identity of the peer to look up
233  * @return the entry, NULL if there is no existing record
234  */
235 static struct NeighbourMapEntry *
236 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
237 {
238   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
239 }
240
241
242 /**
243  * Task invoked to start a transmission to another peer.
244  *
245  * @param cls the 'struct NeighbourMapEntry'
246  * @param tc scheduler context
247  */
248 static void
249 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
250
251
252 /**
253  * We're done with our transmission attempt, continue processing.
254  *
255  * @param cls the 'struct MessageQueue' of the message
256  * @param receiver intended receiver
257  * @param success whether it worked or not
258  */
259 static void
260 transmit_send_continuation (void *cls,
261                             const struct GNUNET_PeerIdentity *receiver,
262                             int success)
263 {
264   struct MessageQueue *mq;
265   struct NeighbourMapEntry *n;
266
267   mq = cls;
268   n = mq->n;
269   if (NULL != n)
270   {
271     GNUNET_assert (n->is_active == mq);
272     n->is_active = NULL;
273     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
274     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
275   }
276   if (NULL != mq->cont)
277     mq->cont (mq->cont_cls, success);
278   GNUNET_free (mq);
279 }
280
281
282 /**
283  * Check the ready list for the given neighbour and if a plugin is
284  * ready for transmission (and if we have a message), do so!
285  *
286  * @param neighbour target peer for which to transmit
287  */
288 static void
289 try_transmission_to_peer (struct NeighbourMapEntry *n)
290 {
291   struct MessageQueue *mq;
292   struct GNUNET_TIME_Relative timeout;
293   ssize_t ret;
294   struct GNUNET_TRANSPORT_PluginFunctions *papi;
295
296   if (n->is_active != NULL)
297     return;                     /* transmission already pending */
298   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
299     return;                     /* currently waiting for bandwidth */
300   mq = n->messages_head;
301   while (NULL != (mq = n->messages_head))
302   {
303     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
304     if (timeout.rel_value > 0)
305       break;
306     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
307   }
308   if (NULL == mq)
309     return;                     /* no more messages */
310
311   papi = GST_plugins_find (n->plugin_name);
312   if (papi == NULL)
313   {
314     GNUNET_break (0);
315     return;
316   }
317   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
318   n->is_active = mq;
319   mq->n = n;
320
321
322   ret =
323       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
324                   0 /* priority -- remove from plugin API? */ ,
325                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
326                   &transmit_send_continuation, mq);
327   if (ret == -1)
328   {
329     /* failure, but 'send' would not call continuation in this case,
330      * so we need to do it here! */
331     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
332     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
333   }
334 }
335
336
337 /**
338  * Task invoked to start a transmission to another peer.
339  *
340  * @param cls the 'struct NeighbourMapEntry'
341  * @param tc scheduler context
342  */
343 static void
344 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
345 {
346   struct NeighbourMapEntry *n = cls;
347
348   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
349   try_transmission_to_peer (n);
350 }
351
352
353 /**
354  * Initialize the neighbours subsystem.
355  *
356  * @param cls closure for callbacks
357  * @param connect_cb function to call if we connect to a peer
358  * @param disconnect_cb function to call if we disconnect from a peer
359  */
360 void
361 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
362                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
363 {
364   callback_cls = cls;
365   connect_notify_cb = connect_cb;
366   disconnect_notify_cb = disconnect_cb;
367   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
368 }
369
370
371 /**
372  * Disconnect from the given neighbour, clean up the record.
373  *
374  * @param n neighbour to disconnect from
375  */
376 static void
377 disconnect_neighbour (struct NeighbourMapEntry *n)
378 {
379   struct MessageQueue *mq;
380
381   if (GNUNET_YES == n->in_disconnect)
382     return;
383   n->in_disconnect = GNUNET_YES;
384   while (NULL != (mq = n->messages_head))
385   {
386     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
387     mq->cont (mq->cont_cls, GNUNET_SYSERR);
388     GNUNET_free (mq);
389   }
390   if (NULL != n->is_active)
391   {
392     n->is_active->n = NULL;
393     n->is_active = NULL;
394   }
395   if (GNUNET_YES == n->is_connected)
396   {
397     n->is_connected = GNUNET_NO;
398     disconnect_notify_cb (callback_cls, &n->id);
399   }
400   GNUNET_assert (GNUNET_YES ==
401                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
402                                                        &n->id.hashPubKey, n));
403   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
404   {
405     GNUNET_SCHEDULER_cancel (n->timeout_task);
406     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
407   }
408   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
409   {
410     GNUNET_SCHEDULER_cancel (n->timeout_task);
411     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
412   }
413   if (NULL != n->asc)
414   {
415     GNUNET_ATS_suggest_address_cancel (n->asc);
416     n->asc = NULL;
417   }
418   GNUNET_array_grow (n->ats, n->ats_count, 0);
419   if (NULL != n->plugin_name)
420   {
421     GNUNET_free (n->plugin_name);
422     n->plugin_name = NULL;
423   }
424   if (NULL != n->addr)
425   {
426     GNUNET_free (n->addr);
427     n->addr = NULL;
428     n->addrlen = 0;
429   }
430   n->session = NULL;
431   GNUNET_free (n);
432 }
433
434
435 /**
436  * Peer has been idle for too long. Disconnect.
437  *
438  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
439  * @param tc scheduler context
440  */
441 static void
442 neighbour_timeout_task (void *cls,
443                         const struct GNUNET_SCHEDULER_TaskContext *tc)
444 {
445   struct NeighbourMapEntry *n = cls;
446
447   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
448   disconnect_neighbour (n);
449 }
450
451
452 /**
453  * Disconnect from the given neighbour.
454  *
455  * @param cls unused
456  * @param key hash of neighbour's public key (not used)
457  * @param value the 'struct NeighbourMapEntry' of the neighbour
458  */
459 static int
460 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
461 {
462   struct NeighbourMapEntry *n = value;
463
464 #if DEBUG_TRANSPORT
465   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
466               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
467 #endif
468   disconnect_neighbour (n);
469   return GNUNET_OK;
470 }
471
472
473 /**
474  * Cleanup the neighbours subsystem.
475  */
476 void
477 GST_neighbours_stop ()
478 {
479   GNUNET_assert (neighbours != NULL);
480
481   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
482                                          NULL);
483   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
484   neighbours = NULL;
485   callback_cls = NULL;
486   connect_notify_cb = NULL;
487   disconnect_notify_cb = NULL;
488 }
489
490
491 /**
492  * For an existing neighbour record, set the active connection to
493  * the given address.
494  *
495  * @param peer identity of the peer to switch the address for
496  * @param plugin_name name of transport that delivered the PONG
497  * @param address address of the other peer, NULL if other peer
498  *                       connected to us
499  * @param address_len number of bytes in address
500  * @param session session to use (or NULL)
501  * @param ats performance data
502  * @param ats_count number of entries in ats (excluding 0-termination)
503  */
504 void
505 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
506                                   const char *plugin_name, const void *address,
507                                   size_t address_len, struct Session *session,
508                                   const struct GNUNET_TRANSPORT_ATS_Information
509                                   *ats, uint32_t ats_count)
510 {
511   struct NeighbourMapEntry *n;
512   struct GNUNET_MessageHeader connect_msg;
513
514   GNUNET_assert (neighbours != NULL);
515
516   n = lookup_neighbour (peer);
517   if (NULL == n)
518   {
519     GNUNET_break (0);
520     return;
521   }
522
523 #if DEBUG_TRANSPORT
524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
526               GNUNET_i2s (peer), plugin_name,
527               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
528                                                                   address,
529                                                                   address_len),
530               session);
531 #endif
532
533   GNUNET_free_non_null (n->addr);
534   n->addr = GNUNET_malloc (address_len);
535   memcpy (n->addr, address, address_len);
536   n->addrlen = address_len;
537   n->session = session;
538   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
539   memcpy (n->ats, ats,
540           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
541   GNUNET_free_non_null (n->plugin_name);
542   n->plugin_name = GNUNET_strdup (plugin_name);
543   GNUNET_SCHEDULER_cancel (n->timeout_task);
544   n->timeout_task =
545       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
546                                     &neighbour_timeout_task, n);
547   connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
548   connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
549   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
550                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
551 }
552
553
554 /**
555  * Try to connect to the target peer using the given address
556  *
557  * @param cls the 'struct NeighbourMapEntry' of the target
558  * @param target identity of the target peer
559  * @param plugin_name name of the plugin
560  * @param plugin_address binary address
561  * @param plugin_address_len length of address
562  * @param bandwidth available bandwidth
563  * @param ats performance data for the address (as far as known)
564  * @param ats_count number of performance records in 'ats'
565  */
566 static void
567 try_connect_using_address (void *cls, const struct GNUNET_PeerIdentity *target,
568                            const char *plugin_name, const void *plugin_address,
569                            size_t plugin_address_len, struct Session *session,
570                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
571                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
572                            uint32_t ats_count)
573 {
574   struct NeighbourMapEntry *n = cls;
575   int was_connected;
576
577   n->asc = NULL;
578   was_connected = n->is_connected;
579   n->is_connected = GNUNET_YES;
580   GST_neighbours_switch_to_address (target, plugin_name, plugin_address,
581                                     plugin_address_len, session, ats,
582                                     ats_count);
583   if (GNUNET_YES == was_connected)
584     return;
585   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
586 }
587
588
589 /**
590  * Try to create a connection to the given target (eventually).
591  *
592  * @param target peer to try to connect to
593  */
594 void
595 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
596 {
597   struct NeighbourMapEntry *n;
598
599   GNUNET_assert (neighbours != NULL);
600
601   GNUNET_assert (0 !=
602                  memcmp (target, &GST_my_identity,
603                          sizeof (struct GNUNET_PeerIdentity)));
604   n = lookup_neighbour (target);
605   if ((NULL != n) && (GNUNET_YES == n->is_connected))
606     return;                     /* already connected */
607   if (n == NULL)
608   {
609     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
610     n->id = *target;
611     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
612                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
613                                    MAX_BANDWIDTH_CARRY_S);
614     n->timeout_task =
615         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
616                                       &neighbour_timeout_task, n);
617     GNUNET_assert (GNUNET_OK ==
618                    GNUNET_CONTAINER_multihashmap_put (neighbours,
619                                                       &n->id.hashPubKey, n,
620                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
621   }
622   if (n->asc != NULL)
623     return;                     /* already trying */
624   n->asc =
625       GNUNET_ATS_suggest_address (GST_ats, target, &try_connect_using_address,
626                                   n);
627 }
628
629
630 /**
631  * Test if we're connected to the given peer.
632  *
633  * @param target peer to test
634  * @return GNUNET_YES if we are connected, GNUNET_NO if not
635  */
636 int
637 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
638 {
639   struct NeighbourMapEntry *n;
640
641   GNUNET_assert (neighbours != NULL);
642
643   n = lookup_neighbour (target);
644   if ((NULL == n) || (n->is_connected != GNUNET_YES))
645     return GNUNET_NO;           /* not connected */
646   return GNUNET_YES;
647 }
648
649
650 /**
651  * A session was terminated. Take note.
652  *
653  * @param peer identity of the peer where the session died
654  * @param session session that is gone
655  */
656 void
657 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
658                                    struct Session *session)
659 {
660   struct NeighbourMapEntry *n;
661
662   GNUNET_assert (neighbours != NULL);
663
664   n = lookup_neighbour (peer);
665   if (NULL == n)
666     return;
667   if (session != n->session)
668     return;                     /* doesn't affect us */
669   n->session = NULL;
670   if (GNUNET_YES != n->is_connected)
671     return;                     /* not connected anymore anyway, shouldn't matter */
672
673   GNUNET_SCHEDULER_cancel (n->timeout_task);
674   n->timeout_task =
675       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
676                                     &neighbour_timeout_task, n);
677   /* try QUICKLY to re-establish a connection, reduce timeout! */
678   if (NULL != n->ats)
679   {
680     /* how can this be!? */
681     //GNUNET_break (0);
682     return;
683   }
684   n->asc =
685       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
686 }
687
688
689 /**
690  * Transmit a message to the given target using the active connection.
691  *
692  * @param target destination
693  * @param msg message to send
694  * @param msg_size number of bytes in msg
695  * @param timeout when to fail with timeout
696  * @param cont function to call when done
697  * @param cont_cls closure for 'cont'
698  */
699 void
700 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
701                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
702                      GST_NeighbourSendContinuation cont, void *cont_cls)
703 {
704   struct NeighbourMapEntry *n;
705   struct MessageQueue *mq;
706
707   GNUNET_assert (neighbours != NULL);
708
709   n = lookup_neighbour (target);
710   if ((n == NULL) || (GNUNET_YES != n->is_connected))
711   {
712     GNUNET_STATISTICS_update (GST_stats,
713                               gettext_noop
714                               ("# messages not sent (no such peer or not connected)"),
715                               1, GNUNET_NO);
716 #if DEBUG_TRANSPORT
717     if (n == NULL)
718       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
719                   "Could not send message to peer `%s': unknown neighbor",
720                   GNUNET_i2s (target));
721     if (GNUNET_YES != n->is_connected)
722       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
723                   "Could not send message to peer `%s': not connected\n",
724                   GNUNET_i2s (target));
725 #endif
726     if (NULL != cont)
727       cont (cont_cls, GNUNET_SYSERR);
728     return;
729   }
730   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
731   GNUNET_STATISTICS_update (GST_stats,
732                             gettext_noop
733                             ("# bytes in message queue for other peers"),
734                             msg_size, GNUNET_NO);
735   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
736   mq->cont = cont;
737   mq->cont_cls = cont_cls;
738   /* FIXME: this memcpy can be up to 7% of our total runtime! */
739   memcpy (&mq[1], msg, msg_size);
740   mq->message_buf = (const char *) &mq[1];
741   mq->message_buf_size = msg_size;
742   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
743   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
744   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
745       (NULL == n->is_active))
746     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
747 }
748
749
750 /**
751  * We have received a message from the given sender.  How long should
752  * we delay before receiving more?  (Also used to keep the peer marked
753  * as live).
754  *
755  * @param sender sender of the message
756  * @param size size of the message
757  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
758  *                   GNUNET_NO if the neighbour is not connected or violates the quota
759  * @return how long to wait before reading more from this sender
760  */
761 struct GNUNET_TIME_Relative
762 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
763                                         *sender, ssize_t size, int *do_forward)
764 {
765   struct NeighbourMapEntry *n;
766   struct GNUNET_TIME_Relative ret;
767
768   GNUNET_assert (neighbours != NULL);
769
770   n = lookup_neighbour (sender);
771   if (n == NULL)
772   {
773     *do_forward = GNUNET_NO;
774     return GNUNET_TIME_UNIT_ZERO;
775   }
776   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
777   {
778     n->quota_violation_count++;
779 #if DEBUG_TRANSPORT
780     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
782                 n->in_tracker.available_bytes_per_s__,
783                 n->quota_violation_count);
784 #endif
785     /* Discount 32k per violation */
786     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
787   }
788   else
789   {
790     if (n->quota_violation_count > 0)
791     {
792       /* try to add 32k back */
793       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
794       n->quota_violation_count--;
795     }
796   }
797   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
798   {
799     GNUNET_STATISTICS_update (GST_stats,
800                               gettext_noop
801                               ("# bandwidth quota violations by other peers"),
802                               1, GNUNET_NO);
803     *do_forward = GNUNET_NO;
804     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
805   }
806   *do_forward = GNUNET_YES;
807   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
808   if (ret.rel_value > 0)
809   {
810 #if DEBUG_TRANSPORT
811     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
813                 (unsigned long long) n->in_tracker.
814                 consumption_since_last_update__,
815                 (unsigned int) n->in_tracker.available_bytes_per_s__,
816                 (unsigned long long) ret.rel_value);
817 #endif
818     GNUNET_STATISTICS_update (GST_stats,
819                               gettext_noop ("# ms throttling suggested"),
820                               (int64_t) ret.rel_value, GNUNET_NO);
821   }
822   return ret;
823 }
824
825
826 /**
827  * Keep the connection to the given neighbour alive longer,
828  * we received a KEEPALIVE (or equivalent).
829  *
830  * @param neighbour neighbour to keep alive
831  */
832 void
833 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
834 {
835   struct NeighbourMapEntry *n;
836
837   GNUNET_assert (neighbours != NULL);
838
839   n = lookup_neighbour (neighbour);
840   if (NULL == n)
841   {
842     GNUNET_STATISTICS_update (GST_stats,
843                               gettext_noop
844                               ("# KEEPALIVE messages discarded (not connected)"),
845                               1, GNUNET_NO);
846     return;
847   }
848   GNUNET_SCHEDULER_cancel (n->timeout_task);
849   n->timeout_task =
850       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
851                                     &neighbour_timeout_task, n);
852 }
853
854
855 /**
856  * Change the incoming quota for the given peer.
857  *
858  * @param neighbour identity of peer to change qutoa for
859  * @param quota new quota
860  */
861 void
862 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
863                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
864 {
865   struct NeighbourMapEntry *n;
866
867   GNUNET_assert (neighbours != NULL);
868
869   n = lookup_neighbour (neighbour);
870   if (n == NULL)
871   {
872     GNUNET_STATISTICS_update (GST_stats,
873                               gettext_noop
874                               ("# SET QUOTA messages ignored (no such peer)"),
875                               1, GNUNET_NO);
876     return;
877   }
878   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
879   if (0 != ntohl (quota.value__))
880     return;
881 #if DEBUG_TRANSPORT
882   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
883               GNUNET_i2s (&n->id), "SET_QUOTA");
884 #endif
885   GNUNET_STATISTICS_update (GST_stats,
886                             gettext_noop ("# disconnects due to quota of 0"), 1,
887                             GNUNET_NO);
888   disconnect_neighbour (n);
889 }
890
891
892 /**
893  * Closure for the neighbours_iterate function.
894  */
895 struct IteratorContext
896 {
897   /**
898    * Function to call on each connected neighbour.
899    */
900   GST_NeighbourIterator cb;
901
902   /**
903    * Closure for 'cb'.
904    */
905   void *cb_cls;
906 };
907
908
909 /**
910  * Call the callback from the closure for each connected neighbour.
911  *
912  * @param cls the 'struct IteratorContext'
913  * @param key the hash of the public key of the neighbour
914  * @param value the 'struct NeighbourMapEntry'
915  * @return GNUNET_OK (continue to iterate)
916  */
917 static int
918 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
919 {
920   struct IteratorContext *ic = cls;
921   struct NeighbourMapEntry *n = value;
922
923   if (GNUNET_YES != n->is_connected)
924     return GNUNET_OK;
925   GNUNET_assert (n->ats_count > 0);
926   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count - 1);
927   return GNUNET_OK;
928 }
929
930
931 /**
932  * Iterate over all connected neighbours.
933  *
934  * @param cb function to call
935  * @param cb_cls closure for cb
936  */
937 void
938 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
939 {
940   struct IteratorContext ic;
941
942   GNUNET_assert (neighbours != NULL);
943
944   ic.cb = cb;
945   ic.cb_cls = cb_cls;
946   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
947 }
948
949
950 /**
951  * If we have an active connection to the given target, it must be shutdown.
952  *
953  * @param target peer to disconnect from
954  */
955 void
956 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
957 {
958   struct NeighbourMapEntry *n;
959   struct GNUNET_TRANSPORT_PluginFunctions *papi;
960   struct GNUNET_MessageHeader disconnect_msg;
961
962   GNUNET_assert (neighbours != NULL);
963
964   n = lookup_neighbour (target);
965   if (NULL == n)
966     return;                     /* not active */
967   if (GNUNET_YES == n->is_connected)
968   {
969     /* we're actually connected, send DISCONNECT message */
970     disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
971     disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
972     papi = GST_plugins_find (n->plugin_name);
973     if (papi != NULL)
974       papi->send (papi->cls, target, (const void *) &disconnect_msg,
975                   sizeof (struct GNUNET_MessageHeader),
976                   UINT32_MAX /* priority */ ,
977                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
978                   GNUNET_YES, NULL, NULL);
979   }
980   disconnect_neighbour (n);
981 }
982
983
984 /* end of file gnunet-service-transport_neighbours.c */