adding statistics about connected peers
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport.h"
32 #include "gnunet_peerinfo_service.h"
33 #include "gnunet_constants.h"
34 #include "transport.h"
35
36
37 /**
38  * Size of the neighbour hash map.
39  */
40 #define NEIGHBOUR_TABLE_SIZE 256
41
42 /**
43  * How often must a peer violate bandwidth quotas before we start
44  * to simply drop its messages?
45  */
46 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
47
48
49 /**
50  * Entry in neighbours.
51  */
52 struct NeighbourMapEntry;
53
54
55 /**
56  * For each neighbour we keep a list of messages
57  * that we still want to transmit to the neighbour.
58  */
59 struct MessageQueue
60 {
61
62   /**
63    * This is a doubly linked list.
64    */
65   struct MessageQueue *next;
66
67   /**
68    * This is a doubly linked list.
69    */
70   struct MessageQueue *prev;
71
72   /**
73    * Once this message is actively being transmitted, which
74    * neighbour is it associated with?
75    */
76   struct NeighbourMapEntry *n;
77
78   /**
79    * Function to call once we're done.
80    */
81   GST_NeighbourSendContinuation cont;
82
83   /**
84    * Closure for 'cont'
85    */
86   void *cont_cls;
87
88   /**
89    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
90    * stuck together in memory.  Allocated at the end of this struct.
91    */
92   const char *message_buf;
93
94   /**
95    * Size of the message buf
96    */
97   size_t message_buf_size;
98
99   /**
100    * At what time should we fail?
101    */
102   struct GNUNET_TIME_Absolute timeout;
103
104 };
105
106
107 /**
108  * Entry in neighbours.
109  */
110 struct NeighbourMapEntry
111 {
112
113   /**
114    * Head of list of messages we would like to send to this peer;
115    * must contain at most one message per client.
116    */
117   struct MessageQueue *messages_head;
118
119   /**
120    * Tail of list of messages we would like to send to this peer; must
121    * contain at most one message per client.
122    */
123   struct MessageQueue *messages_tail;
124
125   /**
126    * Context for address suggestion.
127    * NULL after we are connected.
128    */
129   struct GNUNET_ATS_SuggestionContext *asc;
130
131   /**
132    * Performance data for the peer.
133    */
134   struct GNUNET_TRANSPORT_ATS_Information *ats;
135
136   /**
137    * Are we currently trying to send a message? If so, which one?
138    */
139   struct MessageQueue *is_active;
140
141   /**
142    * Active session for communicating with the peer.
143    */
144   struct Session *session;
145
146   /**
147    * Name of the plugin we currently use.
148    */
149   char *plugin_name;
150
151   /**
152    * Address used for communicating with the peer, NULL for inbound connections.
153    */
154   void *addr;
155
156   /**
157    * Number of bytes in 'addr'.
158    */
159   size_t addrlen;
160
161   /**
162    * Identity of this neighbour.
163    */
164   struct GNUNET_PeerIdentity id;
165
166   /**
167    * ID of task scheduled to run when this peer is about to
168    * time out (will free resources associated with the peer).
169    */
170   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
171
172   /**
173    * ID of task scheduled to run when we should try transmitting
174    * the head of the message queue.
175    */
176   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
177
178   /**
179    * Tracker for inbound bandwidth.
180    */
181   struct GNUNET_BANDWIDTH_Tracker in_tracker;
182
183   /**
184    * How often has the other peer (recently) violated the inbound
185    * traffic limit?  Incremented by 10 per violation, decremented by 1
186    * per non-violation (for each time interval).
187    */
188   unsigned int quota_violation_count;
189
190   /**
191    * Number of values in 'ats' array.
192    */
193   unsigned int ats_count;
194
195   /**
196    * Are we already in the process of disconnecting this neighbour?
197    */
198   int in_disconnect;
199
200   /**
201    * Do we currently consider this neighbour connected? (as far as
202    * the connect/disconnect callbacks are concerned)?
203    */
204   int is_connected;
205
206 };
207
208
209 /**
210  * All known neighbours and their HELLOs.
211  */
212 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
213
214 /**
215  * Closure for connect_notify_cb and disconnect_notify_cb
216  */
217 static void *callback_cls;
218
219 /**
220  * Function to call when we connected to a neighbour.
221  */
222 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
223
224 /**
225  * Function to call when we disconnected from a neighbour.
226  */
227 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
228
229 /**
230  * counter for connected neighbours
231  */
232 static int neighbours_connected;
233
234 /**
235  * Lookup a neighbour entry in the neighbours hash map.
236  *
237  * @param pid identity of the peer to look up
238  * @return the entry, NULL if there is no existing record
239  */
240 static struct NeighbourMapEntry *
241 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
242 {
243   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
244 }
245
246
247 /**
248  * Task invoked to start a transmission to another peer.
249  *
250  * @param cls the 'struct NeighbourMapEntry'
251  * @param tc scheduler context
252  */
253 static void
254 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
255
256
257 /**
258  * We're done with our transmission attempt, continue processing.
259  *
260  * @param cls the 'struct MessageQueue' of the message
261  * @param receiver intended receiver
262  * @param success whether it worked or not
263  */
264 void
265 transmit_send_continuation (void *cls,
266                             const struct GNUNET_PeerIdentity *receiver,
267                             int success)
268 {
269   struct MessageQueue *mq;
270   struct NeighbourMapEntry *n;
271
272   mq = cls;
273   n = mq->n;
274   if (NULL != n)
275   {
276     GNUNET_assert (n->is_active == mq);
277     n->is_active = NULL;
278     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
279     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
280   }
281   if (NULL != mq->cont)
282     mq->cont (mq->cont_cls, success);
283   GNUNET_free (mq);
284 }
285
286
287 /**
288  * Check the ready list for the given neighbour and if a plugin is
289  * ready for transmission (and if we have a message), do so!
290  *
291  * @param neighbour target peer for which to transmit
292  */
293 static void
294 try_transmission_to_peer (struct NeighbourMapEntry *n)
295 {
296   struct MessageQueue *mq;
297   struct GNUNET_TIME_Relative timeout;
298   ssize_t ret;
299   struct GNUNET_TRANSPORT_PluginFunctions *papi;
300
301   if (n->is_active != NULL)
302     return;                     /* transmission already pending */
303   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
304     return;                     /* currently waiting for bandwidth */
305   mq = n->messages_head;
306   while (NULL != (mq = n->messages_head))
307   {
308     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
309     if (timeout.rel_value > 0)
310       break;
311     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
312   }
313   if (NULL == mq)
314     return;                     /* no more messages */
315
316   papi = GST_plugins_find (n->plugin_name);
317   if (papi == NULL)
318   {
319     GNUNET_break (0);
320     return;
321   }
322   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
323   n->is_active = mq;
324   mq->n = n;
325
326   ret =
327       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
328                   0 /* priority -- remove from plugin API? */ ,
329                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
330                   &transmit_send_continuation, mq);
331   if (ret == -1)
332   {
333     /* failure, but 'send' would not call continuation in this case,
334      * so we need to do it here! */
335     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
336     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
337   }
338 }
339
340
341 /**
342  * Task invoked to start a transmission to another peer.
343  *
344  * @param cls the 'struct NeighbourMapEntry'
345  * @param tc scheduler context
346  */
347 static void
348 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
349 {
350   struct NeighbourMapEntry *n = cls;
351
352   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
353   try_transmission_to_peer (n);
354 }
355
356
357 /**
358  * Initialize the neighbours subsystem.
359  *
360  * @param cls closure for callbacks
361  * @param connect_cb function to call if we connect to a peer
362  * @param disconnect_cb function to call if we disconnect from a peer
363  */
364 void
365 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
366                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
367 {
368   callback_cls = cls;
369   connect_notify_cb = connect_cb;
370   disconnect_notify_cb = disconnect_cb;
371   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
372 }
373
374
375 /**
376  * Disconnect from the given neighbour, clean up the record.
377  *
378  * @param n neighbour to disconnect from
379  */
380 static void
381 disconnect_neighbour (struct NeighbourMapEntry *n)
382 {
383   struct MessageQueue *mq;
384
385   if (GNUNET_YES == n->in_disconnect)
386     return;
387   n->in_disconnect = GNUNET_YES;
388   while (NULL != (mq = n->messages_head))
389   {
390     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
391     mq->cont (mq->cont_cls, GNUNET_SYSERR);
392     GNUNET_free (mq);
393   }
394   if (NULL != n->is_active)
395   {
396     n->is_active->n = NULL;
397     n->is_active = NULL;
398   }
399   if (GNUNET_YES == n->is_connected)
400   {
401     n->is_connected = GNUNET_NO;
402
403     GNUNET_assert (neighbours_connected > 0);
404     neighbours_connected--;
405
406     GNUNET_STATISTICS_update (GST_stats,
407                               gettext_noop
408                               ("# peers connected"),
409                               -1, GNUNET_NO);
410     disconnect_notify_cb (callback_cls, &n->id);
411   }
412   GNUNET_assert (GNUNET_YES ==
413                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
414                                                        &n->id.hashPubKey, n));
415   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
416   {
417     GNUNET_SCHEDULER_cancel (n->timeout_task);
418     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
419   }
420   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
421   {
422     GNUNET_SCHEDULER_cancel (n->timeout_task);
423     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
424   }
425   if (NULL != n->asc)
426   {
427     GNUNET_ATS_suggest_address_cancel (n->asc);
428     n->asc = NULL;
429   }
430   GNUNET_array_grow (n->ats, n->ats_count, 0);
431   if (NULL != n->plugin_name)
432   {
433     GNUNET_free (n->plugin_name);
434     n->plugin_name = NULL;
435   }
436   if (NULL != n->addr)
437   {
438     GNUNET_free (n->addr);
439     n->addr = NULL;
440     n->addrlen = 0;
441   }
442   n->session = NULL;
443   GNUNET_free (n);
444 }
445
446
447 /**
448  * Peer has been idle for too long. Disconnect.
449  *
450  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
451  * @param tc scheduler context
452  */
453 static void
454 neighbour_timeout_task (void *cls,
455                         const struct GNUNET_SCHEDULER_TaskContext *tc)
456 {
457   struct NeighbourMapEntry *n = cls;
458
459   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
460   disconnect_neighbour (n);
461 }
462
463
464 /**
465  * Disconnect from the given neighbour.
466  *
467  * @param cls unused
468  * @param key hash of neighbour's public key (not used)
469  * @param value the 'struct NeighbourMapEntry' of the neighbour
470  */
471 static int
472 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
473 {
474   struct NeighbourMapEntry *n = value;
475
476 #if DEBUG_TRANSPORT
477   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
478               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
479 #endif
480   disconnect_neighbour (n);
481   return GNUNET_OK;
482 }
483
484
485 /**
486  * Cleanup the neighbours subsystem.
487  */
488 void
489 GST_neighbours_stop ()
490 {
491   GNUNET_assert (neighbours != NULL);
492
493   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
494                                          NULL);
495   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
496   GNUNET_assert (neighbours_connected == 0);
497   neighbours = NULL;
498   callback_cls = NULL;
499   connect_notify_cb = NULL;
500   disconnect_notify_cb = NULL;
501 }
502
503
504 /**
505  * For an existing neighbour record, set the active connection to
506  * the given address.
507  *
508  * @param peer identity of the peer to switch the address for
509  * @param plugin_name name of transport that delivered the PONG
510  * @param address address of the other peer, NULL if other peer
511  *                       connected to us
512  * @param address_len number of bytes in address
513  * @param session session to use (or NULL)
514  * @param ats performance data
515  * @param ats_count number of entries in ats (excluding 0-termination)
516  */
517 void
518 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
519                                   const char *plugin_name, const void *address,
520                                   size_t address_len, struct Session *session,
521                                   const struct GNUNET_TRANSPORT_ATS_Information
522                                   *ats, uint32_t ats_count)
523 {
524   struct NeighbourMapEntry *n;
525   struct GNUNET_MessageHeader connect_msg;
526
527   GNUNET_assert (neighbours != NULL);
528
529   n = lookup_neighbour (peer);
530   if (NULL == n)
531   {
532     GNUNET_break (0);
533     return;
534   }
535
536 #if DEBUG_TRANSPORT
537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
539               GNUNET_i2s (peer), plugin_name,
540               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
541                                                                   address,
542                                                                   address_len),
543               session);
544 #endif
545
546   GNUNET_free_non_null (n->addr);
547   n->addr = GNUNET_malloc (address_len);
548   memcpy (n->addr, address, address_len);
549   n->addrlen = address_len;
550   n->session = session;
551   GNUNET_array_grow (n->ats, n->ats_count, ats_count);
552   memcpy (n->ats, ats,
553           ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
554   GNUNET_free_non_null (n->plugin_name);
555   n->plugin_name = GNUNET_strdup (plugin_name);
556   GNUNET_SCHEDULER_cancel (n->timeout_task);
557   n->timeout_task =
558       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
559                                     &neighbour_timeout_task, n);
560   connect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
561   connect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
562   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
563                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
564 }
565
566
567 /**
568  * Try to connect to the target peer using the given address
569  *
570  * @param cls the 'struct NeighbourMapEntry' of the target
571  * @param target identity of the target peer
572  * @param plugin_name name of the plugin
573  * @param plugin_address binary address
574  * @param plugin_address_len length of address
575  * @param bandwidth available bandwidth
576  * @param ats performance data for the address (as far as known)
577  * @param ats_count number of performance records in 'ats'
578  */
579 static void
580 try_connect_using_address (void *cls, const struct GNUNET_PeerIdentity *target,
581                            const char *plugin_name, const void *plugin_address,
582                            size_t plugin_address_len, struct Session *session,
583                            struct GNUNET_BANDWIDTH_Value32NBO bandwidth,
584                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
585                            uint32_t ats_count)
586 {
587   struct NeighbourMapEntry *n = cls;
588   int was_connected;
589
590   n->asc = NULL;
591   was_connected = n->is_connected;
592   n->is_connected = GNUNET_YES;
593   GST_neighbours_switch_to_address (target, plugin_name, plugin_address,
594                                     plugin_address_len, session, ats,
595                                     ats_count);
596   if (GNUNET_YES == was_connected)
597     return;
598
599   neighbours_connected++;
600   GNUNET_STATISTICS_update (GST_stats,
601                             gettext_noop
602                             ("# peers connected"),
603                             1, GNUNET_NO);
604   connect_notify_cb (callback_cls, target, n->ats, n->ats_count);
605 }
606
607
608 /**
609  * Try to create a connection to the given target (eventually).
610  *
611  * @param target peer to try to connect to
612  */
613 void
614 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
615 {
616   struct NeighbourMapEntry *n;
617
618   GNUNET_assert (neighbours != NULL);
619
620 #if DEBUG_TRANSPORT
621   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
622               GNUNET_i2s (target));
623 #endif
624
625   GNUNET_assert (0 !=
626                  memcmp (target, &GST_my_identity,
627                          sizeof (struct GNUNET_PeerIdentity)));
628   n = lookup_neighbour (target);
629   if ((NULL != n) && (GNUNET_YES == n->is_connected))
630     return;                     /* already connected */
631   if (n == NULL)
632   {
633 #if DEBUG_TRANSPORT
634     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635                 "Unknown peer `%s', creating new neighbour\n",
636                 GNUNET_i2s (target));
637 #endif
638     n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
639     n->id = *target;
640     GNUNET_array_grow (n->ats, n->ats_count, 1);
641     n->ats[0].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);;
642     n->ats[0].value = htonl (0);
643     GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
644                                    GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
645                                    MAX_BANDWIDTH_CARRY_S);
646     n->timeout_task =
647         GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
648                                       &neighbour_timeout_task, n);
649     GNUNET_assert (GNUNET_OK ==
650                    GNUNET_CONTAINER_multihashmap_put (neighbours,
651                                                       &n->id.hashPubKey, n,
652                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
653   }
654   if (n->asc != NULL)
655     return;                     /* already trying */
656 #if DEBUG_TRANSPORT
657   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
658               "Asking ATS for suggested address to connect to peer `%s'\n",
659               GNUNET_i2s (target));
660 #endif
661   n->asc =
662       GNUNET_ATS_suggest_address (GST_ats, target, &try_connect_using_address,
663                                   n);
664 }
665
666
667 /**
668  * Test if we're connected to the given peer.
669  *
670  * @param target peer to test
671  * @return GNUNET_YES if we are connected, GNUNET_NO if not
672  */
673 int
674 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
675 {
676   struct NeighbourMapEntry *n;
677
678   GNUNET_assert (neighbours != NULL);
679
680   n = lookup_neighbour (target);
681   if ((NULL == n) || (n->is_connected != GNUNET_YES))
682     return GNUNET_NO;           /* not connected */
683   return GNUNET_YES;
684 }
685
686
687 /**
688  * A session was terminated. Take note.
689  *
690  * @param peer identity of the peer where the session died
691  * @param session session that is gone
692  */
693 void
694 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
695                                    struct Session *session)
696 {
697   struct NeighbourMapEntry *n;
698
699   GNUNET_assert (neighbours != NULL);
700
701   n = lookup_neighbour (peer);
702   if (NULL == n)
703     return;
704   if (session != n->session)
705     return;                     /* doesn't affect us */
706   n->session = NULL;
707   if (GNUNET_YES != n->is_connected)
708     return;                     /* not connected anymore anyway, shouldn't matter */
709
710   GNUNET_SCHEDULER_cancel (n->timeout_task);
711   n->timeout_task =
712       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
713                                     &neighbour_timeout_task, n);
714   /* try QUICKLY to re-establish a connection, reduce timeout! */
715   if (NULL != n->ats)
716   {
717     /* how can this be!? */
718     //GNUNET_break (0);
719     return;
720   }
721   n->asc =
722       GNUNET_ATS_suggest_address (GST_ats, peer, &try_connect_using_address, n);
723 }
724
725
726 /**
727  * Transmit a message to the given target using the active connection.
728  *
729  * @param target destination
730  * @param msg message to send
731  * @param msg_size number of bytes in msg
732  * @param timeout when to fail with timeout
733  * @param cont function to call when done
734  * @param cont_cls closure for 'cont'
735  */
736 void
737 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
738                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
739                      GST_NeighbourSendContinuation cont, void *cont_cls)
740 {
741   struct NeighbourMapEntry *n;
742   struct MessageQueue *mq;
743
744   GNUNET_assert (neighbours != NULL);
745
746   n = lookup_neighbour (target);
747   if ((n == NULL) || (GNUNET_YES != n->is_connected))
748   {
749     GNUNET_STATISTICS_update (GST_stats,
750                               gettext_noop
751                               ("# messages not sent (no such peer or not connected)"),
752                               1, GNUNET_NO);
753 #if DEBUG_TRANSPORT
754     if (n == NULL)
755       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
756                   "Could not send message to peer `%s': unknown neighbor",
757                   GNUNET_i2s (target));
758     if (GNUNET_YES != n->is_connected)
759       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
760                   "Could not send message to peer `%s': not connected\n",
761                   GNUNET_i2s (target));
762 #endif
763     if (NULL != cont)
764       cont (cont_cls, GNUNET_SYSERR);
765     return;
766   }
767
768   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
769   GNUNET_STATISTICS_update (GST_stats,
770                             gettext_noop
771                             ("# bytes in message queue for other peers"),
772                             msg_size, GNUNET_NO);
773   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
774   mq->cont = cont;
775   mq->cont_cls = cont_cls;
776   /* FIXME: this memcpy can be up to 7% of our total runtime! */
777   memcpy (&mq[1], msg, msg_size);
778   mq->message_buf = (const char *) &mq[1];
779   mq->message_buf_size = msg_size;
780   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
781   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
782   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
783       (NULL == n->is_active))
784     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
785 }
786
787
788 /**
789  * We have received a message from the given sender.  How long should
790  * we delay before receiving more?  (Also used to keep the peer marked
791  * as live).
792  *
793  * @param sender sender of the message
794  * @param size size of the message
795  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
796  *                   GNUNET_NO if the neighbour is not connected or violates the quota
797  * @return how long to wait before reading more from this sender
798  */
799 struct GNUNET_TIME_Relative
800 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
801                                         *sender, ssize_t size, int *do_forward)
802 {
803   struct NeighbourMapEntry *n;
804   struct GNUNET_TIME_Relative ret;
805
806   GNUNET_assert (neighbours != NULL);
807
808   n = lookup_neighbour (sender);
809   if (n == NULL)
810   {
811     *do_forward = GNUNET_NO;
812     return GNUNET_TIME_UNIT_ZERO;
813   }
814   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
815   {
816     n->quota_violation_count++;
817 #if DEBUG_TRANSPORT
818     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
819                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
820                 n->in_tracker.available_bytes_per_s__,
821                 n->quota_violation_count);
822 #endif
823     /* Discount 32k per violation */
824     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
825   }
826   else
827   {
828     if (n->quota_violation_count > 0)
829     {
830       /* try to add 32k back */
831       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
832       n->quota_violation_count--;
833     }
834   }
835   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
836   {
837     GNUNET_STATISTICS_update (GST_stats,
838                               gettext_noop
839                               ("# bandwidth quota violations by other peers"),
840                               1, GNUNET_NO);
841     *do_forward = GNUNET_NO;
842     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
843   }
844   *do_forward = GNUNET_YES;
845   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
846   if (ret.rel_value > 0)
847   {
848 #if DEBUG_TRANSPORT
849     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
851                 (unsigned long long) n->in_tracker.
852                 consumption_since_last_update__,
853                 (unsigned int) n->in_tracker.available_bytes_per_s__,
854                 (unsigned long long) ret.rel_value);
855 #endif
856     GNUNET_STATISTICS_update (GST_stats,
857                               gettext_noop ("# ms throttling suggested"),
858                               (int64_t) ret.rel_value, GNUNET_NO);
859   }
860   return ret;
861 }
862
863
864 /**
865  * Keep the connection to the given neighbour alive longer,
866  * we received a KEEPALIVE (or equivalent).
867  *
868  * @param neighbour neighbour to keep alive
869  */
870 void
871 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
872 {
873   struct NeighbourMapEntry *n;
874
875   GNUNET_assert (neighbours != NULL);
876
877   n = lookup_neighbour (neighbour);
878   if (NULL == n)
879   {
880     GNUNET_STATISTICS_update (GST_stats,
881                               gettext_noop
882                               ("# KEEPALIVE messages discarded (not connected)"),
883                               1, GNUNET_NO);
884     return;
885   }
886   GNUNET_SCHEDULER_cancel (n->timeout_task);
887   n->timeout_task =
888       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
889                                     &neighbour_timeout_task, n);
890 }
891
892
893 /**
894  * Change the incoming quota for the given peer.
895  *
896  * @param neighbour identity of peer to change qutoa for
897  * @param quota new quota
898  */
899 void
900 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
901                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
902 {
903   struct NeighbourMapEntry *n;
904
905   GNUNET_assert (neighbours != NULL);
906
907   n = lookup_neighbour (neighbour);
908   if (n == NULL)
909   {
910     GNUNET_STATISTICS_update (GST_stats,
911                               gettext_noop
912                               ("# SET QUOTA messages ignored (no such peer)"),
913                               1, GNUNET_NO);
914     return;
915   }
916   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
917   if (0 != ntohl (quota.value__))
918     return;
919 #if DEBUG_TRANSPORT
920   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
921               GNUNET_i2s (&n->id), "SET_QUOTA");
922 #endif
923   GNUNET_STATISTICS_update (GST_stats,
924                             gettext_noop ("# disconnects due to quota of 0"), 1,
925                             GNUNET_NO);
926   disconnect_neighbour (n);
927 }
928
929
930 /**
931  * Closure for the neighbours_iterate function.
932  */
933 struct IteratorContext
934 {
935   /**
936    * Function to call on each connected neighbour.
937    */
938   GST_NeighbourIterator cb;
939
940   /**
941    * Closure for 'cb'.
942    */
943   void *cb_cls;
944 };
945
946
947 /**
948  * Call the callback from the closure for each connected neighbour.
949  *
950  * @param cls the 'struct IteratorContext'
951  * @param key the hash of the public key of the neighbour
952  * @param value the 'struct NeighbourMapEntry'
953  * @return GNUNET_OK (continue to iterate)
954  */
955 static int
956 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
957 {
958   struct IteratorContext *ic = cls;
959   struct NeighbourMapEntry *n = value;
960
961   if (GNUNET_YES != n->is_connected)
962     return GNUNET_OK;
963
964   GNUNET_assert (n->ats_count > 0);
965   ic->cb (ic->cb_cls, &n->id, n->ats, n->ats_count);
966   return GNUNET_OK;
967 }
968
969
970 /**
971  * Iterate over all connected neighbours.
972  *
973  * @param cb function to call
974  * @param cb_cls closure for cb
975  */
976 void
977 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
978 {
979   struct IteratorContext ic;
980
981   GNUNET_assert (neighbours != NULL);
982
983   ic.cb = cb;
984   ic.cb_cls = cb_cls;
985   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
986 }
987
988
989 /**
990  * If we have an active connection to the given target, it must be shutdown.
991  *
992  * @param target peer to disconnect from
993  */
994 void
995 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
996 {
997   struct NeighbourMapEntry *n;
998   struct GNUNET_TRANSPORT_PluginFunctions *papi;
999   struct GNUNET_MessageHeader disconnect_msg;
1000
1001   GNUNET_assert (neighbours != NULL);
1002
1003   n = lookup_neighbour (target);
1004   if (NULL == n)
1005     return;                     /* not active */
1006   if (GNUNET_YES == n->is_connected)
1007   {
1008     /* we're actually connected, send DISCONNECT message */
1009     disconnect_msg.size = htons (sizeof (struct GNUNET_MessageHeader));
1010     disconnect_msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
1011     papi = GST_plugins_find (n->plugin_name);
1012     if (papi != NULL)
1013       papi->send (papi->cls, target, (const void *) &disconnect_msg,
1014                   sizeof (struct GNUNET_MessageHeader),
1015                   UINT32_MAX /* priority */ ,
1016                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
1017                   GNUNET_YES, NULL, NULL);
1018   }
1019   disconnect_neighbour (n);
1020 }
1021
1022
1023 /* end of file gnunet-service-transport_neighbours.c */