330991d04c27434ef3e152dfdbcd00cd32b80399
[oweals/gnunet.git] / src / transport / gnunet-service-transport_neighbours.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/gnunet-service-transport_neighbours.c
23  * @brief neighbour management
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "gnunet-service-transport_neighbours.h"
29 #include "gnunet-service-transport_plugins.h"
30 #include "gnunet-service-transport_validation.h"
31 #include "gnunet-service-transport_clients.h"
32 #include "gnunet-service-transport.h"
33 #include "gnunet_peerinfo_service.h"
34 #include "gnunet_constants.h"
35 #include "transport.h"
36
37
38 /**
39  * Size of the neighbour hash map.
40  */
41 #define NEIGHBOUR_TABLE_SIZE 256
42
43 /**
44  * How often must a peer violate bandwidth quotas before we start
45  * to simply drop its messages?
46  */
47 #define QUOTA_VIOLATION_DROP_THRESHOLD 10
48
49 /**
50  * How often do we send KEEPALIVE messages to each of our neighbours?
51  * (idle timeout is 5 minutes or 300 seconds, so with 90s interval we
52  * send 3 keepalives in each interval, so 3 messages would need to be
53  * lost in a row for a disconnect).
54  */
55 #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90)
56
57
58 /**
59  * Entry in neighbours.
60  */
61 struct NeighbourMapEntry;
62
63 /**
64  * Message a peer sends to another to indicate its
65  * preference for communicating via a particular
66  * session (and the desire to establish a real
67  * connection).
68  */
69 struct SessionConnectMessage
70 {
71   /**
72    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT'
73    */
74   struct GNUNET_MessageHeader header;
75
76   /**
77    * Always zero.
78    */
79   uint32_t reserved GNUNET_PACKED;
80
81   /**
82    * Absolute time at the sender.  Only the most recent connect
83    * message implies which session is preferred by the sender.
84    */
85   struct GNUNET_TIME_AbsoluteNBO timestamp;
86
87 };
88
89
90 struct SessionDisconnectMessage
91 {
92   /**
93    * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT'
94    */
95   struct GNUNET_MessageHeader header;
96
97   /**
98    * Always zero.
99    */
100   uint32_t reserved GNUNET_PACKED;
101
102   /**
103    * Purpose of the signature.  Extends over the timestamp.
104    * Purpose should be GNUNET_SIGNATURE_PURPOSE_TRANSPORT_DISCONNECT.
105    */
106   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
107
108   /**
109    * Absolute time at the sender.  Only the most recent connect
110    * message implies which session is preferred by the sender.
111    */
112   struct GNUNET_TIME_AbsoluteNBO timestamp;
113
114   /**
115    * Signature of the peer that sends us the disconnect.  Only
116    * valid if the timestamp is AFTER the timestamp from the
117    * corresponding 'CONNECT' message.
118    */
119   struct GNUNET_CRYPTO_RsaSignature signature;
120 };
121
122
123 /**
124  * For each neighbour we keep a list of messages
125  * that we still want to transmit to the neighbour.
126  */
127 struct MessageQueue
128 {
129
130   /**
131    * This is a doubly linked list.
132    */
133   struct MessageQueue *next;
134
135   /**
136    * This is a doubly linked list.
137    */
138   struct MessageQueue *prev;
139
140   /**
141    * Once this message is actively being transmitted, which
142    * neighbour is it associated with?
143    */
144   struct NeighbourMapEntry *n;
145
146   /**
147    * Function to call once we're done.
148    */
149   GST_NeighbourSendContinuation cont;
150
151   /**
152    * Closure for 'cont'
153    */
154   void *cont_cls;
155
156   /**
157    * The message(s) we want to transmit, GNUNET_MessageHeader(s)
158    * stuck together in memory.  Allocated at the end of this struct.
159    */
160   const char *message_buf;
161
162   /**
163    * Size of the message buf
164    */
165   size_t message_buf_size;
166
167   /**
168    * At what time should we fail?
169    */
170   struct GNUNET_TIME_Absolute timeout;
171
172 };
173
174
175 /**
176  * Entry in neighbours.
177  */
178 struct NeighbourMapEntry
179 {
180
181   /**
182    * Head of list of messages we would like to send to this peer;
183    * must contain at most one message per client.
184    */
185   struct MessageQueue *messages_head;
186
187   /**
188    * Tail of list of messages we would like to send to this peer; must
189    * contain at most one message per client.
190    */
191   struct MessageQueue *messages_tail;
192
193   /**
194    * Performance data for the peer.
195    */
196   //struct GNUNET_ATS_Information *ats;
197
198   /**
199    * Are we currently trying to send a message? If so, which one?
200    */
201   struct MessageQueue *is_active;
202
203   /**
204    * Active session for communicating with the peer.
205    */
206   struct Session *session;
207
208   /**
209    * Name of the plugin we currently use.
210    */
211   char *plugin_name;
212
213   /**
214    * Address used for communicating with the peer, NULL for inbound connections.
215    */
216   void *addr;
217
218   /**
219    * Number of bytes in 'addr'.
220    */
221   size_t addrlen;
222
223   /**
224    * Identity of this neighbour.
225    */
226   struct GNUNET_PeerIdentity id;
227
228   /**
229    * ID of task scheduled to run when this peer is about to
230    * time out (will free resources associated with the peer).
231    */
232   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
233
234   /**
235    * ID of task scheduled to send keepalives.
236    */
237   GNUNET_SCHEDULER_TaskIdentifier keepalive_task;
238
239   /**
240    * ID of task scheduled to run when we should try transmitting
241    * the head of the message queue.
242    */
243   GNUNET_SCHEDULER_TaskIdentifier transmission_task;
244
245   /**
246    * Tracker for inbound bandwidth.
247    */
248   struct GNUNET_BANDWIDTH_Tracker in_tracker;
249
250   /**
251    * Timestamp of the 'SESSION_CONNECT' message we got from the other peer
252    */
253   struct GNUNET_TIME_Absolute connect_ts;
254
255   /**
256    * How often has the other peer (recently) violated the inbound
257    * traffic limit?  Incremented by 10 per violation, decremented by 1
258    * per non-violation (for each time interval).
259    */
260   unsigned int quota_violation_count;
261
262   /**
263    * Number of values in 'ats' array.
264    */
265   //unsigned int ats_count;
266
267   /**
268    * Are we already in the process of disconnecting this neighbour?
269    */
270   int in_disconnect;
271
272   /**
273    * Do we currently consider this neighbour connected? (as far as
274    * the connect/disconnect callbacks are concerned)?
275    */
276   int is_connected;
277
278 };
279
280
281 /**
282  * All known neighbours and their HELLOs.
283  */
284 static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
285
286 /**
287  * Closure for connect_notify_cb and disconnect_notify_cb
288  */
289 static void *callback_cls;
290
291 /**
292  * Function to call when we connected to a neighbour.
293  */
294 static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb;
295
296 /**
297  * Function to call when we disconnected from a neighbour.
298  */
299 static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb;
300
301 /**
302  * counter for connected neighbours
303  */
304 static int neighbours_connected;
305
306 /**
307  * Lookup a neighbour entry in the neighbours hash map.
308  *
309  * @param pid identity of the peer to look up
310  * @return the entry, NULL if there is no existing record
311  */
312 static struct NeighbourMapEntry *
313 lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
314 {
315   return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey);
316 }
317
318
319 /**
320  * Task invoked to start a transmission to another peer.
321  *
322  * @param cls the 'struct NeighbourMapEntry'
323  * @param tc scheduler context
324  */
325 static void
326 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
327
328
329 /**
330  * We're done with our transmission attempt, continue processing.
331  *
332  * @param cls the 'struct MessageQueue' of the message
333  * @param receiver intended receiver
334  * @param success whether it worked or not
335  */
336 static void
337 transmit_send_continuation (void *cls,
338                             const struct GNUNET_PeerIdentity *receiver,
339                             int success)
340 {
341   struct MessageQueue *mq;
342   struct NeighbourMapEntry *n;
343
344   mq = cls;
345   n = mq->n;
346   if (NULL != n)
347   {
348     GNUNET_assert (n->is_active == mq);
349     n->is_active = NULL;
350     GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK);
351     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
352   }
353   if (NULL != mq->cont)
354     mq->cont (mq->cont_cls, success);
355   GNUNET_free (mq);
356 }
357
358
359 /**
360  * Check the ready list for the given neighbour and if a plugin is
361  * ready for transmission (and if we have a message), do so!
362  *
363  * @param n target peer for which to transmit
364  */
365 static void
366 try_transmission_to_peer (struct NeighbourMapEntry *n)
367 {
368   struct MessageQueue *mq;
369   struct GNUNET_TIME_Relative timeout;
370   ssize_t ret;
371   struct GNUNET_TRANSPORT_PluginFunctions *papi;
372
373   if (n->is_active != NULL)
374     return;                     /* transmission already pending */
375   if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK)
376     return;                     /* currently waiting for bandwidth */
377   while (NULL != (mq = n->messages_head))
378   {
379     timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout);
380     if (timeout.rel_value > 0)
381       break;
382     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
383     n->is_active = mq;
384     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);     /* timeout */
385   }
386   if (NULL == mq)
387     return;                     /* no more messages */
388
389   papi = GST_plugins_find (n->plugin_name);
390   if (papi == NULL)
391   {
392     GNUNET_break (0);
393     return;
394   }
395   GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
396   n->is_active = mq;
397   mq->n = n;
398
399   if  (((n->session == NULL) && (n->addr == NULL) && (n->addrlen == 0)))
400   {
401     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No address peer for peer `%s'\n",
402                 GNUNET_i2s (&n->id));
403     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
404     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
405     return;
406   }
407
408   ret =
409       papi->send (papi->cls, &n->id, mq->message_buf, mq->message_buf_size,
410                   0 /* priority -- remove from plugin API? */ ,
411                   timeout, n->session, n->addr, n->addrlen, GNUNET_YES,
412                   &transmit_send_continuation, mq);
413   if (ret == -1)
414   {
415     /* failure, but 'send' would not call continuation in this case,
416      * so we need to do it here! */
417     transmit_send_continuation (mq, &n->id, GNUNET_SYSERR);
418     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
419   }
420 }
421
422
423 /**
424  * Task invoked to start a transmission to another peer.
425  *
426  * @param cls the 'struct NeighbourMapEntry'
427  * @param tc scheduler context
428  */
429 static void
430 transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
431 {
432   struct NeighbourMapEntry *n = cls;
433
434   n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
435   try_transmission_to_peer (n);
436 }
437
438
439 /**
440  * Initialize the neighbours subsystem.
441  *
442  * @param cls closure for callbacks
443  * @param connect_cb function to call if we connect to a peer
444  * @param disconnect_cb function to call if we disconnect from a peer
445  */
446 void
447 GST_neighbours_start (void *cls, GNUNET_TRANSPORT_NotifyConnect connect_cb,
448                       GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb)
449 {
450   callback_cls = cls;
451   connect_notify_cb = connect_cb;
452   disconnect_notify_cb = disconnect_cb;
453   neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE);
454 }
455
456
457 /**
458  * Disconnect from the given neighbour, clean up the record.
459  *
460  * @param n neighbour to disconnect from
461  */
462 static void
463 disconnect_neighbour (struct NeighbourMapEntry *n)
464 {
465   struct MessageQueue *mq;
466
467   if (GNUNET_YES == n->in_disconnect)
468     return;
469   n->in_disconnect = GNUNET_YES;
470   while (NULL != (mq = n->messages_head))
471   {
472     GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq);
473     if (NULL != mq->cont)
474       mq->cont (mq->cont_cls, GNUNET_SYSERR);
475     GNUNET_free (mq);
476   }
477   if (NULL != n->is_active)
478   {
479     n->is_active->n = NULL;
480     n->is_active = NULL;
481   }
482   if (GNUNET_YES == n->is_connected)
483   {
484     n->is_connected = GNUNET_NO;
485     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task);
486     GNUNET_SCHEDULER_cancel (n->keepalive_task);
487     n->keepalive_task = GNUNET_SCHEDULER_NO_TASK;  
488     GNUNET_assert (neighbours_connected > 0);
489     neighbours_connected--;
490     GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1,
491                               GNUNET_NO);
492     disconnect_notify_cb (callback_cls, &n->id);
493   }
494   GNUNET_assert (GNUNET_YES ==
495                  GNUNET_CONTAINER_multihashmap_remove (neighbours,
496                                                        &n->id.hashPubKey, n));
497   if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task)
498   {
499     GNUNET_SCHEDULER_cancel (n->timeout_task);
500     n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
501   }
502   if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task)
503   {
504     GNUNET_SCHEDULER_cancel (n->transmission_task);
505     n->transmission_task = GNUNET_SCHEDULER_NO_TASK;
506   }
507   if (NULL != n->plugin_name)
508   {
509     GNUNET_free (n->plugin_name);
510     n->plugin_name = NULL;
511   }
512   if (NULL != n->addr)
513   {
514     GNUNET_free (n->addr);
515     n->addr = NULL;
516     n->addrlen = 0;
517   }
518   n->session = NULL;
519   GNUNET_free (n);
520 }
521
522
523 /**
524  * Peer has been idle for too long. Disconnect.
525  *
526  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
527  * @param tc scheduler context
528  */
529 static void
530 neighbour_timeout_task (void *cls,
531                         const struct GNUNET_SCHEDULER_TaskContext *tc)
532 {
533   struct NeighbourMapEntry *n = cls;
534
535   n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
536   if (GNUNET_YES == n->is_connected)
537     GNUNET_STATISTICS_update (GST_stats,
538                             gettext_noop ("# peers disconnected due to timeout"), 1,
539                             GNUNET_NO);
540   disconnect_neighbour (n);
541 }
542
543
544 /**
545  * Send another keepalive message.
546  *
547  * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle
548  * @param tc scheduler context
549  */
550 static void
551 neighbour_keepalive_task (void *cls,
552                           const struct GNUNET_SCHEDULER_TaskContext *tc)
553 {
554   struct NeighbourMapEntry *n = cls;
555   struct GNUNET_MessageHeader m;
556   struct GNUNET_TRANSPORT_PluginFunctions *papi;
557
558   n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
559                                                     &neighbour_keepalive_task,
560                                                     n);
561   GNUNET_assert (GNUNET_YES == n->is_connected);
562   GNUNET_STATISTICS_update (GST_stats,
563                             gettext_noop ("# keepalives sent"), 1,
564                             GNUNET_NO);
565   m.size = htons (sizeof (struct GNUNET_MessageHeader));
566   m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE);
567   papi = GST_plugins_find (n->plugin_name);
568   if (papi != NULL)
569     papi->send (papi->cls, 
570                 &n->id, (const void *) &m,
571                 sizeof (m),
572                 UINT32_MAX /* priority */ ,
573                 GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
574                 GNUNET_YES, NULL, NULL);
575 }
576
577
578 /**
579  * Disconnect from the given neighbour.
580  *
581  * @param cls unused
582  * @param key hash of neighbour's public key (not used)
583  * @param value the 'struct NeighbourMapEntry' of the neighbour
584  */
585 static int
586 disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value)
587 {
588   struct NeighbourMapEntry *n = value;
589
590 #if DEBUG_TRANSPORT
591   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n",
592               GNUNET_i2s (&n->id), "SHUTDOWN_TASK");
593 #endif
594   if (GNUNET_YES == n->is_connected)
595     GNUNET_STATISTICS_update (GST_stats,
596                               gettext_noop ("# peers disconnected due to global disconnect"), 1,
597                               GNUNET_NO);
598   disconnect_neighbour (n);
599   return GNUNET_OK;
600 }
601
602
603 /**
604  * Cleanup the neighbours subsystem.
605  */
606 void
607 GST_neighbours_stop ()
608 {
609   GNUNET_assert (neighbours != NULL);
610
611   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours,
612                                          NULL);
613   GNUNET_CONTAINER_multihashmap_destroy (neighbours);
614   GNUNET_assert (neighbours_connected == 0);
615   neighbours = NULL;
616   callback_cls = NULL;
617   connect_notify_cb = NULL;
618   disconnect_notify_cb = NULL;
619 }
620
621
622 /**
623  * For an existing neighbour record, set the active connection to
624  * the given address.
625  *
626  * @param peer identity of the peer to switch the address for
627  * @param plugin_name name of transport that delivered the PONG
628  * @param address address of the other peer, NULL if other peer
629  *                       connected to us
630  * @param address_len number of bytes in address
631  * @param session session to use (or NULL)
632  * @param ats performance data
633  * @param ats_count number of entries in ats (excluding 0-termination)
634  */
635 void
636 GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer,
637                                   const char *plugin_name, const void *address,
638                                   size_t address_len, struct Session *session,
639                                   const struct GNUNET_ATS_Information
640                                   *ats, uint32_t ats_count)
641 {
642   struct NeighbourMapEntry *n;
643   struct SessionConnectMessage connect_msg;
644   int was_connected;
645
646   GNUNET_assert (neighbours != NULL);
647   n = lookup_neighbour (peer);
648   if (NULL == n)
649   {
650     if (NULL != session)
651       GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
652                        "transport-ats",
653                        "Telling ATS to destroy session %p of peer %s\n",
654                        session,                
655                        GNUNET_i2s (peer));
656     GNUNET_ATS_address_destroyed (GST_ats,
657                                   peer,
658                                   plugin_name, address,
659                                   address_len, session);
660     return;
661   }
662   was_connected = n->is_connected;
663   n->is_connected = GNUNET_YES;
664   if (GNUNET_YES != was_connected)
665     n->keepalive_task = GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY,
666                                                       &neighbour_keepalive_task,
667                                                       n);
668
669 #if DEBUG_TRANSPORT
670   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671               "SWITCH! Peer `%4s' switches to plugin `%s' address '%s' session %X\n",
672               GNUNET_i2s (peer), plugin_name,
673               (address_len == 0) ? "<inbound>" : GST_plugins_a2s (plugin_name,
674                                                                   address,
675                                                                   address_len),
676               session);
677 #endif
678   GNUNET_free_non_null (n->addr);
679   n->addr = GNUNET_malloc (address_len);
680   memcpy (n->addr, address, address_len);
681   n->addrlen = address_len;
682   n->session = session;
683   GNUNET_free_non_null (n->plugin_name);
684   n->plugin_name = GNUNET_strdup (plugin_name);
685   GNUNET_SCHEDULER_cancel (n->timeout_task);
686   n->timeout_task =
687       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
688                                     &neighbour_timeout_task, n);
689   connect_msg.header.size = htons (sizeof (struct SessionConnectMessage));
690   connect_msg.header.type =
691       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT);
692   connect_msg.reserved = htonl (0);
693   connect_msg.timestamp =
694       GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
695   GST_neighbours_send (peer, &connect_msg, sizeof (connect_msg),
696                        GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
697   if (GNUNET_YES == was_connected)
698     return;
699   /* First tell clients about connected neighbours...*/
700   neighbours_connected++;
701   GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1,
702                             GNUNET_NO);
703   connect_notify_cb (callback_cls, peer, ats, ats_count);
704 }
705
706
707 /**
708  * Create an entry in the neighbour map for the given peer
709  * 
710  * @param peer peer to create an entry for
711  * @return new neighbour map entry
712  */
713 static struct NeighbourMapEntry *
714 setup_neighbour (const struct GNUNET_PeerIdentity *peer)
715 {
716   struct NeighbourMapEntry *n;
717
718 #if DEBUG_TRANSPORT
719   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720               "Unknown peer `%s', creating new neighbour\n",
721               GNUNET_i2s (peer));
722 #endif
723   n = GNUNET_malloc (sizeof (struct NeighbourMapEntry));
724   n->id = *peer;
725   GNUNET_BANDWIDTH_tracker_init (&n->in_tracker,
726                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
727                                  MAX_BANDWIDTH_CARRY_S);
728   n->timeout_task =
729     GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
730                                   &neighbour_timeout_task, n);
731   GNUNET_assert (GNUNET_OK ==
732                  GNUNET_CONTAINER_multihashmap_put (neighbours,
733                                                     &n->id.hashPubKey, n,
734                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
735   return n;
736 }
737
738
739 /**
740  * Try to create a connection to the given target (eventually).
741  *
742  * @param target peer to try to connect to
743  */
744 void
745 GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target)
746 {
747   struct NeighbourMapEntry *n;
748
749   GNUNET_assert (neighbours != NULL);
750 #if DEBUG_TRANSPORT
751   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n",
752               GNUNET_i2s (target));
753 #endif
754   GNUNET_assert (0 !=
755                  memcmp (target, &GST_my_identity,
756                          sizeof (struct GNUNET_PeerIdentity)));
757   n = lookup_neighbour (target);
758   if ((NULL != n) && (GNUNET_YES == n->is_connected))
759     return;                     /* already connected */
760   if (n == NULL)
761     n = setup_neighbour (target);
762 #if DEBUG_TRANSPORT
763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764               "Asking ATS for suggested address to connect to peer `%s'\n",
765               GNUNET_i2s (&n->id));
766 #endif
767    GNUNET_ATS_suggest_address (GST_ats, &n->id);
768 }
769
770
771 /**
772  * Test if we're connected to the given peer.
773  *
774  * @param target peer to test
775  * @return GNUNET_YES if we are connected, GNUNET_NO if not
776  */
777 int
778 GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target)
779 {
780   struct NeighbourMapEntry *n;
781
782   GNUNET_assert (neighbours != NULL);
783
784   n = lookup_neighbour (target);
785   if ((NULL == n) || (n->is_connected != GNUNET_YES))
786     return GNUNET_NO;           /* not connected */
787   return GNUNET_YES;
788 }
789
790
791 /**
792  * A session was terminated. Take note.
793  *
794  * @param peer identity of the peer where the session died
795  * @param session session that is gone
796  */
797 void
798 GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer,
799                                    struct Session *session)
800 {
801   struct NeighbourMapEntry *n;
802
803   GNUNET_assert (neighbours != NULL);
804
805 #if DEBUG_TRANSPORT
806   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807               "Session %X to peer `%s' ended \n",
808               session, GNUNET_i2s (peer));
809 #endif
810   if (NULL != session)
811     GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
812                      "transport-ats",
813                      "Telling ATS to destroy session %p of peer %s\n",
814                      session,               
815                      GNUNET_i2s (peer));
816   GNUNET_ATS_address_destroyed (GST_ats,
817                                 peer,
818                                 NULL, NULL, 0,
819                                 session);
820   n = lookup_neighbour (peer);
821   if (NULL == n)
822     return;
823   if (session != n->session)
824     return;                     /* doesn't affect us */
825
826   n->session = NULL;
827   GNUNET_free (n->addr);
828   n->addr = NULL;
829   n->addrlen = 0;
830
831
832   if (GNUNET_YES != n->is_connected)
833     return;                     /* not connected anymore anyway, shouldn't matter */
834   /* fast disconnect unless ATS suggests a new address */
835   GNUNET_SCHEDULER_cancel (n->timeout_task);
836   n->timeout_task =
837       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT,
838                                     &neighbour_timeout_task, n);
839   /* try QUICKLY to re-establish a connection, reduce timeout! */
840   GNUNET_ATS_suggest_address (GST_ats, peer);
841 }
842
843
844 /**
845  * Transmit a message to the given target using the active connection.
846  *
847  * @param target destination
848  * @param msg message to send
849  * @param msg_size number of bytes in msg
850  * @param timeout when to fail with timeout
851  * @param cont function to call when done
852  * @param cont_cls closure for 'cont'
853  */
854 void
855 GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg,
856                      size_t msg_size, struct GNUNET_TIME_Relative timeout,
857                      GST_NeighbourSendContinuation cont, void *cont_cls)
858 {
859   struct NeighbourMapEntry *n;
860   struct MessageQueue *mq;
861
862   GNUNET_assert (neighbours != NULL);
863
864   n = lookup_neighbour (target);
865   if ((n == NULL) || (GNUNET_YES != n->is_connected))
866   {
867     GNUNET_STATISTICS_update (GST_stats,
868                               gettext_noop
869                               ("# messages not sent (no such peer or not connected)"),
870                               1, GNUNET_NO);
871 #if DEBUG_TRANSPORT
872     if (n == NULL)
873       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874                   "Could not send message to peer `%s': unknown neighbor",
875                   GNUNET_i2s (target));
876     else if (GNUNET_YES != n->is_connected)
877       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878                   "Could not send message to peer `%s': not connected\n",
879                   GNUNET_i2s (target));
880 #endif
881     if (NULL != cont)
882       cont (cont_cls, GNUNET_SYSERR);
883     return;
884   }
885
886   if ((n->session == NULL) && (n->addr == NULL) && (n->addrlen ==0))
887   {
888     GNUNET_STATISTICS_update (GST_stats,
889                               gettext_noop
890                               ("# messages not sent (no such peer or not connected)"),
891                               1, GNUNET_NO);
892 #if DEBUG_TRANSPORT
893       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
894                   "Could not send message to peer `%s': no address available\n",
895                   GNUNET_i2s (target));
896 #endif
897
898     if (NULL != cont)
899       cont (cont_cls, GNUNET_SYSERR);
900     return;
901   }
902   GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader));
903   GNUNET_STATISTICS_update (GST_stats,
904                             gettext_noop
905                             ("# bytes in message queue for other peers"),
906                             msg_size, GNUNET_NO);
907   mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size);
908   mq->cont = cont;
909   mq->cont_cls = cont_cls;
910   /* FIXME: this memcpy can be up to 7% of our total runtime! */
911   memcpy (&mq[1], msg, msg_size);
912   mq->message_buf = (const char *) &mq[1];
913   mq->message_buf_size = msg_size;
914   mq->timeout = GNUNET_TIME_relative_to_absolute (timeout);
915   GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq);
916   if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) &&
917       (NULL == n->is_active))
918     n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n);
919 }
920
921
922 /**
923  * We have received a message from the given sender.  How long should
924  * we delay before receiving more?  (Also used to keep the peer marked
925  * as live).
926  *
927  * @param sender sender of the message
928  * @param size size of the message
929  * @param do_forward set to GNUNET_YES if the message should be forwarded to clients
930  *                   GNUNET_NO if the neighbour is not connected or violates the quota,
931  *                   GNUNET_SYSERR if the connection is not fully up yet
932  * @return how long to wait before reading more from this sender
933  */
934 struct GNUNET_TIME_Relative
935 GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity
936                                         *sender, ssize_t size, int *do_forward)
937 {
938   struct NeighbourMapEntry *n;
939   struct GNUNET_TIME_Relative ret;
940
941   GNUNET_assert (neighbours != NULL);
942
943   n = lookup_neighbour (sender);
944   if (n == NULL)
945   {
946     *do_forward = GNUNET_NO;
947     return GNUNET_TIME_UNIT_ZERO;
948   }
949   if (GNUNET_YES != n->is_connected)
950   {
951     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
952                 _("Plugin gave us %d bytes of data but somehow the session is not marked as UP yet!\n"),
953                 (int) size);
954     *do_forward = GNUNET_SYSERR;
955     return GNUNET_TIME_UNIT_ZERO;
956   }
957   if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size))
958   {
959     n->quota_violation_count++;
960 #if DEBUG_TRANSPORT
961     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962                 "Bandwidth quota (%u b/s) violation detected (total of %u).\n",
963                 n->in_tracker.available_bytes_per_s__,
964                 n->quota_violation_count);
965 #endif
966     /* Discount 32k per violation */
967     GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024);
968   }
969   else
970   {
971     if (n->quota_violation_count > 0)
972     {
973       /* try to add 32k back */
974       GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024);
975       n->quota_violation_count--;
976     }
977   }
978   if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
979   {
980     GNUNET_STATISTICS_update (GST_stats,
981                               gettext_noop
982                               ("# bandwidth quota violations by other peers"),
983                               1, GNUNET_NO);
984     *do_forward = GNUNET_NO;
985     return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT;
986   }
987   *do_forward = GNUNET_YES;
988   ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 0);
989   if (ret.rel_value > 0)
990   {
991 #if DEBUG_TRANSPORT
992     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993                 "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n",
994                 (unsigned long long) n->in_tracker.
995                 consumption_since_last_update__,
996                 (unsigned int) n->in_tracker.available_bytes_per_s__,
997                 (unsigned long long) ret.rel_value);
998 #endif
999     GNUNET_STATISTICS_update (GST_stats,
1000                               gettext_noop ("# ms throttling suggested"),
1001                               (int64_t) ret.rel_value, GNUNET_NO);
1002   }
1003   return ret;
1004 }
1005
1006
1007 /**
1008  * Keep the connection to the given neighbour alive longer,
1009  * we received a KEEPALIVE (or equivalent).
1010  *
1011  * @param neighbour neighbour to keep alive
1012  */
1013 void
1014 GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour)
1015 {
1016   struct NeighbourMapEntry *n;
1017
1018   GNUNET_assert (neighbours != NULL);
1019
1020   n = lookup_neighbour (neighbour);
1021   if (NULL == n)
1022   {
1023     GNUNET_STATISTICS_update (GST_stats,
1024                               gettext_noop
1025                               ("# KEEPALIVE messages discarded (not connected)"),
1026                               1, GNUNET_NO);
1027     return;
1028   }
1029   GNUNET_SCHEDULER_cancel (n->timeout_task);
1030   n->timeout_task =
1031       GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
1032                                     &neighbour_timeout_task, n);
1033 }
1034
1035
1036 /**
1037  * Change the incoming quota for the given peer.
1038  *
1039  * @param neighbour identity of peer to change qutoa for
1040  * @param quota new quota
1041  */
1042 void
1043 GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour,
1044                                    struct GNUNET_BANDWIDTH_Value32NBO quota)
1045 {
1046   struct NeighbourMapEntry *n;
1047
1048   GNUNET_assert (neighbours != NULL);
1049
1050   n = lookup_neighbour (neighbour);
1051   if (n == NULL)
1052   {
1053     GNUNET_STATISTICS_update (GST_stats,
1054                               gettext_noop
1055                               ("# SET QUOTA messages ignored (no such peer)"),
1056                               1, GNUNET_NO);
1057     return;
1058   }
1059   GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota);
1060   if (0 != ntohl (quota.value__))
1061     return;
1062 #if DEBUG_TRANSPORT
1063   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n",
1064               GNUNET_i2s (&n->id), "SET_QUOTA");
1065 #endif
1066   if (GNUNET_YES == n->is_connected)
1067     GNUNET_STATISTICS_update (GST_stats,
1068                               gettext_noop ("# disconnects due to quota of 0"), 1,
1069                               GNUNET_NO);
1070   disconnect_neighbour (n);
1071 }
1072
1073
1074 /**
1075  * Closure for the neighbours_iterate function.
1076  */
1077 struct IteratorContext
1078 {
1079   /**
1080    * Function to call on each connected neighbour.
1081    */
1082   GST_NeighbourIterator cb;
1083
1084   /**
1085    * Closure for 'cb'.
1086    */
1087   void *cb_cls;
1088 };
1089
1090
1091 /**
1092  * Call the callback from the closure for each connected neighbour.
1093  *
1094  * @param cls the 'struct IteratorContext'
1095  * @param key the hash of the public key of the neighbour
1096  * @param value the 'struct NeighbourMapEntry'
1097  * @return GNUNET_OK (continue to iterate)
1098  */
1099 static int
1100 neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value)
1101 {
1102   struct IteratorContext *ic = cls;
1103   struct NeighbourMapEntry *n = value;
1104
1105   if (GNUNET_YES != n->is_connected)
1106     return GNUNET_OK;
1107
1108   ic->cb (ic->cb_cls, &n->id, NULL, 0, n->plugin_name, n->addr, n->addrlen);
1109   return GNUNET_OK;
1110 }
1111
1112
1113 /**
1114  * Iterate over all connected neighbours.
1115  *
1116  * @param cb function to call
1117  * @param cb_cls closure for cb
1118  */
1119 void
1120 GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls)
1121 {
1122   struct IteratorContext ic;
1123
1124   GNUNET_assert (neighbours != NULL);
1125
1126   ic.cb = cb;
1127   ic.cb_cls = cb_cls;
1128   GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic);
1129 }
1130
1131
1132 /**
1133  * If we have an active connection to the given target, it must be shutdown.
1134  *
1135  * @param target peer to disconnect from
1136  */
1137 void
1138 GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target)
1139 {
1140   struct NeighbourMapEntry *n;
1141   struct GNUNET_TRANSPORT_PluginFunctions *papi;
1142   struct SessionDisconnectMessage disconnect_msg;
1143
1144   GNUNET_assert (neighbours != NULL);
1145
1146   n = lookup_neighbour (target);
1147   if (NULL == n)
1148     return;                     /* not active */
1149   if (GNUNET_YES == n->is_connected)
1150   {
1151     /* we're actually connected, send DISCONNECT message */
1152     disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage));
1153     disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
1154     disconnect_msg.reserved = htonl (0);
1155     disconnect_msg.purpose.size = htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1156                                          sizeof (struct GNUNET_TIME_AbsoluteNBO));
1157     disconnect_msg.purpose.purpose = htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT);
1158     disconnect_msg.timestamp = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ());
1159     GNUNET_assert (GNUNET_OK ==
1160                    GNUNET_CRYPTO_rsa_sign (GST_my_private_key,
1161                                            &disconnect_msg.purpose,
1162                                            &disconnect_msg.signature));
1163     papi = GST_plugins_find (n->plugin_name);
1164     if (papi != NULL)
1165       papi->send (papi->cls, target, (const void *) &disconnect_msg,
1166                   sizeof (disconnect_msg),
1167                   UINT32_MAX /* priority */ ,
1168                   GNUNET_TIME_UNIT_FOREVER_REL, n->session, n->addr, n->addrlen,
1169                   GNUNET_YES, NULL, NULL);
1170     GNUNET_STATISTICS_update (GST_stats,
1171                               gettext_noop ("# peers disconnected due to external request"), 1,
1172                               GNUNET_NO);
1173   }
1174   disconnect_neighbour (n);
1175 }
1176
1177
1178 /**
1179  * We received a 'SESSION_CONNECT' message from the other peer.
1180  * Consider switching to it.
1181  *
1182  * @param message possibly a 'struct SessionConnectMessage' (check format)
1183  * @param peer identity of the peer to switch the address for
1184  * @param plugin_name name of transport that delivered the PONG
1185  * @param address address of the other peer, NULL if other peer
1186  *                       connected to us
1187  * @param address_len number of bytes in address
1188  * @param session session to use (or NULL)
1189  * @param ats performance data
1190  * @param ats_count number of entries in ats (excluding 0-termination)
1191   */
1192 void
1193 GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message,
1194                                const struct GNUNET_PeerIdentity *peer,
1195                                const char *plugin_name,
1196                                const char *sender_address, uint16_t sender_address_len,
1197                                struct Session *session,
1198                                const struct GNUNET_ATS_Information *ats,
1199                                uint32_t ats_count)
1200 {
1201   const struct SessionConnectMessage *scm;
1202   struct GNUNET_TIME_Absolute ts;
1203   struct NeighbourMapEntry *n;
1204
1205   if (ntohs (message->size) != sizeof (struct SessionConnectMessage))
1206   {
1207     GNUNET_break_op (0);
1208     return;
1209   }
1210   scm = (const struct SessionConnectMessage *) message;
1211   GNUNET_break_op (ntohl (scm->reserved) == 0);
1212   ts = GNUNET_TIME_absolute_ntoh (scm->timestamp);
1213   n = lookup_neighbour (peer);
1214   if (NULL == n) 
1215     n = setup_neighbour (peer);
1216   if (ts.abs_value > n->connect_ts.abs_value)
1217   {
1218     if (NULL != session)
1219       GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1220                        "transport-ats",
1221                        "Giving ATS session %p of plugin %s for peer %s\n",
1222                        session,
1223                        plugin_name,
1224                        GNUNET_i2s (peer));
1225     GNUNET_ATS_address_update (GST_ats,
1226                                peer,
1227                                plugin_name, sender_address, sender_address_len,
1228                                session, ats, ats_count);
1229     n->connect_ts = ts;
1230   }
1231 }
1232
1233
1234 /* end of file gnunet-service-transport_neighbours.c */