-more logging, avoid duplicate re-scheduling
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 /**
46  * After how much inactivity should a UDP session time out?
47  */
48 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
49
50 /**
51  * Number of messages we can defragment in parallel.  We only really
52  * defragment 1 message at a time, but if messages get re-ordered, we
53  * may want to keep knowledge about the previous message to avoid
54  * discarding the current message in favor of a single fragment of a
55  * previous message.  3 should be good since we don't expect massive
56  * message reorderings with UDP.
57  */
58 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
59
60 /**
61  * We keep a defragmentation queue per sender address.  How many
62  * sender addresses do we support at the same time? Memory consumption
63  * is roughly a factor of 32k * #UDP_MAX_MESSAGES_IN_DEFRAG times this
64  * value. (So 128 corresponds to 12 MB and should suffice for
65  * connecting to roughly 128 peers via UDP).
66  */
67 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
68
69
70 /**
71  * UDP Message-Packet header (after defragmentation).
72  */
73 struct UDPMessage
74 {
75   /**
76    * Message header.
77    */
78   struct GNUNET_MessageHeader header;
79
80   /**
81    * Always zero for now.
82    */
83   uint32_t reserved;
84
85   /**
86    * What is the identity of the sender
87    */
88   struct GNUNET_PeerIdentity sender;
89
90 };
91
92
93 /**
94  * Closure for #append_port().
95  */
96 struct PrettyPrinterContext
97 {
98   /**
99    * DLL
100    */
101   struct PrettyPrinterContext *next;
102
103   /**
104    * DLL
105    */
106   struct PrettyPrinterContext *prev;
107
108   /**
109    * Our plugin.
110    */
111   struct Plugin *plugin;
112
113   /**
114    * Resolver handle
115    */
116   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
117
118   /**
119    * Function to call with the result.
120    */
121   GNUNET_TRANSPORT_AddressStringCallback asc;
122
123   /**
124    * Clsoure for @e asc.
125    */
126   void *asc_cls;
127
128   /**
129    * Timeout task
130    */
131   struct GNUNET_SCHEDULER_Task *timeout_task;
132
133   /**
134    * Is this an IPv6 address?
135    */
136   int ipv6;
137
138   /**
139    * Options
140    */
141   uint32_t options;
142
143   /**
144    * Port to add after the IP address.
145    */
146   uint16_t port;
147
148 };
149
150
151 /**
152  * Session with another peer.
153  */
154 struct GNUNET_ATS_Session
155 {
156   /**
157    * Which peer is this session for?
158    */
159   struct GNUNET_PeerIdentity target;
160
161   /**
162    * Plugin this session belongs to.
163    */
164   struct Plugin *plugin;
165
166   /**
167    * Context for dealing with fragments.
168    */
169   struct UDP_FragmentationContext *frag_ctx;
170
171   /**
172    * Desired delay for next sending we send to other peer
173    */
174   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
175
176   /**
177    * Desired delay for transmissions we received from other peer.
178    * Adjusted to be per fragment (UDP_MTU), even though on the
179    * wire it was for "full messages".
180    */
181   struct GNUNET_TIME_Relative flow_delay_from_other_peer;
182
183   /**
184    * Session timeout task
185    */
186   struct GNUNET_SCHEDULER_Task *timeout_task;
187
188   /**
189    * When does this session time out?
190    */
191   struct GNUNET_TIME_Absolute timeout;
192
193   /**
194    * What time did we last transmit?
195    */
196   struct GNUNET_TIME_Absolute last_transmit_time;
197
198   /**
199    * expected delay for ACKs
200    */
201   struct GNUNET_TIME_Relative last_expected_ack_delay;
202
203   /**
204    * desired delay between UDP messages
205    */
206   struct GNUNET_TIME_Relative last_expected_msg_delay;
207
208   /**
209    * Our own address.
210    */
211   struct GNUNET_HELLO_Address *address;
212
213   /**
214    * Number of bytes waiting for transmission to this peer.
215    */
216   unsigned long long bytes_in_queue;
217
218   /**
219    * Number of messages waiting for transmission to this peer.
220    */
221   unsigned int msgs_in_queue;
222
223   /**
224    * Reference counter to indicate that this session is
225    * currently being used and must not be destroyed;
226    * setting @e in_destroy will destroy it as soon as
227    * possible.
228    */
229   unsigned int rc;
230
231   /**
232    * Network type of the address.
233    */
234   enum GNUNET_ATS_Network_Type scope;
235
236   /**
237    * Is this session about to be destroyed (sometimes we cannot
238    * destroy a session immediately as below us on the stack
239    * there might be code that still uses it; in this case,
240    * @e rc is non-zero).
241    */
242   int in_destroy;
243 };
244
245
246
247 /**
248  * Data structure to track defragmentation contexts based
249  * on the source of the UDP traffic.
250  */
251 struct DefragContext
252 {
253
254   /**
255    * Defragmentation context.
256    */
257   struct GNUNET_DEFRAGMENT_Context *defrag;
258
259   /**
260    * Reference to master plugin struct.
261    */
262   struct Plugin *plugin;
263
264   /**
265    * Node in the defrag heap.
266    */
267   struct GNUNET_CONTAINER_HeapNode *hnode;
268
269   /**
270    * Source address this receive context is for (allocated at the
271    * end of the struct).
272    */
273   const union UdpAddress *udp_addr;
274
275   /**
276    * Who's message(s) are we defragmenting here?
277    * Only initialized once we succeeded and
278    * @e have_sender is set.
279    */
280   struct GNUNET_PeerIdentity sender;
281
282   /**
283    * Length of @e udp_addr.
284    */
285   size_t udp_addr_len;
286
287   /**
288    * Network type the address belongs to.
289    */
290   enum GNUNET_ATS_Network_Type network_type;
291
292   /**
293    * Has the @e sender field been initialized yet?
294    */
295   int have_sender;
296 };
297
298
299 /**
300  * Context to send fragmented messages
301  */
302 struct UDP_FragmentationContext
303 {
304   /**
305    * Next in linked list
306    */
307   struct UDP_FragmentationContext *next;
308
309   /**
310    * Previous in linked list
311    */
312   struct UDP_FragmentationContext *prev;
313
314   /**
315    * The plugin
316    */
317   struct Plugin *plugin;
318
319   /**
320    * Handle for fragmentation.
321    */
322   struct GNUNET_FRAGMENT_Context *frag;
323
324   /**
325    * The session this fragmentation context belongs to
326    */
327   struct GNUNET_ATS_Session *session;
328
329   /**
330    * Function to call upon completion of the transmission.
331    */
332   GNUNET_TRANSPORT_TransmitContinuation cont;
333
334   /**
335    * Closure for @e cont.
336    */
337   void *cont_cls;
338
339   /**
340    * Start time.
341    */
342   struct GNUNET_TIME_Absolute start_time;
343
344   /**
345    * Transmission time for the next fragment.  Incremented by
346    * the "flow_delay_from_other_peer" for each fragment when
347    * we setup the fragments.
348    */
349   struct GNUNET_TIME_Absolute next_frag_time;
350
351   /**
352    * Message timeout
353    */
354   struct GNUNET_TIME_Absolute timeout;
355
356   /**
357    * Payload size of original unfragmented message
358    */
359   size_t payload_size;
360
361   /**
362    * Bytes used to send all fragments on wire including UDP overhead
363    */
364   size_t on_wire_size;
365
366 };
367
368
369 /**
370  * Function called when a message is removed from the
371  * transmission queue.
372  *
373  * @param cls closure
374  * @param udpw message wrapper finished
375  * @param result #GNUNET_OK on success (message was sent)
376  *               #GNUNET_SYSERR if the target disconnected
377  *               or we had a timeout or other trouble sending
378  */
379 typedef void
380 (*QueueContinuation) (void *cls,
381                       struct UDP_MessageWrapper *udpw,
382                       int result);
383
384
385 /**
386  * Information we track for each message in the queue.
387  */
388 struct UDP_MessageWrapper
389 {
390   /**
391    * Session this message belongs to
392    */
393   struct GNUNET_ATS_Session *session;
394
395   /**
396    * DLL of messages, previous element
397    */
398   struct UDP_MessageWrapper *prev;
399
400   /**
401    * DLL of messages, next element
402    */
403   struct UDP_MessageWrapper *next;
404
405   /**
406    * Message with @e msg_size bytes including UDP-specific overhead.
407    */
408   char *msg_buf;
409
410   /**
411    * Function to call once the message wrapper is being removed
412    * from the queue (with success or failure).
413    */
414   QueueContinuation qc;
415
416   /**
417    * Closure for @e qc.
418    */
419   void *qc_cls;
420
421   /**
422    * External continuation to call upon completion of the
423    * transmission, NULL if this queue entry is not for a
424    * message from the application.
425    */
426   GNUNET_TRANSPORT_TransmitContinuation cont;
427
428   /**
429    * Closure for @e cont.
430    */
431   void *cont_cls;
432
433   /**
434    * Fragmentation context.
435    * frag_ctx == NULL if transport <= MTU
436    * frag_ctx != NULL if transport > MTU
437    */
438   struct UDP_FragmentationContext *frag_ctx;
439
440   /**
441    * Message enqueue time.
442    */
443   struct GNUNET_TIME_Absolute start_time;
444
445   /**
446    * Desired transmission time for this message, based on the
447    * flow limiting information we got from the other peer.
448    */
449   struct GNUNET_TIME_Absolute transmission_time;
450
451   /**
452    * Message timeout.
453    */
454   struct GNUNET_TIME_Absolute timeout;
455
456   /**
457    * Size of UDP message to send, including UDP-specific overhead.
458    */
459   size_t msg_size;
460
461   /**
462    * Payload size of original message.
463    */
464   size_t payload_size;
465
466 };
467
468
469 GNUNET_NETWORK_STRUCT_BEGIN
470
471 /**
472  * UDP ACK Message-Packet header.
473  */
474 struct UDP_ACK_Message
475 {
476   /**
477    * Message header.
478    */
479   struct GNUNET_MessageHeader header;
480
481   /**
482    * Desired delay for flow control, in us (in NBO).
483    */
484   uint32_t delay GNUNET_PACKED;
485
486   /**
487    * What is the identity of the sender
488    */
489   struct GNUNET_PeerIdentity sender;
490
491 };
492
493 GNUNET_NETWORK_STRUCT_END
494
495
496 /* ************************* Monitoring *********** */
497
498
499 /**
500  * If a session monitor is attached, notify it about the new
501  * session state.
502  *
503  * @param plugin our plugin
504  * @param session session that changed state
505  * @param state new state of the session
506  */
507 static void
508 notify_session_monitor (struct Plugin *plugin,
509                         struct GNUNET_ATS_Session *session,
510                         enum GNUNET_TRANSPORT_SessionState state)
511 {
512   struct GNUNET_TRANSPORT_SessionInfo info;
513
514   if (NULL == plugin->sic)
515     return;
516   if (GNUNET_YES == session->in_destroy)
517     return; /* already destroyed, just RC>0 left-over actions */
518   memset (&info,
519           0,
520           sizeof (info));
521   info.state = state;
522   info.is_inbound = GNUNET_SYSERR; /* hard to say */
523   info.num_msg_pending = session->msgs_in_queue;
524   info.num_bytes_pending = session->bytes_in_queue;
525   /* info.receive_delay remains zero as this is not supported by UDP
526      (cannot selectively not receive from 'some' peer while continuing
527      to receive from others) */
528   info.session_timeout = session->timeout;
529   info.address = session->address;
530   plugin->sic (plugin->sic_cls,
531                session,
532                &info);
533 }
534
535
536 /**
537  * Return information about the given session to the monitor callback.
538  *
539  * @param cls the `struct Plugin` with the monitor callback (`sic`)
540  * @param peer peer we send information about
541  * @param value our `struct GNUNET_ATS_Session` to send information about
542  * @return #GNUNET_OK (continue to iterate)
543  */
544 static int
545 send_session_info_iter (void *cls,
546                         const struct GNUNET_PeerIdentity *peer,
547                         void *value)
548 {
549   struct Plugin *plugin = cls;
550   struct GNUNET_ATS_Session *session = value;
551
552   notify_session_monitor (plugin,
553                           session,
554                           GNUNET_TRANSPORT_SS_INIT);
555   notify_session_monitor (plugin,
556                           session,
557                           GNUNET_TRANSPORT_SS_UP);
558   return GNUNET_OK;
559 }
560
561
562 /**
563  * Begin monitoring sessions of a plugin.  There can only
564  * be one active monitor per plugin (i.e. if there are
565  * multiple monitors, the transport service needs to
566  * multiplex the generated events over all of them).
567  *
568  * @param cls closure of the plugin
569  * @param sic callback to invoke, NULL to disable monitor;
570  *            plugin will being by iterating over all active
571  *            sessions immediately and then enter monitor mode
572  * @param sic_cls closure for @a sic
573  */
574 static void
575 udp_plugin_setup_monitor (void *cls,
576                           GNUNET_TRANSPORT_SessionInfoCallback sic,
577                           void *sic_cls)
578 {
579   struct Plugin *plugin = cls;
580
581   plugin->sic = sic;
582   plugin->sic_cls = sic_cls;
583   if (NULL != sic)
584   {
585     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
586                                            &send_session_info_iter,
587                                            plugin);
588     /* signal end of first iteration */
589     sic (sic_cls,
590          NULL,
591          NULL);
592   }
593 }
594
595
596 /* ****************** Little Helpers ****************** */
597
598
599 /**
600  * Function to free last resources associated with a session.
601  *
602  * @param s session to free
603  */
604 static void
605 free_session (struct GNUNET_ATS_Session *s)
606 {
607   if (NULL != s->address)
608   {
609     GNUNET_HELLO_address_free (s->address);
610     s->address = NULL;
611   }
612   if (NULL != s->frag_ctx)
613   {
614     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag,
615                                      NULL,
616                                      NULL);
617     GNUNET_free (s->frag_ctx);
618     s->frag_ctx = NULL;
619   }
620   GNUNET_free (s);
621 }
622
623
624 /**
625  * Function that is called to get the keepalive factor.
626  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
627  * calculate the interval between keepalive packets.
628  *
629  * @param cls closure with the `struct Plugin`
630  * @return keepalive factor
631  */
632 static unsigned int
633 udp_query_keepalive_factor (void *cls)
634 {
635   return 15;
636 }
637
638
639 /**
640  * Function obtain the network type for a session
641  *
642  * @param cls closure (`struct Plugin *`)
643  * @param session the session
644  * @return the network type
645  */
646 static enum GNUNET_ATS_Network_Type
647 udp_plugin_get_network (void *cls,
648                         struct GNUNET_ATS_Session *session)
649 {
650   return session->scope;
651 }
652
653
654 /**
655  * Function obtain the network type for an address.
656  *
657  * @param cls closure (`struct Plugin *`)
658  * @param address the address
659  * @return the network type
660  */
661 static enum GNUNET_ATS_Network_Type
662 udp_plugin_get_network_for_address (void *cls,
663                                     const struct GNUNET_HELLO_Address *address)
664 {
665   struct Plugin *plugin = cls;
666   size_t addrlen;
667   struct sockaddr_in a4;
668   struct sockaddr_in6 a6;
669   const struct IPv4UdpAddress *u4;
670   const struct IPv6UdpAddress *u6;
671   const void *sb;
672   size_t sbs;
673
674   addrlen = address->address_length;
675   if (addrlen == sizeof(struct IPv6UdpAddress))
676   {
677     GNUNET_assert (NULL != address->address); /* make static analysis happy */
678     u6 = address->address;
679     memset (&a6, 0, sizeof(a6));
680 #if HAVE_SOCKADDR_IN_SIN_LEN
681     a6.sin6_len = sizeof (a6);
682 #endif
683     a6.sin6_family = AF_INET6;
684     a6.sin6_port = u6->u6_port;
685     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
686     sb = &a6;
687     sbs = sizeof(a6);
688   }
689   else if (addrlen == sizeof(struct IPv4UdpAddress))
690   {
691     GNUNET_assert (NULL != address->address); /* make static analysis happy */
692     u4 = address->address;
693     memset (&a4, 0, sizeof(a4));
694 #if HAVE_SOCKADDR_IN_SIN_LEN
695     a4.sin_len = sizeof (a4);
696 #endif
697     a4.sin_family = AF_INET;
698     a4.sin_port = u4->u4_port;
699     a4.sin_addr.s_addr = u4->ipv4_addr;
700     sb = &a4;
701     sbs = sizeof(a4);
702   }
703   else
704   {
705     GNUNET_break (0);
706     return GNUNET_ATS_NET_UNSPECIFIED;
707   }
708   return plugin->env->get_address_type (plugin->env->cls,
709                                         sb,
710                                         sbs);
711 }
712
713
714 /* ******************* Event loop ******************** */
715
716 /**
717  * We have been notified that our readset has something to read.  We don't
718  * know which socket needs to be read, so we have to check each one
719  * Then reschedule this function to be called again once more is available.
720  *
721  * @param cls the plugin handle
722  * @param tc the scheduling context (for rescheduling this function again)
723  */
724 static void
725 udp_plugin_select_v4 (void *cls,
726                       const struct GNUNET_SCHEDULER_TaskContext *tc);
727
728
729 /**
730  * We have been notified that our readset has something to read.  We don't
731  * know which socket needs to be read, so we have to check each one
732  * Then reschedule this function to be called again once more is available.
733  *
734  * @param cls the plugin handle
735  * @param tc the scheduling context (for rescheduling this function again)
736  */
737 static void
738 udp_plugin_select_v6 (void *cls,
739                       const struct GNUNET_SCHEDULER_TaskContext *tc);
740
741
742 /**
743  * (re)schedule IPv4-select tasks for this plugin.
744  *
745  * @param plugin plugin to reschedule
746  */
747 static void
748 schedule_select_v4 (struct Plugin *plugin)
749 {
750   struct GNUNET_TIME_Relative min_delay;
751   struct GNUNET_TIME_Relative delay;
752   struct UDP_MessageWrapper *udpw;
753   struct UDP_MessageWrapper *min_udpw;
754
755   if ( (GNUNET_YES == plugin->enable_ipv4) &&
756        (NULL != plugin->sockv4) )
757   {
758     /* Find a message ready to send:
759      * Flow delay from other peer is expired or not set (0) */
760     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
761     min_udpw = NULL;
762     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
763     {
764       delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
765       if (delay.rel_value_us < min_delay.rel_value_us)
766       {
767         min_delay = delay;
768         min_udpw = udpw;
769       }
770     }
771     if (NULL != plugin->select_task_v4)
772       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
773     if (NULL != min_udpw)
774     {
775       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
776       {
777         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
778                     "Calculated flow delay for UDPv4 at %s for %s\n",
779                     GNUNET_STRINGS_relative_time_to_string (min_delay,
780                                                             GNUNET_YES),
781                     GNUNET_i2s (&udpw->session->target));
782       }
783       else
784       {
785         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786                     "Calculated flow delay for UDPv4 at %s for %s\n",
787                     GNUNET_STRINGS_relative_time_to_string (min_delay,
788                                                             GNUNET_YES),
789                     GNUNET_i2s (&udpw->session->target));
790       }
791     }
792     plugin->select_task_v4
793       = GNUNET_SCHEDULER_add_read_net (min_delay,
794                                        plugin->sockv4,
795                                        &udp_plugin_select_v4,
796                                        plugin);
797   }
798 }
799
800
801 /**
802  * (re)schedule IPv6-select tasks for this plugin.
803  *
804  * @param plugin plugin to reschedule
805  */
806 static void
807 schedule_select_v6 (struct Plugin *plugin)
808 {
809   struct GNUNET_TIME_Relative min_delay;
810   struct GNUNET_TIME_Relative delay;
811   struct UDP_MessageWrapper *udpw;
812   struct UDP_MessageWrapper *min_udpw;
813
814   if ( (GNUNET_YES == plugin->enable_ipv6) &&
815        (NULL != plugin->sockv6) )
816   {
817     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
818     min_udpw = NULL;
819     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
820     {
821       delay = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
822       if (delay.rel_value_us < min_delay.rel_value_us)
823       {
824         min_delay = delay;
825         min_udpw = udpw;
826       }
827     }
828     if (NULL != plugin->select_task_v6)
829       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
830     if (NULL != min_udpw)
831     {
832       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
833       {
834         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
835                     "Calculated flow delay for UDPv6 at %s for %s\n",
836                     GNUNET_STRINGS_relative_time_to_string (min_delay,
837                                                             GNUNET_YES),
838                     GNUNET_i2s (&udpw->session->target));
839       }
840       else
841       {
842         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
843                     "Calculated flow delay for UDPv6 at %s for %s\n",
844                     GNUNET_STRINGS_relative_time_to_string (min_delay,
845                                                             GNUNET_YES),
846                     GNUNET_i2s (&udpw->session->target));
847       }
848     }
849     plugin->select_task_v6
850       = GNUNET_SCHEDULER_add_read_net (min_delay,
851                                        plugin->sockv6,
852                                        &udp_plugin_select_v6,
853                                        plugin);
854   }
855 }
856
857
858 /* ******************* Address to string and back ***************** */
859
860
861 /**
862  * Function called for a quick conversion of the binary address to
863  * a numeric address.  Note that the caller must not free the
864  * address and that the next call to this function is allowed
865  * to override the address again.
866  *
867  * @param cls closure
868  * @param addr binary address (a `union UdpAddress`)
869  * @param addrlen length of the @a addr
870  * @return string representing the same address
871  */
872 const char *
873 udp_address_to_string (void *cls,
874                        const void *addr,
875                        size_t addrlen)
876 {
877   static char rbuf[INET6_ADDRSTRLEN + 10];
878   char buf[INET6_ADDRSTRLEN];
879   const void *sb;
880   struct in_addr a4;
881   struct in6_addr a6;
882   const struct IPv4UdpAddress *t4;
883   const struct IPv6UdpAddress *t6;
884   int af;
885   uint16_t port;
886   uint32_t options;
887
888   if (NULL == addr)
889   {
890     GNUNET_break_op (0);
891     return NULL;
892   }
893
894   if (addrlen == sizeof(struct IPv6UdpAddress))
895   {
896     t6 = addr;
897     af = AF_INET6;
898     options = ntohl (t6->options);
899     port = ntohs (t6->u6_port);
900     a6 = t6->ipv6_addr;
901     sb = &a6;
902   }
903   else if (addrlen == sizeof(struct IPv4UdpAddress))
904   {
905     t4 = addr;
906     af = AF_INET;
907     options = ntohl (t4->options);
908     port = ntohs (t4->u4_port);
909     a4.s_addr = t4->ipv4_addr;
910     sb = &a4;
911   }
912   else
913   {
914     GNUNET_break_op (0);
915     return NULL;
916   }
917   inet_ntop (af,
918              sb,
919              buf,
920              INET6_ADDRSTRLEN);
921   GNUNET_snprintf (rbuf,
922                    sizeof(rbuf),
923                    (af == AF_INET6)
924                    ? "%s.%u.[%s]:%u"
925                    : "%s.%u.%s:%u",
926                    PLUGIN_NAME,
927                    options,
928                    buf,
929                    port);
930   return rbuf;
931 }
932
933
934 /**
935  * Function called to convert a string address to a binary address.
936  *
937  * @param cls closure (`struct Plugin *`)
938  * @param addr string address
939  * @param addrlen length of the address
940  * @param buf location to store the buffer
941  * @param added location to store the number of bytes in the buffer.
942  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
943  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
944  */
945 static int
946 udp_string_to_address (void *cls,
947                        const char *addr,
948                        uint16_t addrlen,
949                        void **buf,
950                        size_t *added)
951 {
952   struct sockaddr_storage socket_address;
953   char *address;
954   char *plugin;
955   char *optionstr;
956   uint32_t options;
957
958   /* Format tcp.options.address:port */
959   address = NULL;
960   plugin = NULL;
961   optionstr = NULL;
962
963   if ((NULL == addr) || (0 == addrlen))
964   {
965     GNUNET_break (0);
966     return GNUNET_SYSERR;
967   }
968   if ('\0' != addr[addrlen - 1])
969   {
970     GNUNET_break (0);
971     return GNUNET_SYSERR;
972   }
973   if (strlen (addr) != addrlen - 1)
974   {
975     GNUNET_break (0);
976     return GNUNET_SYSERR;
977   }
978   plugin = GNUNET_strdup (addr);
979   optionstr = strchr (plugin, '.');
980   if (NULL == optionstr)
981   {
982     GNUNET_break (0);
983     GNUNET_free (plugin);
984     return GNUNET_SYSERR;
985   }
986   optionstr[0] = '\0';
987   optionstr++;
988   options = atol (optionstr);
989   address = strchr (optionstr, '.');
990   if (NULL == address)
991   {
992     GNUNET_break (0);
993     GNUNET_free (plugin);
994     return GNUNET_SYSERR;
995   }
996   address[0] = '\0';
997   address++;
998
999   if (GNUNET_OK !=
1000       GNUNET_STRINGS_to_address_ip (address,
1001                                     strlen (address),
1002                                     &socket_address))
1003   {
1004     GNUNET_break (0);
1005     GNUNET_free (plugin);
1006     return GNUNET_SYSERR;
1007   }
1008   GNUNET_free(plugin);
1009
1010   switch (socket_address.ss_family)
1011   {
1012   case AF_INET:
1013     {
1014       struct IPv4UdpAddress *u4;
1015       const struct sockaddr_in *in4 = (const struct sockaddr_in *) &socket_address;
1016
1017       u4 = GNUNET_new (struct IPv4UdpAddress);
1018       u4->options = htonl (options);
1019       u4->ipv4_addr = in4->sin_addr.s_addr;
1020       u4->u4_port = in4->sin_port;
1021       *buf = u4;
1022       *added = sizeof (struct IPv4UdpAddress);
1023       return GNUNET_OK;
1024     }
1025   case AF_INET6:
1026     {
1027       struct IPv6UdpAddress *u6;
1028       const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &socket_address;
1029
1030       u6 = GNUNET_new (struct IPv6UdpAddress);
1031       u6->options = htonl (options);
1032       u6->ipv6_addr = in6->sin6_addr;
1033       u6->u6_port = in6->sin6_port;
1034       *buf = u6;
1035       *added = sizeof (struct IPv6UdpAddress);
1036       return GNUNET_OK;
1037     }
1038   default:
1039     GNUNET_break (0);
1040     return GNUNET_SYSERR;
1041   }
1042 }
1043
1044
1045 /**
1046  * Append our port and forward the result.
1047  *
1048  * @param cls a `struct PrettyPrinterContext *`
1049  * @param hostname result from DNS resolver
1050  */
1051 static void
1052 append_port (void *cls,
1053              const char *hostname)
1054 {
1055   struct PrettyPrinterContext *ppc = cls;
1056   struct Plugin *plugin = ppc->plugin;
1057   char *ret;
1058
1059   if (NULL == hostname)
1060   {
1061     /* Final call, done */
1062     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
1063                                  plugin->ppc_dll_tail,
1064                                  ppc);
1065     ppc->resolver_handle = NULL;
1066     ppc->asc (ppc->asc_cls,
1067               NULL,
1068               GNUNET_OK);
1069     GNUNET_free (ppc);
1070     return;
1071   }
1072   if (GNUNET_YES == ppc->ipv6)
1073     GNUNET_asprintf (&ret,
1074                      "%s.%u.[%s]:%d",
1075                      PLUGIN_NAME,
1076                      ppc->options,
1077                      hostname,
1078                      ppc->port);
1079   else
1080     GNUNET_asprintf (&ret,
1081                      "%s.%u.%s:%d",
1082                      PLUGIN_NAME,
1083                      ppc->options,
1084                      hostname,
1085                      ppc->port);
1086   ppc->asc (ppc->asc_cls,
1087             ret,
1088             GNUNET_OK);
1089   GNUNET_free (ret);
1090 }
1091
1092
1093 /**
1094  * Convert the transports address to a nice, human-readable format.
1095  *
1096  * @param cls closure with the `struct Plugin *`
1097  * @param type name of the transport that generated the address
1098  * @param addr one of the addresses of the host, NULL for the last address
1099  *        the specific address format depends on the transport;
1100  *        a `union UdpAddress`
1101  * @param addrlen length of the address
1102  * @param numeric should (IP) addresses be displayed in numeric form?
1103  * @param timeout after how long should we give up?
1104  * @param asc function to call on each string
1105  * @param asc_cls closure for @a asc
1106  */
1107 static void
1108 udp_plugin_address_pretty_printer (void *cls,
1109                                    const char *type,
1110                                    const void *addr,
1111                                    size_t addrlen,
1112                                    int numeric,
1113                                    struct GNUNET_TIME_Relative timeout,
1114                                    GNUNET_TRANSPORT_AddressStringCallback asc,
1115                                    void *asc_cls)
1116 {
1117   struct Plugin *plugin = cls;
1118   struct PrettyPrinterContext *ppc;
1119   const struct sockaddr *sb;
1120   size_t sbs;
1121   struct sockaddr_in a4;
1122   struct sockaddr_in6 a6;
1123   const struct IPv4UdpAddress *u4;
1124   const struct IPv6UdpAddress *u6;
1125   uint16_t port;
1126   uint32_t options;
1127
1128   if (addrlen == sizeof(struct IPv6UdpAddress))
1129   {
1130     u6 = addr;
1131     memset (&a6,
1132             0,
1133             sizeof (a6));
1134     a6.sin6_family = AF_INET6;
1135 #if HAVE_SOCKADDR_IN_SIN_LEN
1136     a6.sin6_len = sizeof (a6);
1137 #endif
1138     a6.sin6_port = u6->u6_port;
1139     a6.sin6_addr = u6->ipv6_addr;
1140     port = ntohs (u6->u6_port);
1141     options = ntohl (u6->options);
1142     sb = (const struct sockaddr *) &a6;
1143     sbs = sizeof (a6);
1144   }
1145   else if (addrlen == sizeof (struct IPv4UdpAddress))
1146   {
1147     u4 = addr;
1148     memset (&a4,
1149             0,
1150             sizeof(a4));
1151     a4.sin_family = AF_INET;
1152 #if HAVE_SOCKADDR_IN_SIN_LEN
1153     a4.sin_len = sizeof (a4);
1154 #endif
1155     a4.sin_port = u4->u4_port;
1156     a4.sin_addr.s_addr = u4->ipv4_addr;
1157     port = ntohs (u4->u4_port);
1158     options = ntohl (u4->options);
1159     sb = (const struct sockaddr *) &a4;
1160     sbs = sizeof(a4);
1161   }
1162   else
1163   {
1164     /* invalid address */
1165     GNUNET_break_op (0);
1166     asc (asc_cls,
1167          NULL,
1168          GNUNET_SYSERR);
1169     asc (asc_cls,
1170          NULL,
1171          GNUNET_OK);
1172     return;
1173   }
1174   ppc = GNUNET_new (struct PrettyPrinterContext);
1175   ppc->plugin = plugin;
1176   ppc->asc = asc;
1177   ppc->asc_cls = asc_cls;
1178   ppc->port = port;
1179   ppc->options = options;
1180   if (addrlen == sizeof (struct IPv6UdpAddress))
1181     ppc->ipv6 = GNUNET_YES;
1182   else
1183     ppc->ipv6 = GNUNET_NO;
1184   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
1185                                plugin->ppc_dll_tail,
1186                                ppc);
1187   ppc->resolver_handle
1188     = GNUNET_RESOLVER_hostname_get (sb,
1189                                     sbs,
1190                                     ! numeric,
1191                                     timeout,
1192                                     &append_port,
1193                                     ppc);
1194 }
1195
1196
1197 /**
1198  * Check if the given port is plausible (must be either our listen
1199  * port or our advertised port).  If it is neither, we return
1200  * #GNUNET_SYSERR.
1201  *
1202  * @param plugin global variables
1203  * @param in_port port number to check
1204  * @return #GNUNET_OK if port is either our open or advertised port
1205  */
1206 static int
1207 check_port (const struct Plugin *plugin,
1208             uint16_t in_port)
1209 {
1210   if ( (plugin->port == in_port) ||
1211        (plugin->aport == in_port) )
1212     return GNUNET_OK;
1213   return GNUNET_SYSERR;
1214 }
1215
1216
1217 /**
1218  * Function that will be called to check if a binary address for this
1219  * plugin is well-formed and corresponds to an address for THIS peer
1220  * (as per our configuration).  Naturally, if absolutely necessary,
1221  * plugins can be a bit conservative in their answer, but in general
1222  * plugins should make sure that the address does not redirect
1223  * traffic to a 3rd party that might try to man-in-the-middle our
1224  * traffic.
1225  *
1226  * @param cls closure, should be our handle to the Plugin
1227  * @param addr pointer to a `union UdpAddress`
1228  * @param addrlen length of @a addr
1229  * @return #GNUNET_OK if this is a plausible address for this peer
1230  *         and transport, #GNUNET_SYSERR if not
1231  */
1232 static int
1233 udp_plugin_check_address (void *cls,
1234                           const void *addr,
1235                           size_t addrlen)
1236 {
1237   struct Plugin *plugin = cls;
1238   const struct IPv4UdpAddress *v4;
1239   const struct IPv6UdpAddress *v6;
1240
1241   if (sizeof(struct IPv4UdpAddress) == addrlen)
1242   {
1243     v4 = (const struct IPv4UdpAddress *) addr;
1244     if (GNUNET_OK != check_port (plugin,
1245                                  ntohs (v4->u4_port)))
1246       return GNUNET_SYSERR;
1247     if (GNUNET_OK !=
1248         GNUNET_NAT_test_address (plugin->nat,
1249                                  &v4->ipv4_addr,
1250                                  sizeof (struct in_addr)))
1251       return GNUNET_SYSERR;
1252   }
1253   else if (sizeof(struct IPv6UdpAddress) == addrlen)
1254   {
1255     v6 = (const struct IPv6UdpAddress *) addr;
1256     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1257     {
1258       GNUNET_break_op (0);
1259       return GNUNET_SYSERR;
1260     }
1261     if (GNUNET_OK != check_port (plugin,
1262                                  ntohs (v6->u6_port)))
1263       return GNUNET_SYSERR;
1264     if (GNUNET_OK !=
1265         GNUNET_NAT_test_address (plugin->nat,
1266                                  &v6->ipv6_addr,
1267                                  sizeof (struct in6_addr)))
1268       return GNUNET_SYSERR;
1269   }
1270   else
1271   {
1272     GNUNET_break_op (0);
1273     return GNUNET_SYSERR;
1274   }
1275   return GNUNET_OK;
1276 }
1277
1278
1279 /**
1280  * Our external IP address/port mapping has changed.
1281  *
1282  * @param cls closure, the `struct Plugin`
1283  * @param add_remove #GNUNET_YES to mean the new public IP address,
1284  *                   #GNUNET_NO to mean the previous (now invalid) one
1285  * @param addr either the previous or the new public IP address
1286  * @param addrlen actual length of the @a addr
1287  */
1288 static void
1289 udp_nat_port_map_callback (void *cls,
1290                            int add_remove,
1291                            const struct sockaddr *addr,
1292                            socklen_t addrlen)
1293 {
1294   struct Plugin *plugin = cls;
1295   struct GNUNET_HELLO_Address *address;
1296   struct IPv4UdpAddress u4;
1297   struct IPv6UdpAddress u6;
1298   void *arg;
1299   size_t args;
1300
1301   LOG (GNUNET_ERROR_TYPE_DEBUG,
1302        (GNUNET_YES == add_remove)
1303        ? "NAT notification to add address `%s'\n"
1304        : "NAT notification to remove address `%s'\n",
1305        GNUNET_a2s (addr,
1306                    addrlen));
1307   /* convert 'address' to our internal format */
1308   switch (addr->sa_family)
1309   {
1310   case AF_INET:
1311     {
1312       const struct sockaddr_in *i4;
1313
1314       GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1315       i4 = (const struct sockaddr_in *) addr;
1316       if (0 == ntohs (i4->sin_port))
1317       {
1318         GNUNET_break (0);
1319         return;
1320       }
1321       memset (&u4,
1322               0,
1323               sizeof(u4));
1324       u4.options = htonl (plugin->myoptions);
1325       u4.ipv4_addr = i4->sin_addr.s_addr;
1326       u4.u4_port = i4->sin_port;
1327       arg = &u4;
1328       args = sizeof (struct IPv4UdpAddress);
1329       break;
1330     }
1331   case AF_INET6:
1332     {
1333       const struct sockaddr_in6 *i6;
1334
1335       GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1336       i6 = (const struct sockaddr_in6 *) addr;
1337       if (0 == ntohs (i6->sin6_port))
1338       {
1339         GNUNET_break (0);
1340         return;
1341       }
1342       memset (&u6,
1343               0,
1344               sizeof(u6));
1345       u6.options = htonl (plugin->myoptions);
1346       u6.ipv6_addr = i6->sin6_addr;
1347       u6.u6_port = i6->sin6_port;
1348       arg = &u6;
1349       args = sizeof (struct IPv6UdpAddress);
1350       break;
1351     }
1352   default:
1353     GNUNET_break (0);
1354     return;
1355   }
1356   /* modify our published address list */
1357   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1358                                            PLUGIN_NAME,
1359                                            arg,
1360                                            args,
1361                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1362   plugin->env->notify_address (plugin->env->cls,
1363                                add_remove,
1364                                address);
1365   GNUNET_HELLO_address_free (address);
1366 }
1367
1368
1369 /* ********************* Finding sessions ******************* */
1370
1371
1372 /**
1373  * Closure for #session_cmp_it().
1374  */
1375 struct GNUNET_ATS_SessionCompareContext
1376 {
1377   /**
1378    * Set to session matching the address.
1379    */
1380   struct GNUNET_ATS_Session *res;
1381
1382   /**
1383    * Address we are looking for.
1384    */
1385   const struct GNUNET_HELLO_Address *address;
1386 };
1387
1388
1389 /**
1390  * Find a session with a matching address.
1391  *
1392  * @param cls the `struct GNUNET_ATS_SessionCompareContext *`
1393  * @param key peer identity (unused)
1394  * @param value the `struct GNUNET_ATS_Session *`
1395  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1396  */
1397 static int
1398 session_cmp_it (void *cls,
1399                 const struct GNUNET_PeerIdentity *key,
1400                 void *value)
1401 {
1402   struct GNUNET_ATS_SessionCompareContext *cctx = cls;
1403   struct GNUNET_ATS_Session *s = value;
1404
1405   if (0 == GNUNET_HELLO_address_cmp (s->address,
1406                                      cctx->address))
1407   {
1408     GNUNET_assert (GNUNET_NO == s->in_destroy);
1409     cctx->res = s;
1410     return GNUNET_NO;
1411   }
1412   return GNUNET_OK;
1413 }
1414
1415
1416 /**
1417  * Locate an existing session the transport service is using to
1418  * send data to another peer.  Performs some basic sanity checks
1419  * on the address and then tries to locate a matching session.
1420  *
1421  * @param cls the plugin
1422  * @param address the address we should locate the session by
1423  * @return the session if it exists, or NULL if it is not found
1424  */
1425 static struct GNUNET_ATS_Session *
1426 udp_plugin_lookup_session (void *cls,
1427                            const struct GNUNET_HELLO_Address *address)
1428 {
1429   struct Plugin *plugin = cls;
1430   const struct IPv6UdpAddress *udp_a6;
1431   const struct IPv4UdpAddress *udp_a4;
1432   struct GNUNET_ATS_SessionCompareContext cctx;
1433
1434   if (NULL == address->address)
1435   {
1436     GNUNET_break (0);
1437     return NULL;
1438   }
1439   if (sizeof(struct IPv4UdpAddress) == address->address_length)
1440   {
1441     if (NULL == plugin->sockv4)
1442       return NULL;
1443     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1444     if (0 == udp_a4->u4_port)
1445     {
1446       GNUNET_break (0);
1447       return NULL;
1448     }
1449   }
1450   else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1451   {
1452     if (NULL == plugin->sockv6)
1453       return NULL;
1454     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1455     if (0 == udp_a6->u6_port)
1456     {
1457       GNUNET_break (0);
1458       return NULL;
1459     }
1460   }
1461   else
1462   {
1463     GNUNET_break (0);
1464     return NULL;
1465   }
1466
1467   /* check if session already exists */
1468   cctx.address = address;
1469   cctx.res = NULL;
1470   LOG (GNUNET_ERROR_TYPE_DEBUG,
1471        "Looking for existing session for peer `%s' with address `%s'\n",
1472        GNUNET_i2s (&address->peer),
1473        udp_address_to_string (plugin,
1474                               address->address,
1475                               address->address_length));
1476   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1477                                               &address->peer,
1478                                               &session_cmp_it,
1479                                               &cctx);
1480   if (NULL == cctx.res)
1481     return NULL;
1482   LOG (GNUNET_ERROR_TYPE_DEBUG,
1483        "Found existing session %p\n",
1484        cctx.res);
1485   return cctx.res;
1486 }
1487
1488
1489 /* ********************** Timeout ****************** */
1490
1491
1492 /**
1493  * Increment session timeout due to activity.
1494  *
1495  * @param s session to reschedule timeout activity for
1496  */
1497 static void
1498 reschedule_session_timeout (struct GNUNET_ATS_Session *s)
1499 {
1500   if (GNUNET_YES == s->in_destroy)
1501     return;
1502   GNUNET_assert (NULL != s->timeout_task);
1503   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1504 }
1505
1506
1507
1508 /**
1509  * Function that will be called whenever the transport service wants to
1510  * notify the plugin that a session is still active and in use and
1511  * therefore the session timeout for this session has to be updated
1512  *
1513  * @param cls closure with the `struct Plugin`
1514  * @param peer which peer was the session for
1515  * @param session which session is being updated
1516  */
1517 static void
1518 udp_plugin_update_session_timeout (void *cls,
1519                                    const struct GNUNET_PeerIdentity *peer,
1520                                    struct GNUNET_ATS_Session *session)
1521 {
1522   struct Plugin *plugin = cls;
1523
1524   if (GNUNET_YES !=
1525       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1526                                                     peer,
1527                                                     session))
1528   {
1529     GNUNET_break (0);
1530     return;
1531   }
1532   /* Reschedule session timeout */
1533   reschedule_session_timeout (session);
1534 }
1535
1536
1537 /* ************************* Sending ************************ */
1538
1539
1540 /**
1541  * Remove the given message from the transmission queue and
1542  * update all applicable statistics.
1543  *
1544  * @param plugin the UDP plugin
1545  * @param udpw message wrapper to dequeue
1546  */
1547 static void
1548 dequeue (struct Plugin *plugin,
1549          struct UDP_MessageWrapper *udpw)
1550 {
1551   struct GNUNET_ATS_Session *session = udpw->session;
1552
1553   if (plugin->bytes_in_buffer < udpw->msg_size)
1554   {
1555     GNUNET_break (0);
1556   }
1557   else
1558   {
1559     GNUNET_STATISTICS_update (plugin->env->stats,
1560                               "# UDP, total bytes in send buffers",
1561                               - (long long) udpw->msg_size,
1562                               GNUNET_NO);
1563     plugin->bytes_in_buffer -= udpw->msg_size;
1564   }
1565   GNUNET_STATISTICS_update (plugin->env->stats,
1566                             "# UDP, total messages in send buffers",
1567                             -1,
1568                             GNUNET_NO);
1569   if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1570   {
1571     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1572                                  plugin->ipv4_queue_tail,
1573                                  udpw);
1574   }
1575   else if (sizeof(struct IPv6UdpAddress) == udpw->session->address->address_length)
1576   {
1577     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1578                                  plugin->ipv6_queue_tail,
1579                                  udpw);
1580   }
1581   else
1582   {
1583     GNUNET_break (0);
1584     return;
1585   }
1586   GNUNET_assert (session->msgs_in_queue > 0);
1587   session->msgs_in_queue--;
1588   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1589   session->bytes_in_queue -= udpw->msg_size;
1590 }
1591
1592
1593 /**
1594  * Enqueue a message for transmission and update statistics.
1595  *
1596  * @param plugin the UDP plugin
1597  * @param udpw message wrapper to queue
1598  */
1599 static void
1600 enqueue (struct Plugin *plugin,
1601          struct UDP_MessageWrapper *udpw)
1602 {
1603   struct GNUNET_ATS_Session *session = udpw->session;
1604
1605   if (GNUNET_YES == session->in_destroy)
1606   {
1607     GNUNET_break (0);
1608     return;
1609   }
1610   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1611   {
1612     GNUNET_break (0);
1613   }
1614   else
1615   {
1616     GNUNET_STATISTICS_update (plugin->env->stats,
1617                               "# UDP, total bytes in send buffers",
1618                               udpw->msg_size,
1619                               GNUNET_NO);
1620     plugin->bytes_in_buffer += udpw->msg_size;
1621   }
1622   GNUNET_STATISTICS_update (plugin->env->stats,
1623                             "# UDP, total messages in send buffers",
1624                             1,
1625                             GNUNET_NO);
1626   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
1627   {
1628     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1629                                 plugin->ipv4_queue_tail,
1630                                 udpw);
1631   }
1632   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
1633   {
1634     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1635                                  plugin->ipv6_queue_tail,
1636                                  udpw);
1637   }
1638   else
1639   {
1640     GNUNET_break (0);
1641     udpw->cont (udpw->cont_cls,
1642                 &session->target,
1643                 GNUNET_SYSERR,
1644                 udpw->msg_size,
1645                 0);
1646     GNUNET_free (udpw);
1647     return;
1648   }
1649   session->msgs_in_queue++;
1650   session->bytes_in_queue += udpw->msg_size;
1651 }
1652
1653
1654 /**
1655  * We have completed our (attempt) to transmit a message that had to
1656  * be fragmented -- either because we got an ACK saying that all
1657  * fragments were received, or because of timeout / disconnect.  Clean
1658  * up our state.
1659  *
1660  * @param frag_ctx fragmentation context to clean up
1661  * @param result #GNUNET_OK if we succeeded (got ACK),
1662  *               #GNUNET_SYSERR if the transmission failed
1663  */
1664 static void
1665 fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1666                          int result)
1667 {
1668   struct Plugin *plugin = frag_ctx->plugin;
1669   struct GNUNET_ATS_Session *s = frag_ctx->session;
1670   struct UDP_MessageWrapper *udpw;
1671   struct UDP_MessageWrapper *tmp;
1672   size_t overhead;
1673   struct GNUNET_TIME_Relative delay;
1674
1675   LOG (GNUNET_ERROR_TYPE_DEBUG,
1676        "%p: Fragmented message removed with result %s\n",
1677        frag_ctx,
1678        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1679   /* Call continuation for fragmented message */
1680   if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1681     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1682   else
1683     overhead = frag_ctx->on_wire_size;
1684   delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
1685   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1686   {
1687     LOG (GNUNET_ERROR_TYPE_WARNING,
1688          "Fragmented message acknowledged after %s\n",
1689          GNUNET_STRINGS_relative_time_to_string (delay,
1690                                                  GNUNET_YES));
1691   }
1692   else
1693   {
1694     LOG (GNUNET_ERROR_TYPE_DEBUG,
1695          "Fragmented message acknowledged after %s\n",
1696          GNUNET_STRINGS_relative_time_to_string (delay,
1697                                                  GNUNET_YES));
1698   }
1699
1700   if (NULL != frag_ctx->cont)
1701     frag_ctx->cont (frag_ctx->cont_cls,
1702                     &s->target,
1703                     result,
1704                     s->frag_ctx->payload_size,
1705                     frag_ctx->on_wire_size);
1706   GNUNET_STATISTICS_update (plugin->env->stats,
1707                             "# UDP, fragmented messages active",
1708                             -1,
1709                             GNUNET_NO);
1710
1711   if (GNUNET_OK == result)
1712   {
1713     GNUNET_STATISTICS_update (plugin->env->stats,
1714                               "# UDP, fragmented msgs, messages, sent, success",
1715                               1,
1716                               GNUNET_NO);
1717     GNUNET_STATISTICS_update (plugin->env->stats,
1718                               "# UDP, fragmented msgs, bytes payload, sent, success",
1719                               s->frag_ctx->payload_size,
1720                               GNUNET_NO);
1721     GNUNET_STATISTICS_update (plugin->env->stats,
1722                               "# UDP, fragmented msgs, bytes overhead, sent, success",
1723                               overhead,
1724                               GNUNET_NO);
1725     GNUNET_STATISTICS_update (plugin->env->stats,
1726                               "# UDP, total, bytes overhead, sent",
1727                               overhead,
1728                               GNUNET_NO);
1729     GNUNET_STATISTICS_update (plugin->env->stats,
1730                               "# UDP, total, bytes payload, sent",
1731                               s->frag_ctx->payload_size,
1732                               GNUNET_NO);
1733   }
1734   else
1735   {
1736     GNUNET_STATISTICS_update (plugin->env->stats,
1737                               "# UDP, fragmented msgs, messages, sent, failure",
1738                               1,
1739                               GNUNET_NO);
1740     GNUNET_STATISTICS_update (plugin->env->stats,
1741                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1742                               s->frag_ctx->payload_size,
1743                               GNUNET_NO);
1744     GNUNET_STATISTICS_update (plugin->env->stats,
1745                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1746                               overhead,
1747                               GNUNET_NO);
1748     GNUNET_STATISTICS_update (plugin->env->stats,
1749                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1750                               overhead,
1751                               GNUNET_NO);
1752   }
1753
1754   /* Remove remaining fragments from queue, no need to transmit those
1755      any longer. */
1756   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1757   {
1758     udpw = plugin->ipv6_queue_head;
1759     while (NULL != udpw)
1760     {
1761       tmp = udpw->next;
1762       if ( (udpw->frag_ctx != NULL) &&
1763            (udpw->frag_ctx == frag_ctx) )
1764       {
1765         dequeue (plugin,
1766                  udpw);
1767         GNUNET_free (udpw);
1768       }
1769       udpw = tmp;
1770     }
1771   }
1772   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1773   {
1774     udpw = plugin->ipv4_queue_head;
1775     while (NULL != udpw)
1776     {
1777       tmp = udpw->next;
1778       if ( (NULL != udpw->frag_ctx) &&
1779            (udpw->frag_ctx == frag_ctx) )
1780       {
1781         dequeue (plugin,
1782                  udpw);
1783         GNUNET_free (udpw);
1784       }
1785       udpw = tmp;
1786     }
1787   }
1788   notify_session_monitor (s->plugin,
1789                           s,
1790                           GNUNET_TRANSPORT_SS_UPDATE);
1791   GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1792                                    &s->last_expected_msg_delay,
1793                                    &s->last_expected_ack_delay);
1794   s->frag_ctx = NULL;
1795   GNUNET_free (frag_ctx);
1796 }
1797
1798
1799 /**
1800  * We are finished with a fragment in the message queue.
1801  * Notify the continuation and update statistics.
1802  *
1803  * @param cls the `struct Plugin *`
1804  * @param udpw the queue entry
1805  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1806  */
1807 static void
1808 qc_fragment_sent (void *cls,
1809                   struct UDP_MessageWrapper *udpw,
1810                   int result)
1811 {
1812   struct Plugin *plugin = cls;
1813
1814   GNUNET_assert (NULL != udpw->frag_ctx);
1815   if (GNUNET_OK == result)
1816   {
1817     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1818     GNUNET_STATISTICS_update (plugin->env->stats,
1819                               "# UDP, fragmented msgs, fragments, sent, success",
1820                               1,
1821                               GNUNET_NO);
1822     GNUNET_STATISTICS_update (plugin->env->stats,
1823                               "# UDP, fragmented msgs, fragments bytes, sent, success",
1824                               udpw->msg_size,
1825                               GNUNET_NO);
1826   }
1827   else
1828   {
1829     fragmented_message_done (udpw->frag_ctx,
1830                              GNUNET_SYSERR);
1831     GNUNET_STATISTICS_update (plugin->env->stats,
1832                               "# UDP, fragmented msgs, fragments, sent, failure",
1833                               1,
1834                               GNUNET_NO);
1835     GNUNET_STATISTICS_update (plugin->env->stats,
1836                               "# UDP, fragmented msgs, fragments bytes, sent, failure",
1837                               udpw->msg_size,
1838                               GNUNET_NO);
1839   }
1840 }
1841
1842
1843 /**
1844  * Function that is called with messages created by the fragmentation
1845  * module.  In the case of the `proc` callback of the
1846  * #GNUNET_FRAGMENT_context_create() function, this function must
1847  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1848  *
1849  * @param cls closure, the `struct UDP_FragmentationContext`
1850  * @param msg the message that was created
1851  */
1852 static void
1853 enqueue_fragment (void *cls,
1854                   const struct GNUNET_MessageHeader *msg)
1855 {
1856   struct UDP_FragmentationContext *frag_ctx = cls;
1857   struct Plugin *plugin = frag_ctx->plugin;
1858   struct UDP_MessageWrapper *udpw;
1859   struct GNUNET_ATS_Session *session = frag_ctx->session;
1860   size_t msg_len = ntohs (msg->size);
1861
1862   LOG (GNUNET_ERROR_TYPE_DEBUG,
1863        "Enqueuing fragment with %u bytes\n",
1864        msg_len);
1865   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1866   udpw->session = session;
1867   udpw->msg_buf = (char *) &udpw[1];
1868   udpw->msg_size = msg_len;
1869   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1870   udpw->timeout = frag_ctx->timeout;
1871   udpw->start_time = frag_ctx->start_time;
1872   udpw->transmission_time = frag_ctx->next_frag_time;
1873   frag_ctx->next_frag_time
1874     = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
1875                                 session->flow_delay_from_other_peer);
1876   udpw->frag_ctx = frag_ctx;
1877   udpw->qc = &qc_fragment_sent;
1878   udpw->qc_cls = plugin;
1879   memcpy (udpw->msg_buf,
1880           msg,
1881           msg_len);
1882   enqueue (plugin,
1883            udpw);
1884 }
1885
1886
1887 /**
1888  * We are finished with a message from the message queue.
1889  * Notify the continuation and update statistics.
1890  *
1891  * @param cls the `struct Plugin *`
1892  * @param udpw the queue entry
1893  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1894  */
1895 static void
1896 qc_message_sent (void *cls,
1897                  struct UDP_MessageWrapper *udpw,
1898                  int result)
1899 {
1900   struct Plugin *plugin = cls;
1901   size_t overhead;
1902   struct GNUNET_TIME_Relative delay;
1903
1904   if (udpw->msg_size >= udpw->payload_size)
1905     overhead = udpw->msg_size - udpw->payload_size;
1906   else
1907     overhead = udpw->msg_size;
1908
1909   if (NULL != udpw->cont)
1910   {
1911     delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
1912     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1913     {
1914       LOG (GNUNET_ERROR_TYPE_WARNING,
1915            "Message sent via UDP with delay of %s\n",
1916            GNUNET_STRINGS_relative_time_to_string (delay,
1917                                                    GNUNET_YES));
1918     }
1919     else
1920     {
1921       LOG (GNUNET_ERROR_TYPE_DEBUG,
1922            "Message sent via UDP with delay of %s\n",
1923            GNUNET_STRINGS_relative_time_to_string (delay,
1924                                                    GNUNET_YES));
1925     }
1926     udpw->cont (udpw->cont_cls,
1927                 &udpw->session->target,
1928                 result,
1929                 udpw->payload_size,
1930                 overhead);
1931   }
1932   if (GNUNET_OK == result)
1933   {
1934     GNUNET_STATISTICS_update (plugin->env->stats,
1935                               "# UDP, unfragmented msgs, messages, sent, success",
1936                               1,
1937                               GNUNET_NO);
1938     GNUNET_STATISTICS_update (plugin->env->stats,
1939                               "# UDP, unfragmented msgs, bytes payload, sent, success",
1940                               udpw->payload_size,
1941                               GNUNET_NO);
1942     GNUNET_STATISTICS_update (plugin->env->stats,
1943                               "# UDP, unfragmented msgs, bytes overhead, sent, success",
1944                               overhead,
1945                               GNUNET_NO);
1946     GNUNET_STATISTICS_update (plugin->env->stats,
1947                               "# UDP, total, bytes overhead, sent",
1948                               overhead,
1949                               GNUNET_NO);
1950     GNUNET_STATISTICS_update (plugin->env->stats,
1951                               "# UDP, total, bytes payload, sent",
1952                               udpw->payload_size,
1953                               GNUNET_NO);
1954   }
1955   else
1956   {
1957     GNUNET_STATISTICS_update (plugin->env->stats,
1958                               "# UDP, unfragmented msgs, messages, sent, failure",
1959                               1,
1960                               GNUNET_NO);
1961     GNUNET_STATISTICS_update (plugin->env->stats,
1962                               "# UDP, unfragmented msgs, bytes payload, sent, failure",
1963                               udpw->payload_size,
1964                               GNUNET_NO);
1965     GNUNET_STATISTICS_update (plugin->env->stats,
1966                               "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1967                               overhead,
1968                               GNUNET_NO);
1969   }
1970 }
1971
1972
1973 /**
1974  * Function that can be used by the transport service to transmit a
1975  * message using the plugin.  Note that in the case of a peer
1976  * disconnecting, the continuation MUST be called prior to the
1977  * disconnect notification itself.  This function will be called with
1978  * this peer's HELLO message to initiate a fresh connection to another
1979  * peer.
1980  *
1981  * @param cls closure
1982  * @param s which session must be used
1983  * @param msgbuf the message to transmit
1984  * @param msgbuf_size number of bytes in @a msgbuf
1985  * @param priority how important is the message (most plugins will
1986  *                 ignore message priority and just FIFO)
1987  * @param to how long to wait at most for the transmission (does not
1988  *                require plugins to discard the message after the timeout,
1989  *                just advisory for the desired delay; most plugins will ignore
1990  *                this as well)
1991  * @param cont continuation to call once the message has
1992  *        been transmitted (or if the transport is ready
1993  *        for the next transmission call; or if the
1994  *        peer disconnected...); can be NULL
1995  * @param cont_cls closure for @a cont
1996  * @return number of bytes used (on the physical network, with overheads);
1997  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1998  *         and does NOT mean that the message was not transmitted (DV)
1999  */
2000 static ssize_t
2001 udp_plugin_send (void *cls,
2002                  struct GNUNET_ATS_Session *s,
2003                  const char *msgbuf,
2004                  size_t msgbuf_size,
2005                  unsigned int priority,
2006                  struct GNUNET_TIME_Relative to,
2007                  GNUNET_TRANSPORT_TransmitContinuation cont,
2008                  void *cont_cls)
2009 {
2010   struct Plugin *plugin = cls;
2011   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
2012   struct UDP_FragmentationContext *frag_ctx;
2013   struct UDP_MessageWrapper *udpw;
2014   struct UDPMessage *udp;
2015   char mbuf[udpmlen] GNUNET_ALIGN;
2016
2017   if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
2018        (NULL == plugin->sockv6) )
2019     return GNUNET_SYSERR;
2020   if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
2021        (NULL == plugin->sockv4) )
2022     return GNUNET_SYSERR;
2023   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2024   {
2025     GNUNET_break (0);
2026     return GNUNET_SYSERR;
2027   }
2028   if (GNUNET_YES !=
2029       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2030                                                     &s->target,
2031                                                     s))
2032   {
2033     GNUNET_break (0);
2034     return GNUNET_SYSERR;
2035   }
2036   LOG (GNUNET_ERROR_TYPE_DEBUG,
2037        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2038        udpmlen,
2039        GNUNET_i2s (&s->target),
2040        udp_address_to_string (plugin,
2041                               s->address->address,
2042                               s->address->address_length));
2043
2044   udp = (struct UDPMessage *) mbuf;
2045   udp->header.size = htons (udpmlen);
2046   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2047   udp->reserved = htonl (0);
2048   udp->sender = *plugin->env->my_identity;
2049
2050   /* We do not update the session time out here!  Otherwise this
2051    * session will not timeout since we send keep alive before session
2052    * can timeout.
2053    *
2054    * For UDP we update session timeout only on receive, this will
2055    * cover keep alives, since remote peer will reply with keep alive
2056    * responses!
2057    */
2058   if (udpmlen <= UDP_MTU)
2059   {
2060     /* unfragmented message */
2061     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2062     udpw->session = s;
2063     udpw->msg_buf = (char *) &udpw[1];
2064     udpw->msg_size = udpmlen; /* message size with UDP overhead */
2065     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2066     udpw->start_time = GNUNET_TIME_absolute_get ();
2067     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
2068     udpw->transmission_time = s->last_transmit_time;
2069     s->last_transmit_time
2070       = GNUNET_TIME_absolute_add (s->last_transmit_time,
2071                                   s->flow_delay_from_other_peer);
2072     udpw->cont = cont;
2073     udpw->cont_cls = cont_cls;
2074     udpw->frag_ctx = NULL;
2075     udpw->qc = &qc_message_sent;
2076     udpw->qc_cls = plugin;
2077     memcpy (udpw->msg_buf,
2078             udp,
2079             sizeof (struct UDPMessage));
2080     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
2081             msgbuf,
2082             msgbuf_size);
2083     enqueue (plugin,
2084              udpw);
2085     GNUNET_STATISTICS_update (plugin->env->stats,
2086                               "# UDP, unfragmented messages queued total",
2087                               1,
2088                               GNUNET_NO);
2089     GNUNET_STATISTICS_update (plugin->env->stats,
2090                               "# UDP, unfragmented bytes payload queued total",
2091                               msgbuf_size,
2092                               GNUNET_NO);
2093   }
2094   else
2095   {
2096     /* fragmented message */
2097     if (NULL != s->frag_ctx)
2098       return GNUNET_SYSERR;
2099     memcpy (&udp[1],
2100             msgbuf,
2101             msgbuf_size);
2102     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2103     frag_ctx->plugin = plugin;
2104     frag_ctx->session = s;
2105     frag_ctx->cont = cont;
2106     frag_ctx->cont_cls = cont_cls;
2107     frag_ctx->start_time = GNUNET_TIME_absolute_get ();
2108     frag_ctx->next_frag_time = s->last_transmit_time;
2109     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
2110     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2111     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2112     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2113                                                      UDP_MTU,
2114                                                      &plugin->tracker,
2115                                                      s->last_expected_msg_delay,
2116                                                      s->last_expected_ack_delay,
2117                                                      &udp->header,
2118                                                      &enqueue_fragment,
2119                                                      frag_ctx);
2120     s->frag_ctx = frag_ctx;
2121     s->last_transmit_time = frag_ctx->next_frag_time;
2122     GNUNET_STATISTICS_update (plugin->env->stats,
2123                               "# UDP, fragmented messages active",
2124                               1,
2125                               GNUNET_NO);
2126     GNUNET_STATISTICS_update (plugin->env->stats,
2127                               "# UDP, fragmented messages, total",
2128                               1,
2129                               GNUNET_NO);
2130     GNUNET_STATISTICS_update (plugin->env->stats,
2131                               "# UDP, fragmented bytes (payload)",
2132                               frag_ctx->payload_size,
2133                               GNUNET_NO);
2134   }
2135   notify_session_monitor (s->plugin,
2136                           s,
2137                           GNUNET_TRANSPORT_SS_UPDATE);
2138   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2139     schedule_select_v4 (plugin);
2140   else
2141     schedule_select_v6 (plugin);
2142   return udpmlen;
2143 }
2144
2145
2146 /**
2147  * Handle an ACK message.
2148  *
2149  * @param plugin the UDP plugin
2150  * @param msg the (presumed) UDP ACK message
2151  * @param udp_addr sender address
2152  * @param udp_addr_len number of bytes in @a udp_addr
2153  */
2154 static void
2155 read_process_ack (struct Plugin *plugin,
2156                   const struct GNUNET_MessageHeader *msg,
2157                   const union UdpAddress *udp_addr,
2158                   socklen_t udp_addr_len)
2159 {
2160   const struct GNUNET_MessageHeader *ack;
2161   const struct UDP_ACK_Message *udp_ack;
2162   struct GNUNET_HELLO_Address *address;
2163   struct GNUNET_ATS_Session *s;
2164   struct GNUNET_TIME_Relative flow_delay;
2165
2166   if (ntohs (msg->size)
2167       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2168   {
2169     GNUNET_break_op (0);
2170     return;
2171   }
2172   udp_ack = (const struct UDP_ACK_Message *) msg;
2173   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2174   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2175   {
2176     GNUNET_break_op(0);
2177     return;
2178   }
2179   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2180                                            PLUGIN_NAME,
2181                                            udp_addr,
2182                                            udp_addr_len,
2183                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2184   s = udp_plugin_lookup_session (plugin,
2185                                  address);
2186   if (NULL == s)
2187   {
2188     LOG (GNUNET_ERROR_TYPE_WARNING,
2189          "UDP session of address %s for ACK not found\n",
2190          udp_address_to_string (plugin,
2191                                 address->address,
2192                                 address->address_length));
2193     GNUNET_HELLO_address_free (address);
2194     return;
2195   }
2196   if (NULL == s->frag_ctx)
2197   {
2198     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2199          "Fragmentation context of address %s for ACK (%s) not found\n",
2200          udp_address_to_string (plugin,
2201                                 address->address,
2202                                 address->address_length),
2203          GNUNET_FRAGMENT_print_ack (ack));
2204     GNUNET_HELLO_address_free (address);
2205     return;
2206   }
2207   GNUNET_HELLO_address_free (address);
2208
2209   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2210   if (flow_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
2211     LOG (GNUNET_ERROR_TYPE_WARNING,
2212          "We received a sending delay of %s for %s\n",
2213          GNUNET_STRINGS_relative_time_to_string (flow_delay,
2214                                                  GNUNET_YES),
2215          GNUNET_i2s (&udp_ack->sender));
2216   else
2217     LOG (GNUNET_ERROR_TYPE_DEBUG,
2218          "We received a sending delay of %s for %s\n",
2219          GNUNET_STRINGS_relative_time_to_string (flow_delay,
2220                                                  GNUNET_YES),
2221          GNUNET_i2s (&udp_ack->sender));
2222   /* Flow delay is for the reassembled packet, however, our delay
2223      is per packet, so we need to adjust: */
2224   flow_delay = GNUNET_TIME_relative_divide (flow_delay,
2225                                             1 + (s->frag_ctx->payload_size /
2226                                                  UDP_MTU));
2227   s->flow_delay_from_other_peer = flow_delay;
2228
2229
2230   if (GNUNET_OK !=
2231       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2232                                    ack))
2233   {
2234     LOG (GNUNET_ERROR_TYPE_DEBUG,
2235          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2236          (unsigned int) ntohs (msg->size),
2237          GNUNET_i2s (&udp_ack->sender),
2238          udp_address_to_string (plugin,
2239                                 udp_addr,
2240                                 udp_addr_len));
2241     /* Expect more ACKs to arrive */
2242     return;
2243   }
2244
2245   LOG (GNUNET_ERROR_TYPE_DEBUG,
2246        "Message from %s at %s full ACK'ed\n",
2247        GNUNET_i2s (&udp_ack->sender),
2248        udp_address_to_string (plugin,
2249                               udp_addr,
2250                               udp_addr_len));
2251
2252   /* Remove fragmented message after successful sending */
2253   fragmented_message_done (s->frag_ctx,
2254                            GNUNET_OK);
2255 }
2256
2257
2258 /* ********************** Receiving ********************** */
2259
2260
2261 /**
2262  * Closure for #find_receive_context().
2263  */
2264 struct FindReceiveContext
2265 {
2266   /**
2267    * Where to store the result.
2268    */
2269   struct DefragContext *rc;
2270
2271   /**
2272    * Session associated with this context.
2273    */
2274   struct GNUNET_ATS_Session *session;
2275
2276   /**
2277    * Address to find.
2278    */
2279   const union UdpAddress *udp_addr;
2280
2281   /**
2282    * Number of bytes in @e udp_addr.
2283    */
2284   size_t udp_addr_len;
2285
2286 };
2287
2288
2289 /**
2290  * Scan the heap for a receive context with the given address.
2291  *
2292  * @param cls the `struct FindReceiveContext`
2293  * @param node internal node of the heap
2294  * @param element value stored at the node (a `struct ReceiveContext`)
2295  * @param cost cost associated with the node
2296  * @return #GNUNET_YES if we should continue to iterate,
2297  *         #GNUNET_NO if not.
2298  */
2299 static int
2300 find_receive_context (void *cls,
2301                       struct GNUNET_CONTAINER_HeapNode *node,
2302                       void *element,
2303                       GNUNET_CONTAINER_HeapCostType cost)
2304 {
2305   struct FindReceiveContext *frc = cls;
2306   struct DefragContext *e = element;
2307
2308   if ( (frc->udp_addr_len == e->udp_addr_len) &&
2309        (0 == memcmp (frc->udp_addr,
2310                      e->udp_addr,
2311                      frc->udp_addr_len)) )
2312   {
2313     frc->rc = e;
2314     return GNUNET_NO;
2315   }
2316   return GNUNET_YES;
2317 }
2318
2319
2320 /**
2321  * Message tokenizer has broken up an incomming message. Pass it on
2322  * to the service.
2323  *
2324  * @param cls the `struct Plugin *`
2325  * @param client the `struct GNUNET_ATS_Session *`
2326  * @param hdr the actual message
2327  * @return #GNUNET_OK (always)
2328  */
2329 static int
2330 process_inbound_tokenized_messages (void *cls,
2331                                     void *client,
2332                                     const struct GNUNET_MessageHeader *hdr)
2333 {
2334   struct Plugin *plugin = cls;
2335   struct GNUNET_ATS_Session *session = client;
2336
2337   if (GNUNET_YES == session->in_destroy)
2338     return GNUNET_OK;
2339   reschedule_session_timeout (session);
2340   session->flow_delay_for_other_peer
2341     = plugin->env->receive (plugin->env->cls,
2342                             session->address,
2343                             session,
2344                             hdr);
2345   return GNUNET_OK;
2346 }
2347
2348
2349 /**
2350  * Functions with this signature are called whenever we need to close
2351  * a session due to a disconnect or failure to establish a connection.
2352  *
2353  * @param cls closure with the `struct Plugin`
2354  * @param s session to close down
2355  * @return #GNUNET_OK on success
2356  */
2357 static int
2358 udp_disconnect_session (void *cls,
2359                         struct GNUNET_ATS_Session *s)
2360 {
2361   struct Plugin *plugin = cls;
2362   struct UDP_MessageWrapper *udpw;
2363   struct UDP_MessageWrapper *next;
2364   struct FindReceiveContext frc;
2365
2366   GNUNET_assert (GNUNET_YES != s->in_destroy);
2367   LOG (GNUNET_ERROR_TYPE_DEBUG,
2368        "Session %p to peer `%s' at address %s ended\n",
2369        s,
2370        GNUNET_i2s (&s->target),
2371        udp_address_to_string (plugin,
2372                               s->address->address,
2373                               s->address->address_length));
2374   if (NULL != s->timeout_task)
2375   {
2376     GNUNET_SCHEDULER_cancel (s->timeout_task);
2377     s->timeout_task = NULL;
2378   }
2379   if (NULL != s->frag_ctx)
2380   {
2381     /* Remove fragmented message due to disconnect */
2382     fragmented_message_done (s->frag_ctx,
2383                              GNUNET_SYSERR);
2384   }
2385   GNUNET_assert (GNUNET_YES ==
2386                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
2387                                                        &s->target,
2388                                                        s));
2389   frc.rc = NULL;
2390   frc.udp_addr = s->address->address;
2391   frc.udp_addr_len = s->address->address_length;
2392   /* Lookup existing receive context for this address */
2393   if (NULL != plugin->defrag_ctxs)
2394   {
2395     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2396                                    &find_receive_context,
2397                                    &frc);
2398     if (NULL != frc.rc)
2399     {
2400       struct DefragContext *d_ctx = frc.rc;
2401
2402       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
2403       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2404       GNUNET_free (d_ctx);
2405     }
2406   }
2407   s->in_destroy = GNUNET_YES;
2408   next = plugin->ipv4_queue_head;
2409   while (NULL != (udpw = next))
2410   {
2411     next = udpw->next;
2412     if (udpw->session == s)
2413     {
2414       dequeue (plugin,
2415                udpw);
2416       udpw->qc (udpw->qc_cls,
2417                 udpw,
2418                 GNUNET_SYSERR);
2419       GNUNET_free (udpw);
2420     }
2421   }
2422   next = plugin->ipv6_queue_head;
2423   while (NULL != (udpw = next))
2424   {
2425     next = udpw->next;
2426     if (udpw->session == s)
2427     {
2428       dequeue (plugin,
2429                udpw);
2430       udpw->qc (udpw->qc_cls,
2431                 udpw,
2432                 GNUNET_SYSERR);
2433       GNUNET_free (udpw);
2434     }
2435   }
2436   if ( (NULL != s->frag_ctx) &&
2437        (NULL != s->frag_ctx->cont) )
2438   {
2439     /* The 'frag_ctx' itself will be freed in #free_session() a bit
2440        later, as it might be in use right now */
2441     LOG (GNUNET_ERROR_TYPE_DEBUG,
2442          "Calling continuation for fragemented message to `%s' with result SYSERR\n",
2443          GNUNET_i2s (&s->target));
2444     s->frag_ctx->cont (s->frag_ctx->cont_cls,
2445                        &s->target,
2446                        GNUNET_SYSERR,
2447                        s->frag_ctx->payload_size,
2448                        s->frag_ctx->on_wire_size);
2449   }
2450   notify_session_monitor (s->plugin,
2451                           s,
2452                           GNUNET_TRANSPORT_SS_DONE);
2453   plugin->env->session_end (plugin->env->cls,
2454                             s->address,
2455                             s);
2456   GNUNET_STATISTICS_set (plugin->env->stats,
2457                          "# UDP sessions active",
2458                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2459                          GNUNET_NO);
2460   if (0 == s->rc)
2461     free_session (s);
2462   return GNUNET_OK;
2463 }
2464
2465
2466 /**
2467  * Destroy a session, plugin is being unloaded.
2468  *
2469  * @param cls the `struct Plugin`
2470  * @param key hash of public key of target peer
2471  * @param value a `struct PeerSession *` to clean up
2472  * @return #GNUNET_OK (continue to iterate)
2473  */
2474 static int
2475 disconnect_and_free_it (void *cls,
2476                         const struct GNUNET_PeerIdentity *key,
2477                         void *value)
2478 {
2479   struct Plugin *plugin = cls;
2480
2481   udp_disconnect_session (plugin,
2482                           value);
2483   return GNUNET_OK;
2484 }
2485
2486
2487 /**
2488  * Disconnect from a remote node.  Clean up session if we have one for
2489  * this peer.
2490  *
2491  * @param cls closure for this call (should be handle to Plugin)
2492  * @param target the peeridentity of the peer to disconnect
2493  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
2494  */
2495 static void
2496 udp_disconnect (void *cls,
2497                 const struct GNUNET_PeerIdentity *target)
2498 {
2499   struct Plugin *plugin = cls;
2500
2501   LOG (GNUNET_ERROR_TYPE_DEBUG,
2502        "Disconnecting from peer `%s'\n",
2503        GNUNET_i2s (target));
2504   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
2505                                               target,
2506                                               &disconnect_and_free_it,
2507                                               plugin);
2508 }
2509
2510
2511 /**
2512  * Session was idle, so disconnect it.
2513  *
2514  * @param cls the `struct GNUNET_ATS_Session` to time out
2515  * @param tc scheduler context
2516  */
2517 static void
2518 session_timeout (void *cls,
2519                  const struct GNUNET_SCHEDULER_TaskContext *tc)
2520 {
2521   struct GNUNET_ATS_Session *s = cls;
2522   struct Plugin *plugin = s->plugin;
2523   struct GNUNET_TIME_Relative left;
2524
2525   s->timeout_task = NULL;
2526   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
2527   if (left.rel_value_us > 0)
2528   {
2529     /* not actually our turn yet, but let's at least update
2530        the monitor, it may think we're about to die ... */
2531     notify_session_monitor (s->plugin,
2532                             s,
2533                             GNUNET_TRANSPORT_SS_UPDATE);
2534     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
2535                                                     &session_timeout,
2536                                                     s);
2537     return;
2538   }
2539   LOG (GNUNET_ERROR_TYPE_DEBUG,
2540        "Session %p was idle for %s, disconnecting\n",
2541        s,
2542        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
2543                                                GNUNET_YES));
2544   /* call session destroy function */
2545   udp_disconnect_session (plugin,
2546                           s);
2547 }
2548
2549
2550 /**
2551  * Allocate a new session for the given endpoint address.
2552  * Note that this function does not inform the service
2553  * of the new session, this is the responsibility of the
2554  * caller (if needed).
2555  *
2556  * @param cls the `struct Plugin`
2557  * @param address address of the other peer to use
2558  * @param network_type network type the address belongs to
2559  * @return NULL on error, otherwise session handle
2560  */
2561 static struct GNUNET_ATS_Session *
2562 udp_plugin_create_session (void *cls,
2563                            const struct GNUNET_HELLO_Address *address,
2564                            enum GNUNET_ATS_Network_Type network_type)
2565 {
2566   struct Plugin *plugin = cls;
2567   struct GNUNET_ATS_Session *s;
2568
2569   s = GNUNET_new (struct GNUNET_ATS_Session);
2570   s->plugin = plugin;
2571   s->address = GNUNET_HELLO_address_copy (address);
2572   s->target = address->peer;
2573   s->last_transmit_time = GNUNET_TIME_absolute_get ();
2574   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2575                                                               250);
2576   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2577   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
2578   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2579   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2580   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
2581                                                   &session_timeout,
2582                                                   s);
2583   s->scope = network_type;
2584
2585   LOG (GNUNET_ERROR_TYPE_DEBUG,
2586        "Creating new session %p for peer `%s' address `%s'\n",
2587        s,
2588        GNUNET_i2s (&address->peer),
2589        udp_address_to_string (plugin,
2590                               address->address,
2591                               address->address_length));
2592   GNUNET_assert (GNUNET_OK ==
2593                  GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
2594                                                     &s->target,
2595                                                     s,
2596                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2597   GNUNET_STATISTICS_set (plugin->env->stats,
2598                          "# UDP sessions active",
2599                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2600                          GNUNET_NO);
2601   notify_session_monitor (plugin,
2602                           s,
2603                           GNUNET_TRANSPORT_SS_INIT);
2604   return s;
2605 }
2606
2607
2608 /**
2609  * Creates a new outbound session the transport service will use to
2610  * send data to the peer.
2611  *
2612  * @param cls the `struct Plugin *`
2613  * @param address the address
2614  * @return the session or NULL of max connections exceeded
2615  */
2616 static struct GNUNET_ATS_Session *
2617 udp_plugin_get_session (void *cls,
2618                         const struct GNUNET_HELLO_Address *address)
2619 {
2620   struct Plugin *plugin = cls;
2621   struct GNUNET_ATS_Session *s;
2622   enum GNUNET_ATS_Network_Type network_type = GNUNET_ATS_NET_UNSPECIFIED;
2623   const struct IPv4UdpAddress *udp_v4;
2624   const struct IPv6UdpAddress *udp_v6;
2625
2626   if (NULL == address)
2627   {
2628     GNUNET_break (0);
2629     return NULL;
2630   }
2631   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
2632        (address->address_length != sizeof(struct IPv6UdpAddress)) )
2633   {
2634     GNUNET_break_op (0);
2635     return NULL;
2636   }
2637   if (NULL != (s = udp_plugin_lookup_session (cls,
2638                                               address)))
2639     return s;
2640
2641   /* need to create new session */
2642   if (sizeof (struct IPv4UdpAddress) == address->address_length)
2643   {
2644     struct sockaddr_in v4;
2645
2646     udp_v4 = (const struct IPv4UdpAddress *) address->address;
2647     memset (&v4, '\0', sizeof (v4));
2648     v4.sin_family = AF_INET;
2649 #if HAVE_SOCKADDR_IN_SIN_LEN
2650     v4.sin_len = sizeof (struct sockaddr_in);
2651 #endif
2652     v4.sin_port = udp_v4->u4_port;
2653     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
2654     network_type = plugin->env->get_address_type (plugin->env->cls,
2655                                                   (const struct sockaddr *) &v4,
2656                                                   sizeof (v4));
2657   }
2658   if (sizeof (struct IPv6UdpAddress) == address->address_length)
2659   {
2660     struct sockaddr_in6 v6;
2661
2662     udp_v6 = (const struct IPv6UdpAddress *) address->address;
2663     memset (&v6, '\0', sizeof (v6));
2664     v6.sin6_family = AF_INET6;
2665 #if HAVE_SOCKADDR_IN_SIN_LEN
2666     v6.sin6_len = sizeof (struct sockaddr_in6);
2667 #endif
2668     v6.sin6_port = udp_v6->u6_port;
2669     v6.sin6_addr = udp_v6->ipv6_addr;
2670     network_type = plugin->env->get_address_type (plugin->env->cls,
2671                                                   (const struct sockaddr *) &v6,
2672                                                   sizeof (v6));
2673   }
2674   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2675   return udp_plugin_create_session (cls,
2676                                     address,
2677                                     network_type);
2678 }
2679
2680
2681 /**
2682  * We've received a UDP Message.  Process it (pass contents to main service).
2683  *
2684  * @param plugin plugin context
2685  * @param msg the message
2686  * @param udp_addr sender address
2687  * @param udp_addr_len number of bytes in @a udp_addr
2688  * @param network_type network type the address belongs to
2689  */
2690 static void
2691 process_udp_message (struct Plugin *plugin,
2692                      const struct UDPMessage *msg,
2693                      const union UdpAddress *udp_addr,
2694                      size_t udp_addr_len,
2695                      enum GNUNET_ATS_Network_Type network_type)
2696 {
2697   struct GNUNET_ATS_Session *s;
2698   struct GNUNET_HELLO_Address *address;
2699
2700   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2701   if (0 != ntohl (msg->reserved))
2702   {
2703     GNUNET_break_op(0);
2704     return;
2705   }
2706   if (ntohs (msg->header.size)
2707       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2708   {
2709     GNUNET_break_op(0);
2710     return;
2711   }
2712
2713   address = GNUNET_HELLO_address_allocate (&msg->sender,
2714                                            PLUGIN_NAME,
2715                                            udp_addr,
2716                                            udp_addr_len,
2717                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2718   if (NULL ==
2719       (s = udp_plugin_lookup_session (plugin,
2720                                       address)))
2721   {
2722     s = udp_plugin_create_session (plugin,
2723                                    address,
2724                                    network_type);
2725     plugin->env->session_start (plugin->env->cls,
2726                                 address,
2727                                 s,
2728                                 s->scope);
2729     notify_session_monitor (plugin,
2730                             s,
2731                             GNUNET_TRANSPORT_SS_UP);
2732   }
2733   GNUNET_free (address);
2734
2735   s->rc++;
2736   GNUNET_SERVER_mst_receive (plugin->mst,
2737                              s,
2738                              (const char *) &msg[1],
2739                              ntohs (msg->header.size) - sizeof(struct UDPMessage),
2740                              GNUNET_YES,
2741                              GNUNET_NO);
2742   s->rc--;
2743   if ( (0 == s->rc) &&
2744        (GNUNET_YES == s->in_destroy) )
2745     free_session (s);
2746 }
2747
2748
2749 /**
2750  * Process a defragmented message.
2751  *
2752  * @param cls the `struct DefragContext *`
2753  * @param msg the message
2754  */
2755 static void
2756 fragment_msg_proc (void *cls,
2757                    const struct GNUNET_MessageHeader *msg)
2758 {
2759   struct DefragContext *dc = cls;
2760   const struct UDPMessage *um;
2761
2762   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2763   {
2764     GNUNET_break_op (0);
2765     return;
2766   }
2767   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2768   {
2769     GNUNET_break_op (0);
2770     return;
2771   }
2772   um = (const struct UDPMessage *) msg;
2773   dc->sender = um->sender;
2774   dc->have_sender = GNUNET_YES;
2775   process_udp_message (dc->plugin,
2776                        um,
2777                        dc->udp_addr,
2778                        dc->udp_addr_len,
2779                        dc->network_type);
2780 }
2781
2782
2783 /**
2784  * We finished sending an acknowledgement.  Update
2785  * statistics.
2786  *
2787  * @param cls the `struct Plugin`
2788  * @param udpw message queue entry of the ACK
2789  * @param result #GNUNET_OK if the transmission worked,
2790  *               #GNUNET_SYSERR if we failed to send the ACK
2791  */
2792 static void
2793 ack_message_sent (void *cls,
2794                   struct UDP_MessageWrapper *udpw,
2795                   int result)
2796 {
2797   struct Plugin *plugin = cls;
2798
2799   if (GNUNET_OK == result)
2800   {
2801     GNUNET_STATISTICS_update (plugin->env->stats,
2802                               "# UDP, ACK messages sent",
2803                               1,
2804                               GNUNET_NO);
2805   }
2806   else
2807   {
2808     GNUNET_STATISTICS_update (plugin->env->stats,
2809                               "# UDP, ACK transmissions failed",
2810                               1,
2811                               GNUNET_NO);
2812   }
2813 }
2814
2815
2816 /**
2817  * Transmit an acknowledgement.
2818  *
2819  * @param cls the `struct DefragContext *`
2820  * @param id message ID (unused)
2821  * @param msg ack to transmit
2822  */
2823 static void
2824 ack_proc (void *cls,
2825           uint32_t id,
2826           const struct GNUNET_MessageHeader *msg)
2827 {
2828   struct DefragContext *rc = cls;
2829   struct Plugin *plugin = rc->plugin;
2830   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2831   struct UDP_ACK_Message *udp_ack;
2832   uint32_t delay;
2833   struct UDP_MessageWrapper *udpw;
2834   struct GNUNET_ATS_Session *s;
2835   struct GNUNET_HELLO_Address *address;
2836
2837   if (GNUNET_NO == rc->have_sender)
2838   {
2839     /* tried to defragment but never succeeded, hence will not ACK */
2840     /* This can happen if we just lost msgs */
2841     GNUNET_STATISTICS_update (plugin->env->stats,
2842                               "# UDP, fragments discarded without ACK",
2843                               1,
2844                               GNUNET_NO);
2845     return;
2846   }
2847   address = GNUNET_HELLO_address_allocate (&rc->sender,
2848                                            PLUGIN_NAME,
2849                                            rc->udp_addr,
2850                                            rc->udp_addr_len,
2851                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2852   s = udp_plugin_lookup_session (plugin,
2853                                  address);
2854   GNUNET_HELLO_address_free (address);
2855   if (NULL == s)
2856   {
2857     LOG (GNUNET_ERROR_TYPE_ERROR,
2858          "Trying to transmit ACK to peer `%s' but no session found!\n",
2859          udp_address_to_string (plugin,
2860                                 rc->udp_addr,
2861                                 rc->udp_addr_len));
2862     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2863     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2864     GNUNET_free (rc);
2865     GNUNET_STATISTICS_update (plugin->env->stats,
2866                               "# UDP, ACK transmissions failed",
2867                               1,
2868                               GNUNET_NO);
2869     return;
2870   }
2871   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2872     delay = s->flow_delay_for_other_peer.rel_value_us;
2873   else
2874     delay = UINT32_MAX;
2875   LOG (GNUNET_ERROR_TYPE_DEBUG,
2876        "Sending ACK to `%s' including delay of %s\n",
2877        udp_address_to_string (plugin,
2878                               rc->udp_addr,
2879                               rc->udp_addr_len),
2880        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2881                                                GNUNET_YES));
2882   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2883   udpw->msg_size = msize;
2884   udpw->payload_size = 0;
2885   udpw->session = s;
2886   udpw->start_time = GNUNET_TIME_absolute_get ();
2887   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2888   udpw->msg_buf = (char *) &udpw[1];
2889   udpw->qc = &ack_message_sent;
2890   udpw->qc_cls = plugin;
2891   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2892   udp_ack->header.size = htons ((uint16_t) msize);
2893   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2894   udp_ack->delay = htonl (delay);
2895   udp_ack->sender = *plugin->env->my_identity;
2896   memcpy (&udp_ack[1],
2897           msg,
2898           ntohs (msg->size));
2899   enqueue (plugin,
2900            udpw);
2901   notify_session_monitor (plugin,
2902                           s,
2903                           GNUNET_TRANSPORT_SS_UPDATE);
2904   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2905     schedule_select_v4 (plugin);
2906   else
2907     schedule_select_v6 (plugin);
2908 }
2909
2910
2911 /**
2912  * We received a fragment, process it.
2913  *
2914  * @param plugin our plugin
2915  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2916  * @param udp_addr sender address
2917  * @param udp_addr_len number of bytes in @a udp_addr
2918  * @param network_type network type the address belongs to
2919  */
2920 static void
2921 read_process_fragment (struct Plugin *plugin,
2922                        const struct GNUNET_MessageHeader *msg,
2923                        const union UdpAddress *udp_addr,
2924                        size_t udp_addr_len,
2925                        enum GNUNET_ATS_Network_Type network_type)
2926 {
2927   struct DefragContext *d_ctx;
2928   struct GNUNET_TIME_Absolute now;
2929   struct FindReceiveContext frc;
2930
2931   frc.rc = NULL;
2932   frc.udp_addr = udp_addr;
2933   frc.udp_addr_len = udp_addr_len;
2934
2935   /* Lookup existing receive context for this address */
2936   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2937                                  &find_receive_context,
2938                                  &frc);
2939   now = GNUNET_TIME_absolute_get ();
2940   d_ctx = frc.rc;
2941
2942   if (NULL == d_ctx)
2943   {
2944     /* Create a new defragmentation context */
2945     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
2946     memcpy (&d_ctx[1],
2947             udp_addr,
2948             udp_addr_len);
2949     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2950     d_ctx->udp_addr_len = udp_addr_len;
2951     d_ctx->network_type = network_type;
2952     d_ctx->plugin = plugin;
2953     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2954                                                       UDP_MTU,
2955                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
2956                                                       d_ctx,
2957                                                       &fragment_msg_proc,
2958                                                       &ack_proc);
2959     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2960                                                  d_ctx,
2961                                                  (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2962     LOG (GNUNET_ERROR_TYPE_DEBUG,
2963          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2964          (unsigned int) ntohs (msg->size),
2965          udp_address_to_string (plugin,
2966                                 udp_addr,
2967                                 udp_addr_len));
2968   }
2969   else
2970   {
2971     LOG (GNUNET_ERROR_TYPE_DEBUG,
2972          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2973          (unsigned int) ntohs (msg->size),
2974          udp_address_to_string (plugin,
2975                                 udp_addr,
2976                                 udp_addr_len));
2977   }
2978
2979   if (GNUNET_OK ==
2980       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag,
2981                                           msg))
2982   {
2983     /* keep this 'rc' from expiring */
2984     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2985                                        d_ctx->hnode,
2986                                        (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2987   }
2988   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2989       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2990   {
2991     /* remove 'rc' that was inactive the longest */
2992     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2993     GNUNET_assert (NULL != d_ctx);
2994     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2995     GNUNET_free (d_ctx);
2996     GNUNET_STATISTICS_update (plugin->env->stats,
2997                               "# UDP, Defragmentations aborted",
2998                               1,
2999                               GNUNET_NO);
3000   }
3001 }
3002
3003
3004 /**
3005  * Read and process a message from the given socket.
3006  *
3007  * @param plugin the overall plugin
3008  * @param rsock socket to read from
3009  */
3010 static void
3011 udp_select_read (struct Plugin *plugin,
3012                  struct GNUNET_NETWORK_Handle *rsock)
3013 {
3014   socklen_t fromlen;
3015   struct sockaddr_storage addr;
3016   char buf[65536] GNUNET_ALIGN;
3017   ssize_t size;
3018   const struct GNUNET_MessageHeader *msg;
3019   struct IPv4UdpAddress v4;
3020   struct IPv6UdpAddress v6;
3021   const struct sockaddr *sa;
3022   const struct sockaddr_in *sa4;
3023   const struct sockaddr_in6 *sa6;
3024   const union UdpAddress *int_addr;
3025   size_t int_addr_len;
3026   enum GNUNET_ATS_Network_Type network_type;
3027
3028   fromlen = sizeof (addr);
3029   memset (&addr,
3030           0,
3031           sizeof(addr));
3032   size = GNUNET_NETWORK_socket_recvfrom (rsock,
3033                                          buf,
3034                                          sizeof(buf),
3035                                          (struct sockaddr *) &addr,
3036                                          &fromlen);
3037   sa = (const struct sockaddr *) &addr;
3038 #if MINGW
3039   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
3040    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
3041    * on this socket has failed.
3042    * Quote from MSDN:
3043    *   WSAECONNRESET - The virtual circuit was reset by the remote side
3044    *   executing a hard or abortive close. The application should close
3045    *   the socket; it is no longer usable. On a UDP-datagram socket this
3046    *   error indicates a previous send operation resulted in an ICMP Port
3047    *   Unreachable message.
3048    */
3049   if ( (-1 == size) &&
3050        (ECONNRESET == errno) )
3051     return;
3052 #endif
3053   if (-1 == size)
3054   {
3055     LOG (GNUNET_ERROR_TYPE_DEBUG,
3056          "UDP failed to receive data: %s\n",
3057          STRERROR (errno));
3058     /* Connection failure or something. Not a protocol violation. */
3059     return;
3060   }
3061
3062   /* Check if this is a STUN packet */
3063   if (GNUNET_NAT_is_valid_stun_packet (plugin->nat,
3064                                        (uint8_t *)buf,
3065                                        size))
3066     return; /* was STUN, do not process further */
3067
3068   if (size < sizeof(struct GNUNET_MessageHeader))
3069   {
3070     LOG (GNUNET_ERROR_TYPE_WARNING,
3071          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
3072          (unsigned int ) size,
3073          GNUNET_a2s (sa,
3074                      fromlen));
3075     /* _MAY_ be a connection failure (got partial message) */
3076     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
3077     GNUNET_break_op (0);
3078     return;
3079   }
3080
3081
3082
3083
3084   msg = (const struct GNUNET_MessageHeader *) buf;
3085   LOG (GNUNET_ERROR_TYPE_DEBUG,
3086        "UDP received %u-byte message from `%s' type %u\n",
3087        (unsigned int) size,
3088        GNUNET_a2s (sa,
3089                    fromlen),
3090        ntohs (msg->type));
3091   if (size != ntohs (msg->size))
3092   {
3093     LOG (GNUNET_ERROR_TYPE_WARNING,
3094          "UDP malformed message header from %s\n",
3095          (unsigned int) size,
3096          GNUNET_a2s (sa,
3097                      fromlen));
3098     GNUNET_break_op (0);
3099     return;
3100   }
3101   GNUNET_STATISTICS_update (plugin->env->stats,
3102                             "# UDP, total bytes received",
3103                             size,
3104                             GNUNET_NO);
3105   network_type = plugin->env->get_address_type (plugin->env->cls,
3106                                                 sa,
3107                                                 fromlen);
3108   switch (sa->sa_family)
3109   {
3110   case AF_INET:
3111     sa4 = (const struct sockaddr_in *) &addr;
3112     v4.options = 0;
3113     v4.ipv4_addr = sa4->sin_addr.s_addr;
3114     v4.u4_port = sa4->sin_port;
3115     int_addr = (union UdpAddress *) &v4;
3116     int_addr_len = sizeof (v4);
3117     break;
3118   case AF_INET6:
3119     sa6 = (const struct sockaddr_in6 *) &addr;
3120     v6.options = 0;
3121     v6.ipv6_addr = sa6->sin6_addr;
3122     v6.u6_port = sa6->sin6_port;
3123     int_addr = (union UdpAddress *) &v6;
3124     int_addr_len = sizeof (v6);
3125     break;
3126   default:
3127     GNUNET_break (0);
3128     return;
3129   }
3130
3131   switch (ntohs (msg->type))
3132   {
3133   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
3134     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
3135       udp_broadcast_receive (plugin,
3136                              buf,
3137                              size,
3138                              int_addr,
3139                              int_addr_len,
3140                              network_type);
3141     return;
3142   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
3143     if (ntohs (msg->size) < sizeof(struct UDPMessage))
3144     {
3145       GNUNET_break_op(0);
3146       return;
3147     }
3148     process_udp_message (plugin,
3149                          (const struct UDPMessage *) msg,
3150                          int_addr,
3151                          int_addr_len,
3152                          network_type);
3153     return;
3154   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
3155     read_process_ack (plugin,
3156                       msg,
3157                       int_addr,
3158                       int_addr_len);
3159     return;
3160   case GNUNET_MESSAGE_TYPE_FRAGMENT:
3161     read_process_fragment (plugin,
3162                            msg,
3163                            int_addr,
3164                            int_addr_len,
3165                            network_type);
3166     return;
3167   default:
3168     GNUNET_break_op(0);
3169     return;
3170   }
3171 }
3172
3173
3174 /**
3175  * Removes messages from the transmission queue that have
3176  * timed out, and then selects a message that should be
3177  * transmitted next.
3178  *
3179  * @param plugin the UDP plugin
3180  * @param sock which socket should we process the queue for (v4 or v6)
3181  * @return message selected for transmission, or NULL for none
3182  */
3183 static struct UDP_MessageWrapper *
3184 remove_timeout_messages_and_select (struct Plugin *plugin,
3185                                     struct GNUNET_NETWORK_Handle *sock)
3186 {
3187   struct UDP_MessageWrapper *udpw;
3188   struct GNUNET_TIME_Relative remaining;
3189   struct GNUNET_ATS_Session *session;
3190   int removed;
3191
3192   removed = GNUNET_NO;
3193   udpw = (sock == plugin->sockv4)
3194     ? plugin->ipv4_queue_head
3195     : plugin->ipv6_queue_head;
3196   while (NULL != udpw)
3197   {
3198     session = udpw->session;
3199     /* Find messages with timeout */
3200     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
3201     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
3202     {
3203       /* Message timed out */
3204       removed = GNUNET_YES;
3205       dequeue (plugin,
3206                udpw);
3207       udpw->qc (udpw->qc_cls,
3208                 udpw,
3209                 GNUNET_SYSERR);
3210       GNUNET_free (udpw);
3211
3212       if (sock == plugin->sockv4)
3213       {
3214         udpw = plugin->ipv4_queue_head;
3215       }
3216       else if (sock == plugin->sockv6)
3217       {
3218         udpw = plugin->ipv6_queue_head;
3219       }
3220       else
3221       {
3222         GNUNET_break (0); /* should never happen */
3223         udpw = NULL;
3224       }
3225       GNUNET_STATISTICS_update (plugin->env->stats,
3226                                 "# messages discarded due to timeout",
3227                                 1,
3228                                 GNUNET_NO);
3229     }
3230     else
3231     {
3232       /* Message did not time out, check transmission time */
3233       remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
3234       if (0 == remaining.rel_value_us)
3235       {
3236         /* this message is not delayed */
3237         LOG (GNUNET_ERROR_TYPE_DEBUG,
3238              "Message for peer `%s' (%u bytes) is not delayed \n",
3239              GNUNET_i2s (&udpw->session->target),
3240              udpw->payload_size);
3241         break; /* Found message to send, break */
3242       }
3243       else
3244       {
3245         /* Message is delayed, try next */
3246         LOG (GNUNET_ERROR_TYPE_DEBUG,
3247              "Message for peer `%s' (%u bytes) is delayed for %s\n",
3248              GNUNET_i2s (&udpw->session->target),
3249              udpw->payload_size,
3250              GNUNET_STRINGS_relative_time_to_string (remaining,
3251                                                      GNUNET_YES));
3252         udpw = udpw->next;
3253       }
3254     }
3255   }
3256   if (GNUNET_YES == removed)
3257     notify_session_monitor (session->plugin,
3258                             session,
3259                             GNUNET_TRANSPORT_SS_UPDATE);
3260   return udpw;
3261 }
3262
3263
3264 /**
3265  * We failed to transmit a message via UDP. Generate
3266  * a descriptive error message.
3267  *
3268  * @param plugin our plugin
3269  * @param sa target address we were trying to reach
3270  * @param slen number of bytes in @a sa
3271  * @param error the errno value returned from the sendto() call
3272  */
3273 static void
3274 analyze_send_error (struct Plugin *plugin,
3275                     const struct sockaddr *sa,
3276                     socklen_t slen,
3277                     int error)
3278 {
3279   enum GNUNET_ATS_Network_Type type;
3280
3281   type = plugin->env->get_address_type (plugin->env->cls,
3282                                         sa,
3283                                         slen);
3284   if ( ( (GNUNET_ATS_NET_LAN == type) ||
3285          (GNUNET_ATS_NET_WAN == type) ) &&
3286        ( (ENETUNREACH == errno) ||
3287          (ENETDOWN == errno) ) )
3288   {
3289     if (slen == sizeof (struct sockaddr_in))
3290     {
3291       /* IPv4: "Network unreachable" or "Network down"
3292        *
3293        * This indicates we do not have connectivity
3294        */
3295       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3296            _("UDP could not transmit message to `%s': "
3297              "Network seems down, please check your network configuration\n"),
3298            GNUNET_a2s (sa,
3299                        slen));
3300     }
3301     if (slen == sizeof (struct sockaddr_in6))
3302     {
3303       /* IPv6: "Network unreachable" or "Network down"
3304        *
3305        * This indicates that this system is IPv6 enabled, but does not
3306        * have a valid global IPv6 address assigned or we do not have
3307        * connectivity
3308        */
3309       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3310            _("UDP could not transmit IPv6 message! "
3311              "Please check your network configuration and disable IPv6 if your "
3312              "connection does not have a global IPv6 address\n"));
3313     }
3314   }
3315   else
3316   {
3317     LOG (GNUNET_ERROR_TYPE_WARNING,
3318          "UDP could not transmit message to `%s': `%s'\n",
3319          GNUNET_a2s (sa,
3320                      slen),
3321          STRERROR (error));
3322   }
3323 }
3324
3325
3326 /**
3327  * It is time to try to transmit a UDP message.  Select one
3328  * and send.
3329  *
3330  * @param plugin the plugin
3331  * @param sock which socket (v4/v6) to send on
3332  */
3333 static void
3334 udp_select_send (struct Plugin *plugin,
3335                  struct GNUNET_NETWORK_Handle *sock)
3336 {
3337   ssize_t sent;
3338   socklen_t slen;
3339   const struct sockaddr *a;
3340   const struct IPv4UdpAddress *u4;
3341   struct sockaddr_in a4;
3342   const struct IPv6UdpAddress *u6;
3343   struct sockaddr_in6 a6;
3344   struct UDP_MessageWrapper *udpw;
3345
3346   /* Find message(s) to send */
3347   while (NULL != (udpw = remove_timeout_messages_and_select (plugin,
3348                                                              sock)))
3349   {
3350     if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3351     {
3352       u4 = udpw->session->address->address;
3353       memset (&a4,
3354               0,
3355               sizeof(a4));
3356       a4.sin_family = AF_INET;
3357 #if HAVE_SOCKADDR_IN_SIN_LEN
3358       a4.sin_len = sizeof (a4);
3359 #endif
3360       a4.sin_port = u4->u4_port;
3361       a4.sin_addr.s_addr = u4->ipv4_addr;
3362       a = (const struct sockaddr *) &a4;
3363       slen = sizeof (a4);
3364     }
3365     else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3366     {
3367       u6 = udpw->session->address->address;
3368       memset (&a6,
3369               0,
3370               sizeof(a6));
3371       a6.sin6_family = AF_INET6;
3372 #if HAVE_SOCKADDR_IN_SIN_LEN
3373       a6.sin6_len = sizeof (a6);
3374 #endif
3375       a6.sin6_port = u6->u6_port;
3376       a6.sin6_addr = u6->ipv6_addr;
3377       a = (const struct sockaddr *) &a6;
3378       slen = sizeof (a6);
3379     }
3380     else
3381     {
3382       GNUNET_break (0);
3383       dequeue (plugin,
3384                udpw);
3385       udpw->qc (udpw->qc_cls,
3386                 udpw,
3387                 GNUNET_SYSERR);
3388       notify_session_monitor (plugin,
3389                               udpw->session,
3390                               GNUNET_TRANSPORT_SS_UPDATE);
3391       GNUNET_free (udpw);
3392       continue;
3393     }
3394     sent = GNUNET_NETWORK_socket_sendto (sock,
3395                                          udpw->msg_buf,
3396                                          udpw->msg_size,
3397                                          a,
3398                                          slen);
3399     udpw->session->last_transmit_time
3400       = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
3401                                   udpw->session->last_transmit_time);
3402     dequeue (plugin,
3403              udpw);
3404     if (GNUNET_SYSERR == sent)
3405     {
3406       /* Failure */
3407       analyze_send_error (plugin,
3408                           a,
3409                           slen,
3410                           errno);
3411       udpw->qc (udpw->qc_cls,
3412                 udpw,
3413                 GNUNET_SYSERR);
3414       GNUNET_STATISTICS_update (plugin->env->stats,
3415                                 "# UDP, total, bytes, sent, failure",
3416                                 sent,
3417                                 GNUNET_NO);
3418       GNUNET_STATISTICS_update (plugin->env->stats,
3419                                 "# UDP, total, messages, sent, failure",
3420                                 1,
3421                                 GNUNET_NO);
3422     }
3423     else
3424     {
3425       /* Success */
3426       LOG (GNUNET_ERROR_TYPE_DEBUG,
3427            "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3428            (unsigned int) (udpw->msg_size),
3429            GNUNET_i2s (&udpw->session->target),
3430            GNUNET_a2s (a,
3431                        slen),
3432            (int ) sent,
3433            (sent < 0) ? STRERROR (errno) : "ok");
3434       GNUNET_STATISTICS_update (plugin->env->stats,
3435                                 "# UDP, total, bytes, sent, success",
3436                                 sent,
3437                                 GNUNET_NO);
3438       GNUNET_STATISTICS_update (plugin->env->stats,
3439                                 "# UDP, total, messages, sent, success",
3440                                 1,
3441                                 GNUNET_NO);
3442       if (NULL != udpw->frag_ctx)
3443         udpw->frag_ctx->on_wire_size += udpw->msg_size;
3444       udpw->qc (udpw->qc_cls,
3445                 udpw,
3446                 GNUNET_OK);
3447     }
3448     notify_session_monitor (plugin,
3449                             udpw->session,
3450                             GNUNET_TRANSPORT_SS_UPDATE);
3451     GNUNET_free (udpw);
3452   }
3453 }
3454
3455
3456 /* ***************** Event loop (part 2) *************** */
3457
3458
3459 /**
3460  * We have been notified that our readset has something to read.  We don't
3461  * know which socket needs to be read, so we have to check each one
3462  * Then reschedule this function to be called again once more is available.
3463  *
3464  * @param cls the plugin handle
3465  * @param tc the scheduling context
3466  */
3467 static void
3468 udp_plugin_select_v4 (void *cls,
3469                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3470 {
3471   struct Plugin *plugin = cls;
3472
3473   plugin->select_task_v4 = NULL;
3474   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3475     return;
3476   if (NULL == plugin->sockv4)
3477     return;
3478   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3479       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3480                                    plugin->sockv4)))
3481     udp_select_read (plugin,
3482                      plugin->sockv4);
3483   udp_select_send (plugin,
3484                    plugin->sockv4);
3485   schedule_select_v4 (plugin);
3486 }
3487
3488
3489 /**
3490  * We have been notified that our readset has something to read.  We don't
3491  * know which socket needs to be read, so we have to check each one
3492  * Then reschedule this function to be called again once more is available.
3493  *
3494  * @param cls the plugin handle
3495  * @param tc the scheduling context
3496  */
3497 static void
3498 udp_plugin_select_v6 (void *cls,
3499                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3500 {
3501   struct Plugin *plugin = cls;
3502
3503   plugin->select_task_v6 = NULL;
3504   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3505     return;
3506   if (NULL == plugin->sockv6)
3507     return;
3508   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3509        (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3510                                     plugin->sockv6)) )
3511     udp_select_read (plugin,
3512                      plugin->sockv6);
3513
3514   udp_select_send (plugin,
3515                    plugin->sockv6);
3516   schedule_select_v6 (plugin);
3517 }
3518
3519
3520 /* ******************* Initialization *************** */
3521
3522
3523 /**
3524  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3525  *
3526  * @param plugin the plugin to initialize
3527  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3528  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3529  * @return number of sockets that were successfully bound
3530  */
3531 static int
3532 setup_sockets (struct Plugin *plugin,
3533                const struct sockaddr_in6 *bind_v6,
3534                const struct sockaddr_in *bind_v4)
3535 {
3536   int tries;
3537   int sockets_created = 0;
3538   struct sockaddr_in6 server_addrv6;
3539   struct sockaddr_in server_addrv4;
3540   const struct sockaddr *server_addr;
3541   const struct sockaddr *addrs[2];
3542   socklen_t addrlens[2];
3543   socklen_t addrlen;
3544   int eno;
3545
3546   /* Create IPv6 socket */
3547   eno = EINVAL;
3548   if (GNUNET_YES == plugin->enable_ipv6)
3549   {
3550     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6,
3551                                                    SOCK_DGRAM,
3552                                                    0);
3553     if (NULL == plugin->sockv6)
3554     {
3555       LOG (GNUNET_ERROR_TYPE_INFO,
3556            _("Disabling IPv6 since it is not supported on this system!\n"));
3557       plugin->enable_ipv6 = GNUNET_NO;
3558     }
3559     else
3560     {
3561       memset (&server_addrv6,
3562               0,
3563               sizeof(struct sockaddr_in6));
3564 #if HAVE_SOCKADDR_IN_SIN_LEN
3565       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3566 #endif
3567       server_addrv6.sin6_family = AF_INET6;
3568       if (NULL != bind_v6)
3569         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3570       else
3571         server_addrv6.sin6_addr = in6addr_any;
3572
3573       if (0 == plugin->port) /* autodetect */
3574         server_addrv6.sin6_port
3575           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3576                                              33537)
3577                    + 32000);
3578       else
3579         server_addrv6.sin6_port = htons (plugin->port);
3580       addrlen = sizeof (struct sockaddr_in6);
3581       server_addr = (const struct sockaddr *) &server_addrv6;
3582
3583       tries = 0;
3584       while (tries < 10)
3585       {
3586         LOG(GNUNET_ERROR_TYPE_DEBUG,
3587             "Binding to IPv6 `%s'\n",
3588             GNUNET_a2s (server_addr,
3589                         addrlen));
3590         /* binding */
3591         if (GNUNET_OK ==
3592             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3593                                         server_addr,
3594                                         addrlen))
3595           break;
3596         eno = errno;
3597         if (0 != plugin->port)
3598         {
3599           tries = 10; /* fail immediately */
3600           break; /* bind failed on specific port */
3601         }
3602         /* autodetect */
3603         server_addrv6.sin6_port
3604           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3605                                              33537)
3606                    + 32000);
3607         tries++;
3608       }
3609       if (tries >= 10)
3610       {
3611         GNUNET_NETWORK_socket_close (plugin->sockv6);
3612         plugin->enable_ipv6 = GNUNET_NO;
3613         plugin->sockv6 = NULL;
3614       }
3615       else
3616       {
3617         plugin->port = ntohs (server_addrv6.sin6_port);
3618       }
3619       if (NULL != plugin->sockv6)
3620       {
3621         LOG (GNUNET_ERROR_TYPE_DEBUG,
3622              "IPv6 UDP socket created listinging at %s\n",
3623              GNUNET_a2s (server_addr,
3624                          addrlen));
3625         addrs[sockets_created] = server_addr;
3626         addrlens[sockets_created] = addrlen;
3627         sockets_created++;
3628       }
3629       else
3630       {
3631         LOG (GNUNET_ERROR_TYPE_WARNING,
3632              _("Failed to bind UDP socket to %s: %s\n"),
3633              GNUNET_a2s (server_addr,
3634                          addrlen),
3635              STRERROR (eno));
3636       }
3637     }
3638   }
3639
3640   /* Create IPv4 socket */
3641   eno = EINVAL;
3642   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET,
3643                                                  SOCK_DGRAM,
3644                                                  0);
3645   if (NULL == plugin->sockv4)
3646   {
3647     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3648                          "socket");
3649     LOG (GNUNET_ERROR_TYPE_INFO,
3650          _("Disabling IPv4 since it is not supported on this system!\n"));
3651     plugin->enable_ipv4 = GNUNET_NO;
3652   }
3653   else
3654   {
3655     memset (&server_addrv4,
3656             0,
3657             sizeof(struct sockaddr_in));
3658 #if HAVE_SOCKADDR_IN_SIN_LEN
3659     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3660 #endif
3661     server_addrv4.sin_family = AF_INET;
3662     if (NULL != bind_v4)
3663       server_addrv4.sin_addr = bind_v4->sin_addr;
3664     else
3665       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3666
3667     if (0 == plugin->port)
3668       /* autodetect */
3669       server_addrv4.sin_port
3670         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3671                                            33537)
3672                  + 32000);
3673     else
3674       server_addrv4.sin_port = htons (plugin->port);
3675
3676     addrlen = sizeof (struct sockaddr_in);
3677     server_addr = (const struct sockaddr *) &server_addrv4;
3678
3679     tries = 0;
3680     while (tries < 10)
3681     {
3682       LOG (GNUNET_ERROR_TYPE_DEBUG,
3683            "Binding to IPv4 `%s'\n",
3684            GNUNET_a2s (server_addr,
3685                        addrlen));
3686
3687       /* binding */
3688       if (GNUNET_OK ==
3689           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3690                                       server_addr,
3691                                       addrlen))
3692         break;
3693       eno = errno;
3694       if (0 != plugin->port)
3695       {
3696         tries = 10; /* fail */
3697         break; /* bind failed on specific port */
3698       }
3699
3700       /* autodetect */
3701       server_addrv4.sin_port
3702         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3703                                            33537)
3704                  + 32000);
3705       tries++;
3706     }
3707     if (tries >= 10)
3708     {
3709       GNUNET_NETWORK_socket_close (plugin->sockv4);
3710       plugin->enable_ipv4 = GNUNET_NO;
3711       plugin->sockv4 = NULL;
3712     }
3713     else
3714     {
3715       plugin->port = ntohs (server_addrv4.sin_port);
3716     }
3717
3718     if (NULL != plugin->sockv4)
3719     {
3720       LOG (GNUNET_ERROR_TYPE_DEBUG,
3721            "IPv4 socket created on port %s\n",
3722            GNUNET_a2s (server_addr,
3723                        addrlen));
3724       addrs[sockets_created] = server_addr;
3725       addrlens[sockets_created] = addrlen;
3726       sockets_created++;
3727     }
3728     else
3729     {
3730       LOG (GNUNET_ERROR_TYPE_ERROR,
3731            _("Failed to bind UDP socket to %s: %s\n"),
3732            GNUNET_a2s (server_addr,
3733                        addrlen),
3734            STRERROR (eno));
3735     }
3736   }
3737
3738   if (0 == sockets_created)
3739   {
3740     LOG (GNUNET_ERROR_TYPE_WARNING,
3741          _("Failed to open UDP sockets\n"));
3742     return 0; /* No sockets created, return */
3743   }
3744   schedule_select_v4 (plugin);
3745   schedule_select_v6 (plugin);
3746   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3747                                      GNUNET_NO,
3748                                      plugin->port,
3749                                      sockets_created,
3750                                      addrs,
3751                                      addrlens,
3752                                      &udp_nat_port_map_callback,
3753                                      NULL,
3754                                      plugin,
3755                                      plugin->sockv4);
3756   return sockets_created;
3757 }
3758
3759
3760 /**
3761  * The exported method. Makes the core api available via a global and
3762  * returns the udp transport API.
3763  *
3764  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3765  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3766  */
3767 void *
3768 libgnunet_plugin_transport_udp_init (void *cls)
3769 {
3770   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3771   struct GNUNET_TRANSPORT_PluginFunctions *api;
3772   struct Plugin *p;
3773   unsigned long long port;
3774   unsigned long long aport;
3775   unsigned long long udp_max_bps;
3776   unsigned long long enable_v6;
3777   unsigned long long enable_broadcasting;
3778   unsigned long long enable_broadcasting_recv;
3779   char *bind4_address;
3780   char *bind6_address;
3781   struct GNUNET_TIME_Relative interval;
3782   struct sockaddr_in server_addrv4;
3783   struct sockaddr_in6 server_addrv6;
3784   int res;
3785   int have_bind4;
3786   int have_bind6;
3787
3788   if (NULL == env->receive)
3789   {
3790     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3791      initialze the plugin or the API */
3792     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3793     api->cls = NULL;
3794     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3795     api->address_to_string = &udp_address_to_string;
3796     api->string_to_address = &udp_string_to_address;
3797     return api;
3798   }
3799
3800   /* Get port number: port == 0 : autodetect a port,
3801    * > 0 : use this port, not given : 2086 default */
3802   if (GNUNET_OK !=
3803       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3804                                              "transport-udp",
3805                                              "PORT",
3806                                              &port))
3807     port = 2086;
3808   if (port > 65535)
3809   {
3810     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3811                                "transport-udp",
3812                                "PORT",
3813                                _("must be in [0,65535]"));
3814     return NULL;
3815   }
3816   if (GNUNET_OK !=
3817       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3818                                              "transport-udp",
3819                                              "ADVERTISED_PORT",
3820                                              &aport))
3821     aport = port;
3822   if (aport > 65535)
3823   {
3824     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3825                                "transport-udp",
3826                                "ADVERTISED_PORT",
3827                                _("must be in [0,65535]"));
3828     return NULL;
3829   }
3830
3831   if (GNUNET_YES ==
3832       GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3833                                             "nat",
3834                                             "DISABLEV6"))
3835     enable_v6 = GNUNET_NO;
3836   else
3837     enable_v6 = GNUNET_YES;
3838
3839   have_bind4 = GNUNET_NO;
3840   memset (&server_addrv4,
3841           0,
3842           sizeof (server_addrv4));
3843   if (GNUNET_YES ==
3844       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3845                                              "transport-udp",
3846                                              "BINDTO",
3847                                              &bind4_address))
3848   {
3849     LOG (GNUNET_ERROR_TYPE_DEBUG,
3850          "Binding UDP plugin to specific address: `%s'\n",
3851          bind4_address);
3852     if (1 != inet_pton (AF_INET,
3853                         bind4_address,
3854                         &server_addrv4.sin_addr))
3855     {
3856       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3857                                  "transport-udp",
3858                                  "BINDTO",
3859                                  _("must be valid IPv4 address"));
3860       GNUNET_free (bind4_address);
3861       return NULL;
3862     }
3863     have_bind4 = GNUNET_YES;
3864   }
3865   GNUNET_free_non_null (bind4_address);
3866   have_bind6 = GNUNET_NO;
3867   memset (&server_addrv6,
3868           0,
3869           sizeof (server_addrv6));
3870   if (GNUNET_YES ==
3871       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3872                                              "transport-udp",
3873                                              "BINDTO6",
3874                                              &bind6_address))
3875   {
3876     LOG (GNUNET_ERROR_TYPE_DEBUG,
3877          "Binding udp plugin to specific address: `%s'\n",
3878          bind6_address);
3879     if (1 != inet_pton (AF_INET6,
3880                         bind6_address,
3881                         &server_addrv6.sin6_addr))
3882     {
3883       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3884                                  "transport-udp",
3885                                  "BINDTO6",
3886                                  _("must be valid IPv6 address"));
3887       GNUNET_free (bind6_address);
3888       return NULL;
3889     }
3890     have_bind6 = GNUNET_YES;
3891   }
3892   GNUNET_free_non_null (bind6_address);
3893
3894   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3895                                                               "transport-udp",
3896                                                               "BROADCAST");
3897   if (enable_broadcasting == GNUNET_SYSERR)
3898     enable_broadcasting = GNUNET_NO;
3899
3900   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3901                                                                    "transport-udp",
3902                                                                    "BROADCAST_RECEIVE");
3903   if (enable_broadcasting_recv == GNUNET_SYSERR)
3904     enable_broadcasting_recv = GNUNET_YES;
3905
3906   if (GNUNET_SYSERR ==
3907       GNUNET_CONFIGURATION_get_value_time (env->cfg,
3908                                            "transport-udp",
3909                                            "BROADCAST_INTERVAL",
3910                                            &interval))
3911   {
3912     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
3913                                               10);
3914   }
3915   if (GNUNET_OK !=
3916       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3917                                              "transport-udp",
3918                                              "MAX_BPS",
3919                                              &udp_max_bps))
3920   {
3921     /* 50 MB/s == infinity for practical purposes */
3922     udp_max_bps = 1024 * 1024 * 50;
3923   }
3924
3925   p = GNUNET_new (struct Plugin);
3926   p->port = port;
3927   p->aport = aport;
3928   p->broadcast_interval = interval;
3929   p->enable_ipv6 = enable_v6;
3930   p->enable_ipv4 = GNUNET_YES; /* default */
3931   p->enable_broadcasting = enable_broadcasting;
3932   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3933   p->env = env;
3934   p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
3935                                                       GNUNET_NO);
3936   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3937   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3938                                      p);
3939   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3940                                  NULL,
3941                                  NULL,
3942                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3943                                  30);
3944   res = setup_sockets (p,
3945                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3946                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3947   if ( (0 == res) ||
3948        ( (NULL == p->sockv4) &&
3949          (NULL == p->sockv6) ) )
3950   {
3951     LOG (GNUNET_ERROR_TYPE_ERROR,
3952         _("Failed to create UDP network sockets\n"));
3953     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3954     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3955     GNUNET_SERVER_mst_destroy (p->mst);
3956     GNUNET_free (p);
3957     return NULL;
3958   }
3959
3960   /* Setup broadcasting and receiving beacons */
3961   setup_broadcast (p,
3962                    &server_addrv6,
3963                    &server_addrv4);
3964
3965   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3966   api->cls = p;
3967   api->disconnect_session = &udp_disconnect_session;
3968   api->query_keepalive_factor = &udp_query_keepalive_factor;
3969   api->disconnect_peer = &udp_disconnect;
3970   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3971   api->address_to_string = &udp_address_to_string;
3972   api->string_to_address = &udp_string_to_address;
3973   api->check_address = &udp_plugin_check_address;
3974   api->get_session = &udp_plugin_get_session;
3975   api->send = &udp_plugin_send;
3976   api->get_network = &udp_plugin_get_network;
3977   api->get_network_for_address = &udp_plugin_get_network_for_address;
3978   api->update_session_timeout = &udp_plugin_update_session_timeout;
3979   api->setup_monitor = &udp_plugin_setup_monitor;
3980   return api;
3981 }
3982
3983
3984 /**
3985  * Function called on each entry in the defragmentation heap to
3986  * clean it up.
3987  *
3988  * @param cls NULL
3989  * @param node node in the heap (to be removed)
3990  * @param element a `struct DefragContext` to be cleaned up
3991  * @param cost unused
3992  * @return #GNUNET_YES
3993  */
3994 static int
3995 heap_cleanup_iterator (void *cls,
3996                        struct GNUNET_CONTAINER_HeapNode *node,
3997                        void *element,
3998                        GNUNET_CONTAINER_HeapCostType cost)
3999 {
4000   struct DefragContext *d_ctx = element;
4001
4002   GNUNET_CONTAINER_heap_remove_node (node);
4003   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
4004   GNUNET_free (d_ctx);
4005   return GNUNET_YES;
4006 }
4007
4008
4009 /**
4010  * The exported method. Makes the core api available via a global and
4011  * returns the udp transport API.
4012  *
4013  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
4014  * @return NULL
4015  */
4016 void *
4017 libgnunet_plugin_transport_udp_done (void *cls)
4018 {
4019   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
4020   struct Plugin *plugin = api->cls;
4021   struct PrettyPrinterContext *cur;
4022   struct UDP_MessageWrapper *udpw;
4023
4024   if (NULL == plugin)
4025   {
4026     GNUNET_free (api);
4027     return NULL;
4028   }
4029   stop_broadcast (plugin);
4030   if (NULL != plugin->select_task_v4)
4031   {
4032     GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
4033     plugin->select_task_v4 = NULL;
4034   }
4035   if (NULL != plugin->select_task_v6)
4036   {
4037     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
4038     plugin->select_task_v6 = NULL;
4039   }
4040   if (NULL != plugin->sockv4)
4041   {
4042     GNUNET_break (GNUNET_OK ==
4043                   GNUNET_NETWORK_socket_close (plugin->sockv4));
4044     plugin->sockv4 = NULL;
4045   }
4046   if (NULL != plugin->sockv6)
4047   {
4048     GNUNET_break (GNUNET_OK ==
4049                   GNUNET_NETWORK_socket_close (plugin->sockv6));
4050     plugin->sockv6 = NULL;
4051   }
4052   if (NULL != plugin->nat)
4053   {
4054     GNUNET_NAT_unregister (plugin->nat);
4055     plugin->nat = NULL;
4056   }
4057   if (NULL != plugin->defrag_ctxs)
4058   {
4059     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
4060                                    &heap_cleanup_iterator,
4061                                    NULL);
4062     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
4063     plugin->defrag_ctxs = NULL;
4064   }
4065   if (NULL != plugin->mst)
4066   {
4067     GNUNET_SERVER_mst_destroy (plugin->mst);
4068     plugin->mst = NULL;
4069   }
4070   while (NULL != (udpw = plugin->ipv4_queue_head))
4071   {
4072     dequeue (plugin,
4073              udpw);
4074     udpw->qc (udpw->qc_cls,
4075               udpw,
4076               GNUNET_SYSERR);
4077     GNUNET_free (udpw);
4078   }
4079   while (NULL != (udpw = plugin->ipv6_queue_head))
4080   {
4081     dequeue (plugin,
4082              udpw);
4083     udpw->qc (udpw->qc_cls,
4084               udpw,
4085               GNUNET_SYSERR);
4086     GNUNET_free (udpw);
4087   }
4088   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
4089                                          &disconnect_and_free_it,
4090                                          plugin);
4091   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
4092
4093   while (NULL != (cur = plugin->ppc_dll_head))
4094   {
4095     GNUNET_break (0);
4096     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
4097                                  plugin->ppc_dll_tail,
4098                                  cur);
4099     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
4100     if (NULL != cur->timeout_task)
4101     {
4102       GNUNET_SCHEDULER_cancel (cur->timeout_task);
4103       cur->timeout_task = NULL;
4104     }
4105     GNUNET_free (cur);
4106   }
4107   GNUNET_free (plugin);
4108   GNUNET_free (api);
4109   return NULL;
4110 }
4111
4112 /* end of plugin_transport_udp.c */