-only schedule select once per fragmented message
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  Boston, MA 02110-1301, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 /**
46  * After how much inactivity should a UDP session time out?
47  */
48 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
49
50 /**
51  * Number of messages we can defragment in parallel.  We only really
52  * defragment 1 message at a time, but if messages get re-ordered, we
53  * may want to keep knowledge about the previous message to avoid
54  * discarding the current message in favor of a single fragment of a
55  * previous message.  3 should be good since we don't expect massive
56  * message reorderings with UDP.
57  */
58 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
59
60 /**
61  * We keep a defragmentation queue per sender address.  How many
62  * sender addresses do we support at the same time? Memory consumption
63  * is roughly a factor of 32k * #UDP_MAX_MESSAGES_IN_DEFRAG times this
64  * value. (So 128 corresponds to 12 MB and should suffice for
65  * connecting to roughly 128 peers via UDP).
66  */
67 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
68
69
70 /**
71  * UDP Message-Packet header (after defragmentation).
72  */
73 struct UDPMessage
74 {
75   /**
76    * Message header.
77    */
78   struct GNUNET_MessageHeader header;
79
80   /**
81    * Always zero for now.
82    */
83   uint32_t reserved;
84
85   /**
86    * What is the identity of the sender
87    */
88   struct GNUNET_PeerIdentity sender;
89
90 };
91
92
93 /**
94  * Closure for #append_port().
95  */
96 struct PrettyPrinterContext
97 {
98   /**
99    * DLL
100    */
101   struct PrettyPrinterContext *next;
102
103   /**
104    * DLL
105    */
106   struct PrettyPrinterContext *prev;
107
108   /**
109    * Our plugin.
110    */
111   struct Plugin *plugin;
112
113   /**
114    * Resolver handle
115    */
116   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
117
118   /**
119    * Function to call with the result.
120    */
121   GNUNET_TRANSPORT_AddressStringCallback asc;
122
123   /**
124    * Clsoure for @e asc.
125    */
126   void *asc_cls;
127
128   /**
129    * Timeout task
130    */
131   struct GNUNET_SCHEDULER_Task *timeout_task;
132
133   /**
134    * Is this an IPv6 address?
135    */
136   int ipv6;
137
138   /**
139    * Options
140    */
141   uint32_t options;
142
143   /**
144    * Port to add after the IP address.
145    */
146   uint16_t port;
147
148 };
149
150
151 /**
152  * Session with another peer.
153  */
154 struct GNUNET_ATS_Session
155 {
156   /**
157    * Which peer is this session for?
158    */
159   struct GNUNET_PeerIdentity target;
160
161   /**
162    * Plugin this session belongs to.
163    */
164   struct Plugin *plugin;
165
166   /**
167    * Context for dealing with fragments.
168    */
169   struct UDP_FragmentationContext *frag_ctx;
170
171   /**
172    * Desired delay for next sending we send to other peer
173    */
174   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
175
176   /**
177    * Desired delay for transmissions we received from other peer.
178    * Adjusted to be per fragment (UDP_MTU), even though on the
179    * wire it was for "full messages".
180    */
181   struct GNUNET_TIME_Relative flow_delay_from_other_peer;
182
183   /**
184    * Session timeout task
185    */
186   struct GNUNET_SCHEDULER_Task *timeout_task;
187
188   /**
189    * When does this session time out?
190    */
191   struct GNUNET_TIME_Absolute timeout;
192
193   /**
194    * What time did we last transmit?
195    */
196   struct GNUNET_TIME_Absolute last_transmit_time;
197
198   /**
199    * expected delay for ACKs
200    */
201   struct GNUNET_TIME_Relative last_expected_ack_delay;
202
203   /**
204    * desired delay between UDP messages
205    */
206   struct GNUNET_TIME_Relative last_expected_msg_delay;
207
208   /**
209    * Our own address.
210    */
211   struct GNUNET_HELLO_Address *address;
212
213   /**
214    * Number of bytes waiting for transmission to this peer.
215    */
216   unsigned long long bytes_in_queue;
217
218   /**
219    * Number of messages waiting for transmission to this peer.
220    */
221   unsigned int msgs_in_queue;
222
223   /**
224    * Reference counter to indicate that this session is
225    * currently being used and must not be destroyed;
226    * setting @e in_destroy will destroy it as soon as
227    * possible.
228    */
229   unsigned int rc;
230
231   /**
232    * Network type of the address.
233    */
234   enum GNUNET_ATS_Network_Type scope;
235
236   /**
237    * Is this session about to be destroyed (sometimes we cannot
238    * destroy a session immediately as below us on the stack
239    * there might be code that still uses it; in this case,
240    * @e rc is non-zero).
241    */
242   int in_destroy;
243 };
244
245
246
247 /**
248  * Data structure to track defragmentation contexts based
249  * on the source of the UDP traffic.
250  */
251 struct DefragContext
252 {
253
254   /**
255    * Defragmentation context.
256    */
257   struct GNUNET_DEFRAGMENT_Context *defrag;
258
259   /**
260    * Reference to master plugin struct.
261    */
262   struct Plugin *plugin;
263
264   /**
265    * Node in the defrag heap.
266    */
267   struct GNUNET_CONTAINER_HeapNode *hnode;
268
269   /**
270    * Source address this receive context is for (allocated at the
271    * end of the struct).
272    */
273   const union UdpAddress *udp_addr;
274
275   /**
276    * Who's message(s) are we defragmenting here?
277    * Only initialized once we succeeded and
278    * @e have_sender is set.
279    */
280   struct GNUNET_PeerIdentity sender;
281
282   /**
283    * Length of @e udp_addr.
284    */
285   size_t udp_addr_len;
286
287   /**
288    * Network type the address belongs to.
289    */
290   enum GNUNET_ATS_Network_Type network_type;
291
292   /**
293    * Has the @e sender field been initialized yet?
294    */
295   int have_sender;
296 };
297
298
299 /**
300  * Context to send fragmented messages
301  */
302 struct UDP_FragmentationContext
303 {
304   /**
305    * Next in linked list
306    */
307   struct UDP_FragmentationContext *next;
308
309   /**
310    * Previous in linked list
311    */
312   struct UDP_FragmentationContext *prev;
313
314   /**
315    * The plugin
316    */
317   struct Plugin *plugin;
318
319   /**
320    * Handle for fragmentation.
321    */
322   struct GNUNET_FRAGMENT_Context *frag;
323
324   /**
325    * The session this fragmentation context belongs to
326    */
327   struct GNUNET_ATS_Session *session;
328
329   /**
330    * Function to call upon completion of the transmission.
331    */
332   GNUNET_TRANSPORT_TransmitContinuation cont;
333
334   /**
335    * Closure for @e cont.
336    */
337   void *cont_cls;
338
339   /**
340    * Start time.
341    */
342   struct GNUNET_TIME_Absolute start_time;
343
344   /**
345    * Transmission time for the next fragment.  Incremented by
346    * the "flow_delay_from_other_peer" for each fragment when
347    * we setup the fragments.
348    */
349   struct GNUNET_TIME_Absolute next_frag_time;
350
351   /**
352    * Message timeout
353    */
354   struct GNUNET_TIME_Absolute timeout;
355
356   /**
357    * Payload size of original unfragmented message
358    */
359   size_t payload_size;
360
361   /**
362    * Bytes used to send all fragments on wire including UDP overhead
363    */
364   size_t on_wire_size;
365
366 };
367
368
369 /**
370  * Function called when a message is removed from the
371  * transmission queue.
372  *
373  * @param cls closure
374  * @param udpw message wrapper finished
375  * @param result #GNUNET_OK on success (message was sent)
376  *               #GNUNET_SYSERR if the target disconnected
377  *               or we had a timeout or other trouble sending
378  */
379 typedef void
380 (*QueueContinuation) (void *cls,
381                       struct UDP_MessageWrapper *udpw,
382                       int result);
383
384
385 /**
386  * Information we track for each message in the queue.
387  */
388 struct UDP_MessageWrapper
389 {
390   /**
391    * Session this message belongs to
392    */
393   struct GNUNET_ATS_Session *session;
394
395   /**
396    * DLL of messages, previous element
397    */
398   struct UDP_MessageWrapper *prev;
399
400   /**
401    * DLL of messages, next element
402    */
403   struct UDP_MessageWrapper *next;
404
405   /**
406    * Message with @e msg_size bytes including UDP-specific overhead.
407    */
408   char *msg_buf;
409
410   /**
411    * Function to call once the message wrapper is being removed
412    * from the queue (with success or failure).
413    */
414   QueueContinuation qc;
415
416   /**
417    * Closure for @e qc.
418    */
419   void *qc_cls;
420
421   /**
422    * External continuation to call upon completion of the
423    * transmission, NULL if this queue entry is not for a
424    * message from the application.
425    */
426   GNUNET_TRANSPORT_TransmitContinuation cont;
427
428   /**
429    * Closure for @e cont.
430    */
431   void *cont_cls;
432
433   /**
434    * Fragmentation context.
435    * frag_ctx == NULL if transport <= MTU
436    * frag_ctx != NULL if transport > MTU
437    */
438   struct UDP_FragmentationContext *frag_ctx;
439
440   /**
441    * Message enqueue time.
442    */
443   struct GNUNET_TIME_Absolute start_time;
444
445   /**
446    * Desired transmission time for this message, based on the
447    * flow limiting information we got from the other peer.
448    */
449   struct GNUNET_TIME_Absolute transmission_time;
450
451   /**
452    * Message timeout.
453    */
454   struct GNUNET_TIME_Absolute timeout;
455
456   /**
457    * Size of UDP message to send, including UDP-specific overhead.
458    */
459   size_t msg_size;
460
461   /**
462    * Payload size of original message.
463    */
464   size_t payload_size;
465
466 };
467
468
469 GNUNET_NETWORK_STRUCT_BEGIN
470
471 /**
472  * UDP ACK Message-Packet header.
473  */
474 struct UDP_ACK_Message
475 {
476   /**
477    * Message header.
478    */
479   struct GNUNET_MessageHeader header;
480
481   /**
482    * Desired delay for flow control, in us (in NBO).
483    */
484   uint32_t delay GNUNET_PACKED;
485
486   /**
487    * What is the identity of the sender
488    */
489   struct GNUNET_PeerIdentity sender;
490
491 };
492
493 GNUNET_NETWORK_STRUCT_END
494
495
496 /* ************************* Monitoring *********** */
497
498
499 /**
500  * If a session monitor is attached, notify it about the new
501  * session state.
502  *
503  * @param plugin our plugin
504  * @param session session that changed state
505  * @param state new state of the session
506  */
507 static void
508 notify_session_monitor (struct Plugin *plugin,
509                         struct GNUNET_ATS_Session *session,
510                         enum GNUNET_TRANSPORT_SessionState state)
511 {
512   struct GNUNET_TRANSPORT_SessionInfo info;
513
514   if (NULL == plugin->sic)
515     return;
516   if (GNUNET_YES == session->in_destroy)
517     return; /* already destroyed, just RC>0 left-over actions */
518   memset (&info,
519           0,
520           sizeof (info));
521   info.state = state;
522   info.is_inbound = GNUNET_SYSERR; /* hard to say */
523   info.num_msg_pending = session->msgs_in_queue;
524   info.num_bytes_pending = session->bytes_in_queue;
525   /* info.receive_delay remains zero as this is not supported by UDP
526      (cannot selectively not receive from 'some' peer while continuing
527      to receive from others) */
528   info.session_timeout = session->timeout;
529   info.address = session->address;
530   plugin->sic (plugin->sic_cls,
531                session,
532                &info);
533 }
534
535
536 /**
537  * Return information about the given session to the monitor callback.
538  *
539  * @param cls the `struct Plugin` with the monitor callback (`sic`)
540  * @param peer peer we send information about
541  * @param value our `struct GNUNET_ATS_Session` to send information about
542  * @return #GNUNET_OK (continue to iterate)
543  */
544 static int
545 send_session_info_iter (void *cls,
546                         const struct GNUNET_PeerIdentity *peer,
547                         void *value)
548 {
549   struct Plugin *plugin = cls;
550   struct GNUNET_ATS_Session *session = value;
551
552   notify_session_monitor (plugin,
553                           session,
554                           GNUNET_TRANSPORT_SS_INIT);
555   notify_session_monitor (plugin,
556                           session,
557                           GNUNET_TRANSPORT_SS_UP);
558   return GNUNET_OK;
559 }
560
561
562 /**
563  * Begin monitoring sessions of a plugin.  There can only
564  * be one active monitor per plugin (i.e. if there are
565  * multiple monitors, the transport service needs to
566  * multiplex the generated events over all of them).
567  *
568  * @param cls closure of the plugin
569  * @param sic callback to invoke, NULL to disable monitor;
570  *            plugin will being by iterating over all active
571  *            sessions immediately and then enter monitor mode
572  * @param sic_cls closure for @a sic
573  */
574 static void
575 udp_plugin_setup_monitor (void *cls,
576                           GNUNET_TRANSPORT_SessionInfoCallback sic,
577                           void *sic_cls)
578 {
579   struct Plugin *plugin = cls;
580
581   plugin->sic = sic;
582   plugin->sic_cls = sic_cls;
583   if (NULL != sic)
584   {
585     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
586                                            &send_session_info_iter,
587                                            plugin);
588     /* signal end of first iteration */
589     sic (sic_cls,
590          NULL,
591          NULL);
592   }
593 }
594
595
596 /* ****************** Little Helpers ****************** */
597
598
599 /**
600  * Function to free last resources associated with a session.
601  *
602  * @param s session to free
603  */
604 static void
605 free_session (struct GNUNET_ATS_Session *s)
606 {
607   if (NULL != s->address)
608   {
609     GNUNET_HELLO_address_free (s->address);
610     s->address = NULL;
611   }
612   if (NULL != s->frag_ctx)
613   {
614     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag,
615                                      NULL,
616                                      NULL);
617     GNUNET_free (s->frag_ctx);
618     s->frag_ctx = NULL;
619   }
620   GNUNET_free (s);
621 }
622
623
624 /**
625  * Function that is called to get the keepalive factor.
626  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
627  * calculate the interval between keepalive packets.
628  *
629  * @param cls closure with the `struct Plugin`
630  * @return keepalive factor
631  */
632 static unsigned int
633 udp_query_keepalive_factor (void *cls)
634 {
635   return 15;
636 }
637
638
639 /**
640  * Function obtain the network type for a session
641  *
642  * @param cls closure (`struct Plugin *`)
643  * @param session the session
644  * @return the network type
645  */
646 static enum GNUNET_ATS_Network_Type
647 udp_plugin_get_network (void *cls,
648                         struct GNUNET_ATS_Session *session)
649 {
650   return session->scope;
651 }
652
653
654 /**
655  * Function obtain the network type for an address.
656  *
657  * @param cls closure (`struct Plugin *`)
658  * @param address the address
659  * @return the network type
660  */
661 static enum GNUNET_ATS_Network_Type
662 udp_plugin_get_network_for_address (void *cls,
663                                     const struct GNUNET_HELLO_Address *address)
664 {
665   struct Plugin *plugin = cls;
666   size_t addrlen;
667   struct sockaddr_in a4;
668   struct sockaddr_in6 a6;
669   const struct IPv4UdpAddress *u4;
670   const struct IPv6UdpAddress *u6;
671   const void *sb;
672   size_t sbs;
673
674   addrlen = address->address_length;
675   if (addrlen == sizeof(struct IPv6UdpAddress))
676   {
677     GNUNET_assert (NULL != address->address); /* make static analysis happy */
678     u6 = address->address;
679     memset (&a6, 0, sizeof(a6));
680 #if HAVE_SOCKADDR_IN_SIN_LEN
681     a6.sin6_len = sizeof (a6);
682 #endif
683     a6.sin6_family = AF_INET6;
684     a6.sin6_port = u6->u6_port;
685     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
686     sb = &a6;
687     sbs = sizeof(a6);
688   }
689   else if (addrlen == sizeof(struct IPv4UdpAddress))
690   {
691     GNUNET_assert (NULL != address->address); /* make static analysis happy */
692     u4 = address->address;
693     memset (&a4, 0, sizeof(a4));
694 #if HAVE_SOCKADDR_IN_SIN_LEN
695     a4.sin_len = sizeof (a4);
696 #endif
697     a4.sin_family = AF_INET;
698     a4.sin_port = u4->u4_port;
699     a4.sin_addr.s_addr = u4->ipv4_addr;
700     sb = &a4;
701     sbs = sizeof(a4);
702   }
703   else
704   {
705     GNUNET_break (0);
706     return GNUNET_ATS_NET_UNSPECIFIED;
707   }
708   return plugin->env->get_address_type (plugin->env->cls,
709                                         sb,
710                                         sbs);
711 }
712
713
714 /* ******************* Event loop ******************** */
715
716 /**
717  * We have been notified that our readset has something to read.  We don't
718  * know which socket needs to be read, so we have to check each one
719  * Then reschedule this function to be called again once more is available.
720  *
721  * @param cls the plugin handle
722  * @param tc the scheduling context (for rescheduling this function again)
723  */
724 static void
725 udp_plugin_select_v4 (void *cls,
726                       const struct GNUNET_SCHEDULER_TaskContext *tc);
727
728
729 /**
730  * We have been notified that our readset has something to read.  We don't
731  * know which socket needs to be read, so we have to check each one
732  * Then reschedule this function to be called again once more is available.
733  *
734  * @param cls the plugin handle
735  * @param tc the scheduling context (for rescheduling this function again)
736  */
737 static void
738 udp_plugin_select_v6 (void *cls,
739                       const struct GNUNET_SCHEDULER_TaskContext *tc);
740
741
742 /**
743  * (re)schedule IPv4-select tasks for this plugin.
744  *
745  * @param plugin plugin to reschedule
746  */
747 static void
748 schedule_select_v4 (struct Plugin *plugin)
749 {
750   struct GNUNET_TIME_Relative min_delay;
751   struct UDP_MessageWrapper *udpw;
752
753   if ( (GNUNET_YES == plugin->enable_ipv4) &&
754        (NULL != plugin->sockv4) )
755   {
756     /* Find a message ready to send:
757      * Flow delay from other peer is expired or not set (0) */
758     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
759     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
760       min_delay = GNUNET_TIME_relative_min (min_delay,
761                                             GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
762     if (NULL != plugin->select_task_v4)
763       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
764     if (NULL != plugin->ipv4_queue_head)
765     {
766       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
767       {
768         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
769                     "Calculated flow delay for UDPv4 at %s\n",
770                     GNUNET_STRINGS_relative_time_to_string (min_delay,
771                                                             GNUNET_YES));
772       }
773       else
774       {
775         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776                     "Calculated flow delay for UDPv4 at %s\n",
777                     GNUNET_STRINGS_relative_time_to_string (min_delay,
778                                                             GNUNET_YES));
779       }
780     }
781     plugin->select_task_v4
782       = GNUNET_SCHEDULER_add_read_net (min_delay,
783                                        plugin->sockv4,
784                                        &udp_plugin_select_v4,
785                                        plugin);
786   }
787 }
788
789
790 /**
791  * (re)schedule IPv6-select tasks for this plugin.
792  *
793  * @param plugin plugin to reschedule
794  */
795 static void
796 schedule_select_v6 (struct Plugin *plugin)
797 {
798   struct GNUNET_TIME_Relative min_delay;
799   struct UDP_MessageWrapper *udpw;
800
801   if ( (GNUNET_YES == plugin->enable_ipv6) &&
802        (NULL != plugin->sockv6) )
803   {
804     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
805     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
806       min_delay = GNUNET_TIME_relative_min (min_delay,
807                                             GNUNET_TIME_absolute_get_remaining (udpw->transmission_time));
808     if (NULL != plugin->select_task_v6)
809       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
810     if (NULL != plugin->ipv6_queue_head)
811     {
812       if (min_delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
813       {
814         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
815                     "Calculated flow delay for UDPv6 at %s\n",
816                     GNUNET_STRINGS_relative_time_to_string (min_delay,
817                                                             GNUNET_YES));
818       }
819       else
820       {
821         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822                     "Calculated flow delay for UDPv6 at %s\n",
823                     GNUNET_STRINGS_relative_time_to_string (min_delay,
824                                                             GNUNET_YES));
825       }
826     }
827     plugin->select_task_v6
828       = GNUNET_SCHEDULER_add_read_net (min_delay,
829                                        plugin->sockv6,
830                                        &udp_plugin_select_v6,
831                                        plugin);
832   }
833 }
834
835
836 /* ******************* Address to string and back ***************** */
837
838
839 /**
840  * Function called for a quick conversion of the binary address to
841  * a numeric address.  Note that the caller must not free the
842  * address and that the next call to this function is allowed
843  * to override the address again.
844  *
845  * @param cls closure
846  * @param addr binary address (a `union UdpAddress`)
847  * @param addrlen length of the @a addr
848  * @return string representing the same address
849  */
850 const char *
851 udp_address_to_string (void *cls,
852                        const void *addr,
853                        size_t addrlen)
854 {
855   static char rbuf[INET6_ADDRSTRLEN + 10];
856   char buf[INET6_ADDRSTRLEN];
857   const void *sb;
858   struct in_addr a4;
859   struct in6_addr a6;
860   const struct IPv4UdpAddress *t4;
861   const struct IPv6UdpAddress *t6;
862   int af;
863   uint16_t port;
864   uint32_t options;
865
866   if (NULL == addr)
867   {
868     GNUNET_break_op (0);
869     return NULL;
870   }
871
872   if (addrlen == sizeof(struct IPv6UdpAddress))
873   {
874     t6 = addr;
875     af = AF_INET6;
876     options = ntohl (t6->options);
877     port = ntohs (t6->u6_port);
878     a6 = t6->ipv6_addr;
879     sb = &a6;
880   }
881   else if (addrlen == sizeof(struct IPv4UdpAddress))
882   {
883     t4 = addr;
884     af = AF_INET;
885     options = ntohl (t4->options);
886     port = ntohs (t4->u4_port);
887     a4.s_addr = t4->ipv4_addr;
888     sb = &a4;
889   }
890   else
891   {
892     GNUNET_break_op (0);
893     return NULL;
894   }
895   inet_ntop (af,
896              sb,
897              buf,
898              INET6_ADDRSTRLEN);
899   GNUNET_snprintf (rbuf,
900                    sizeof(rbuf),
901                    (af == AF_INET6)
902                    ? "%s.%u.[%s]:%u"
903                    : "%s.%u.%s:%u",
904                    PLUGIN_NAME,
905                    options,
906                    buf,
907                    port);
908   return rbuf;
909 }
910
911
912 /**
913  * Function called to convert a string address to a binary address.
914  *
915  * @param cls closure (`struct Plugin *`)
916  * @param addr string address
917  * @param addrlen length of the address
918  * @param buf location to store the buffer
919  * @param added location to store the number of bytes in the buffer.
920  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
921  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
922  */
923 static int
924 udp_string_to_address (void *cls,
925                        const char *addr,
926                        uint16_t addrlen,
927                        void **buf,
928                        size_t *added)
929 {
930   struct sockaddr_storage socket_address;
931   char *address;
932   char *plugin;
933   char *optionstr;
934   uint32_t options;
935
936   /* Format tcp.options.address:port */
937   address = NULL;
938   plugin = NULL;
939   optionstr = NULL;
940
941   if ((NULL == addr) || (0 == addrlen))
942   {
943     GNUNET_break (0);
944     return GNUNET_SYSERR;
945   }
946   if ('\0' != addr[addrlen - 1])
947   {
948     GNUNET_break (0);
949     return GNUNET_SYSERR;
950   }
951   if (strlen (addr) != addrlen - 1)
952   {
953     GNUNET_break (0);
954     return GNUNET_SYSERR;
955   }
956   plugin = GNUNET_strdup (addr);
957   optionstr = strchr (plugin, '.');
958   if (NULL == optionstr)
959   {
960     GNUNET_break (0);
961     GNUNET_free (plugin);
962     return GNUNET_SYSERR;
963   }
964   optionstr[0] = '\0';
965   optionstr++;
966   options = atol (optionstr);
967   address = strchr (optionstr, '.');
968   if (NULL == address)
969   {
970     GNUNET_break (0);
971     GNUNET_free (plugin);
972     return GNUNET_SYSERR;
973   }
974   address[0] = '\0';
975   address++;
976
977   if (GNUNET_OK !=
978       GNUNET_STRINGS_to_address_ip (address,
979                                     strlen (address),
980                                     &socket_address))
981   {
982     GNUNET_break (0);
983     GNUNET_free (plugin);
984     return GNUNET_SYSERR;
985   }
986   GNUNET_free(plugin);
987
988   switch (socket_address.ss_family)
989   {
990   case AF_INET:
991     {
992       struct IPv4UdpAddress *u4;
993       const struct sockaddr_in *in4 = (const struct sockaddr_in *) &socket_address;
994
995       u4 = GNUNET_new (struct IPv4UdpAddress);
996       u4->options = htonl (options);
997       u4->ipv4_addr = in4->sin_addr.s_addr;
998       u4->u4_port = in4->sin_port;
999       *buf = u4;
1000       *added = sizeof (struct IPv4UdpAddress);
1001       return GNUNET_OK;
1002     }
1003   case AF_INET6:
1004     {
1005       struct IPv6UdpAddress *u6;
1006       const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &socket_address;
1007
1008       u6 = GNUNET_new (struct IPv6UdpAddress);
1009       u6->options = htonl (options);
1010       u6->ipv6_addr = in6->sin6_addr;
1011       u6->u6_port = in6->sin6_port;
1012       *buf = u6;
1013       *added = sizeof (struct IPv6UdpAddress);
1014       return GNUNET_OK;
1015     }
1016   default:
1017     GNUNET_break (0);
1018     return GNUNET_SYSERR;
1019   }
1020 }
1021
1022
1023 /**
1024  * Append our port and forward the result.
1025  *
1026  * @param cls a `struct PrettyPrinterContext *`
1027  * @param hostname result from DNS resolver
1028  */
1029 static void
1030 append_port (void *cls,
1031              const char *hostname)
1032 {
1033   struct PrettyPrinterContext *ppc = cls;
1034   struct Plugin *plugin = ppc->plugin;
1035   char *ret;
1036
1037   if (NULL == hostname)
1038   {
1039     /* Final call, done */
1040     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
1041                                  plugin->ppc_dll_tail,
1042                                  ppc);
1043     ppc->resolver_handle = NULL;
1044     ppc->asc (ppc->asc_cls,
1045               NULL,
1046               GNUNET_OK);
1047     GNUNET_free (ppc);
1048     return;
1049   }
1050   if (GNUNET_YES == ppc->ipv6)
1051     GNUNET_asprintf (&ret,
1052                      "%s.%u.[%s]:%d",
1053                      PLUGIN_NAME,
1054                      ppc->options,
1055                      hostname,
1056                      ppc->port);
1057   else
1058     GNUNET_asprintf (&ret,
1059                      "%s.%u.%s:%d",
1060                      PLUGIN_NAME,
1061                      ppc->options,
1062                      hostname,
1063                      ppc->port);
1064   ppc->asc (ppc->asc_cls,
1065             ret,
1066             GNUNET_OK);
1067   GNUNET_free (ret);
1068 }
1069
1070
1071 /**
1072  * Convert the transports address to a nice, human-readable format.
1073  *
1074  * @param cls closure with the `struct Plugin *`
1075  * @param type name of the transport that generated the address
1076  * @param addr one of the addresses of the host, NULL for the last address
1077  *        the specific address format depends on the transport;
1078  *        a `union UdpAddress`
1079  * @param addrlen length of the address
1080  * @param numeric should (IP) addresses be displayed in numeric form?
1081  * @param timeout after how long should we give up?
1082  * @param asc function to call on each string
1083  * @param asc_cls closure for @a asc
1084  */
1085 static void
1086 udp_plugin_address_pretty_printer (void *cls,
1087                                    const char *type,
1088                                    const void *addr,
1089                                    size_t addrlen,
1090                                    int numeric,
1091                                    struct GNUNET_TIME_Relative timeout,
1092                                    GNUNET_TRANSPORT_AddressStringCallback asc,
1093                                    void *asc_cls)
1094 {
1095   struct Plugin *plugin = cls;
1096   struct PrettyPrinterContext *ppc;
1097   const struct sockaddr *sb;
1098   size_t sbs;
1099   struct sockaddr_in a4;
1100   struct sockaddr_in6 a6;
1101   const struct IPv4UdpAddress *u4;
1102   const struct IPv6UdpAddress *u6;
1103   uint16_t port;
1104   uint32_t options;
1105
1106   if (addrlen == sizeof(struct IPv6UdpAddress))
1107   {
1108     u6 = addr;
1109     memset (&a6,
1110             0,
1111             sizeof (a6));
1112     a6.sin6_family = AF_INET6;
1113 #if HAVE_SOCKADDR_IN_SIN_LEN
1114     a6.sin6_len = sizeof (a6);
1115 #endif
1116     a6.sin6_port = u6->u6_port;
1117     a6.sin6_addr = u6->ipv6_addr;
1118     port = ntohs (u6->u6_port);
1119     options = ntohl (u6->options);
1120     sb = (const struct sockaddr *) &a6;
1121     sbs = sizeof (a6);
1122   }
1123   else if (addrlen == sizeof (struct IPv4UdpAddress))
1124   {
1125     u4 = addr;
1126     memset (&a4,
1127             0,
1128             sizeof(a4));
1129     a4.sin_family = AF_INET;
1130 #if HAVE_SOCKADDR_IN_SIN_LEN
1131     a4.sin_len = sizeof (a4);
1132 #endif
1133     a4.sin_port = u4->u4_port;
1134     a4.sin_addr.s_addr = u4->ipv4_addr;
1135     port = ntohs (u4->u4_port);
1136     options = ntohl (u4->options);
1137     sb = (const struct sockaddr *) &a4;
1138     sbs = sizeof(a4);
1139   }
1140   else
1141   {
1142     /* invalid address */
1143     GNUNET_break_op (0);
1144     asc (asc_cls,
1145          NULL,
1146          GNUNET_SYSERR);
1147     asc (asc_cls,
1148          NULL,
1149          GNUNET_OK);
1150     return;
1151   }
1152   ppc = GNUNET_new (struct PrettyPrinterContext);
1153   ppc->plugin = plugin;
1154   ppc->asc = asc;
1155   ppc->asc_cls = asc_cls;
1156   ppc->port = port;
1157   ppc->options = options;
1158   if (addrlen == sizeof (struct IPv6UdpAddress))
1159     ppc->ipv6 = GNUNET_YES;
1160   else
1161     ppc->ipv6 = GNUNET_NO;
1162   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
1163                                plugin->ppc_dll_tail,
1164                                ppc);
1165   ppc->resolver_handle
1166     = GNUNET_RESOLVER_hostname_get (sb,
1167                                     sbs,
1168                                     ! numeric,
1169                                     timeout,
1170                                     &append_port,
1171                                     ppc);
1172 }
1173
1174
1175 /**
1176  * Check if the given port is plausible (must be either our listen
1177  * port or our advertised port).  If it is neither, we return
1178  * #GNUNET_SYSERR.
1179  *
1180  * @param plugin global variables
1181  * @param in_port port number to check
1182  * @return #GNUNET_OK if port is either our open or advertised port
1183  */
1184 static int
1185 check_port (const struct Plugin *plugin,
1186             uint16_t in_port)
1187 {
1188   if ( (plugin->port == in_port) ||
1189        (plugin->aport == in_port) )
1190     return GNUNET_OK;
1191   return GNUNET_SYSERR;
1192 }
1193
1194
1195 /**
1196  * Function that will be called to check if a binary address for this
1197  * plugin is well-formed and corresponds to an address for THIS peer
1198  * (as per our configuration).  Naturally, if absolutely necessary,
1199  * plugins can be a bit conservative in their answer, but in general
1200  * plugins should make sure that the address does not redirect
1201  * traffic to a 3rd party that might try to man-in-the-middle our
1202  * traffic.
1203  *
1204  * @param cls closure, should be our handle to the Plugin
1205  * @param addr pointer to a `union UdpAddress`
1206  * @param addrlen length of @a addr
1207  * @return #GNUNET_OK if this is a plausible address for this peer
1208  *         and transport, #GNUNET_SYSERR if not
1209  */
1210 static int
1211 udp_plugin_check_address (void *cls,
1212                           const void *addr,
1213                           size_t addrlen)
1214 {
1215   struct Plugin *plugin = cls;
1216   const struct IPv4UdpAddress *v4;
1217   const struct IPv6UdpAddress *v6;
1218
1219   if (sizeof(struct IPv4UdpAddress) == addrlen)
1220   {
1221     v4 = (const struct IPv4UdpAddress *) addr;
1222     if (GNUNET_OK != check_port (plugin,
1223                                  ntohs (v4->u4_port)))
1224       return GNUNET_SYSERR;
1225     if (GNUNET_OK !=
1226         GNUNET_NAT_test_address (plugin->nat,
1227                                  &v4->ipv4_addr,
1228                                  sizeof (struct in_addr)))
1229       return GNUNET_SYSERR;
1230   }
1231   else if (sizeof(struct IPv6UdpAddress) == addrlen)
1232   {
1233     v6 = (const struct IPv6UdpAddress *) addr;
1234     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1235     {
1236       GNUNET_break_op (0);
1237       return GNUNET_SYSERR;
1238     }
1239     if (GNUNET_OK != check_port (plugin,
1240                                  ntohs (v6->u6_port)))
1241       return GNUNET_SYSERR;
1242     if (GNUNET_OK !=
1243         GNUNET_NAT_test_address (plugin->nat,
1244                                  &v6->ipv6_addr,
1245                                  sizeof (struct in6_addr)))
1246       return GNUNET_SYSERR;
1247   }
1248   else
1249   {
1250     GNUNET_break_op (0);
1251     return GNUNET_SYSERR;
1252   }
1253   return GNUNET_OK;
1254 }
1255
1256
1257 /**
1258  * Our external IP address/port mapping has changed.
1259  *
1260  * @param cls closure, the `struct Plugin`
1261  * @param add_remove #GNUNET_YES to mean the new public IP address,
1262  *                   #GNUNET_NO to mean the previous (now invalid) one
1263  * @param addr either the previous or the new public IP address
1264  * @param addrlen actual length of the @a addr
1265  */
1266 static void
1267 udp_nat_port_map_callback (void *cls,
1268                            int add_remove,
1269                            const struct sockaddr *addr,
1270                            socklen_t addrlen)
1271 {
1272   struct Plugin *plugin = cls;
1273   struct GNUNET_HELLO_Address *address;
1274   struct IPv4UdpAddress u4;
1275   struct IPv6UdpAddress u6;
1276   void *arg;
1277   size_t args;
1278
1279   LOG (GNUNET_ERROR_TYPE_DEBUG,
1280        (GNUNET_YES == add_remove)
1281        ? "NAT notification to add address `%s'\n"
1282        : "NAT notification to remove address `%s'\n",
1283        GNUNET_a2s (addr,
1284                    addrlen));
1285   /* convert 'address' to our internal format */
1286   switch (addr->sa_family)
1287   {
1288   case AF_INET:
1289     {
1290       const struct sockaddr_in *i4;
1291
1292       GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1293       i4 = (const struct sockaddr_in *) addr;
1294       if (0 == ntohs (i4->sin_port))
1295       {
1296         GNUNET_break (0);
1297         return;
1298       }
1299       memset (&u4,
1300               0,
1301               sizeof(u4));
1302       u4.options = htonl (plugin->myoptions);
1303       u4.ipv4_addr = i4->sin_addr.s_addr;
1304       u4.u4_port = i4->sin_port;
1305       arg = &u4;
1306       args = sizeof (struct IPv4UdpAddress);
1307       break;
1308     }
1309   case AF_INET6:
1310     {
1311       const struct sockaddr_in6 *i6;
1312
1313       GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1314       i6 = (const struct sockaddr_in6 *) addr;
1315       if (0 == ntohs (i6->sin6_port))
1316       {
1317         GNUNET_break (0);
1318         return;
1319       }
1320       memset (&u6,
1321               0,
1322               sizeof(u6));
1323       u6.options = htonl (plugin->myoptions);
1324       u6.ipv6_addr = i6->sin6_addr;
1325       u6.u6_port = i6->sin6_port;
1326       arg = &u6;
1327       args = sizeof (struct IPv6UdpAddress);
1328       break;
1329     }
1330   default:
1331     GNUNET_break (0);
1332     return;
1333   }
1334   /* modify our published address list */
1335   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1336                                            PLUGIN_NAME,
1337                                            arg,
1338                                            args,
1339                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1340   plugin->env->notify_address (plugin->env->cls,
1341                                add_remove,
1342                                address);
1343   GNUNET_HELLO_address_free (address);
1344 }
1345
1346
1347 /* ********************* Finding sessions ******************* */
1348
1349
1350 /**
1351  * Closure for #session_cmp_it().
1352  */
1353 struct GNUNET_ATS_SessionCompareContext
1354 {
1355   /**
1356    * Set to session matching the address.
1357    */
1358   struct GNUNET_ATS_Session *res;
1359
1360   /**
1361    * Address we are looking for.
1362    */
1363   const struct GNUNET_HELLO_Address *address;
1364 };
1365
1366
1367 /**
1368  * Find a session with a matching address.
1369  *
1370  * @param cls the `struct GNUNET_ATS_SessionCompareContext *`
1371  * @param key peer identity (unused)
1372  * @param value the `struct GNUNET_ATS_Session *`
1373  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1374  */
1375 static int
1376 session_cmp_it (void *cls,
1377                 const struct GNUNET_PeerIdentity *key,
1378                 void *value)
1379 {
1380   struct GNUNET_ATS_SessionCompareContext *cctx = cls;
1381   struct GNUNET_ATS_Session *s = value;
1382
1383   if (0 == GNUNET_HELLO_address_cmp (s->address,
1384                                      cctx->address))
1385   {
1386     GNUNET_assert (GNUNET_NO == s->in_destroy);
1387     cctx->res = s;
1388     return GNUNET_NO;
1389   }
1390   return GNUNET_OK;
1391 }
1392
1393
1394 /**
1395  * Locate an existing session the transport service is using to
1396  * send data to another peer.  Performs some basic sanity checks
1397  * on the address and then tries to locate a matching session.
1398  *
1399  * @param cls the plugin
1400  * @param address the address we should locate the session by
1401  * @return the session if it exists, or NULL if it is not found
1402  */
1403 static struct GNUNET_ATS_Session *
1404 udp_plugin_lookup_session (void *cls,
1405                            const struct GNUNET_HELLO_Address *address)
1406 {
1407   struct Plugin *plugin = cls;
1408   const struct IPv6UdpAddress *udp_a6;
1409   const struct IPv4UdpAddress *udp_a4;
1410   struct GNUNET_ATS_SessionCompareContext cctx;
1411
1412   if (NULL == address->address)
1413   {
1414     GNUNET_break (0);
1415     return NULL;
1416   }
1417   if (sizeof(struct IPv4UdpAddress) == address->address_length)
1418   {
1419     if (NULL == plugin->sockv4)
1420       return NULL;
1421     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1422     if (0 == udp_a4->u4_port)
1423     {
1424       GNUNET_break (0);
1425       return NULL;
1426     }
1427   }
1428   else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1429   {
1430     if (NULL == plugin->sockv6)
1431       return NULL;
1432     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1433     if (0 == udp_a6->u6_port)
1434     {
1435       GNUNET_break (0);
1436       return NULL;
1437     }
1438   }
1439   else
1440   {
1441     GNUNET_break (0);
1442     return NULL;
1443   }
1444
1445   /* check if session already exists */
1446   cctx.address = address;
1447   cctx.res = NULL;
1448   LOG (GNUNET_ERROR_TYPE_DEBUG,
1449        "Looking for existing session for peer `%s' with address `%s'\n",
1450        GNUNET_i2s (&address->peer),
1451        udp_address_to_string (plugin,
1452                               address->address,
1453                               address->address_length));
1454   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1455                                               &address->peer,
1456                                               &session_cmp_it,
1457                                               &cctx);
1458   if (NULL == cctx.res)
1459     return NULL;
1460   LOG (GNUNET_ERROR_TYPE_DEBUG,
1461        "Found existing session %p\n",
1462        cctx.res);
1463   return cctx.res;
1464 }
1465
1466
1467 /* ********************** Timeout ****************** */
1468
1469
1470 /**
1471  * Increment session timeout due to activity.
1472  *
1473  * @param s session to reschedule timeout activity for
1474  */
1475 static void
1476 reschedule_session_timeout (struct GNUNET_ATS_Session *s)
1477 {
1478   if (GNUNET_YES == s->in_destroy)
1479     return;
1480   GNUNET_assert (NULL != s->timeout_task);
1481   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1482 }
1483
1484
1485
1486 /**
1487  * Function that will be called whenever the transport service wants to
1488  * notify the plugin that a session is still active and in use and
1489  * therefore the session timeout for this session has to be updated
1490  *
1491  * @param cls closure with the `struct Plugin`
1492  * @param peer which peer was the session for
1493  * @param session which session is being updated
1494  */
1495 static void
1496 udp_plugin_update_session_timeout (void *cls,
1497                                    const struct GNUNET_PeerIdentity *peer,
1498                                    struct GNUNET_ATS_Session *session)
1499 {
1500   struct Plugin *plugin = cls;
1501
1502   if (GNUNET_YES !=
1503       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1504                                                     peer,
1505                                                     session))
1506   {
1507     GNUNET_break (0);
1508     return;
1509   }
1510   /* Reschedule session timeout */
1511   reschedule_session_timeout (session);
1512 }
1513
1514
1515 /* ************************* Sending ************************ */
1516
1517
1518 /**
1519  * Remove the given message from the transmission queue and
1520  * update all applicable statistics.
1521  *
1522  * @param plugin the UDP plugin
1523  * @param udpw message wrapper to dequeue
1524  */
1525 static void
1526 dequeue (struct Plugin *plugin,
1527          struct UDP_MessageWrapper *udpw)
1528 {
1529   struct GNUNET_ATS_Session *session = udpw->session;
1530
1531   if (plugin->bytes_in_buffer < udpw->msg_size)
1532   {
1533     GNUNET_break (0);
1534   }
1535   else
1536   {
1537     GNUNET_STATISTICS_update (plugin->env->stats,
1538                               "# UDP, total bytes in send buffers",
1539                               - (long long) udpw->msg_size,
1540                               GNUNET_NO);
1541     plugin->bytes_in_buffer -= udpw->msg_size;
1542   }
1543   GNUNET_STATISTICS_update (plugin->env->stats,
1544                             "# UDP, total messages in send buffers",
1545                             -1,
1546                             GNUNET_NO);
1547   if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1548   {
1549     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1550                                  plugin->ipv4_queue_tail,
1551                                  udpw);
1552   }
1553   else if (sizeof(struct IPv6UdpAddress) == udpw->session->address->address_length)
1554   {
1555     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1556                                  plugin->ipv6_queue_tail,
1557                                  udpw);
1558   }
1559   else
1560   {
1561     GNUNET_break (0);
1562     return;
1563   }
1564   GNUNET_assert (session->msgs_in_queue > 0);
1565   session->msgs_in_queue--;
1566   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1567   session->bytes_in_queue -= udpw->msg_size;
1568 }
1569
1570
1571 /**
1572  * Enqueue a message for transmission and update statistics.
1573  *
1574  * @param plugin the UDP plugin
1575  * @param udpw message wrapper to queue
1576  */
1577 static void
1578 enqueue (struct Plugin *plugin,
1579          struct UDP_MessageWrapper *udpw)
1580 {
1581   struct GNUNET_ATS_Session *session = udpw->session;
1582
1583   if (GNUNET_YES == session->in_destroy)
1584   {
1585     GNUNET_break (0);
1586     return;
1587   }
1588   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1589   {
1590     GNUNET_break (0);
1591   }
1592   else
1593   {
1594     GNUNET_STATISTICS_update (plugin->env->stats,
1595                               "# UDP, total bytes in send buffers",
1596                               udpw->msg_size,
1597                               GNUNET_NO);
1598     plugin->bytes_in_buffer += udpw->msg_size;
1599   }
1600   GNUNET_STATISTICS_update (plugin->env->stats,
1601                             "# UDP, total messages in send buffers",
1602                             1,
1603                             GNUNET_NO);
1604   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
1605   {
1606     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1607                                 plugin->ipv4_queue_tail,
1608                                 udpw);
1609   }
1610   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
1611   {
1612     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1613                                  plugin->ipv6_queue_tail,
1614                                  udpw);
1615   }
1616   else
1617   {
1618     GNUNET_break (0);
1619     udpw->cont (udpw->cont_cls,
1620                 &session->target,
1621                 GNUNET_SYSERR,
1622                 udpw->msg_size,
1623                 0);
1624     GNUNET_free (udpw);
1625     return;
1626   }
1627   session->msgs_in_queue++;
1628   session->bytes_in_queue += udpw->msg_size;
1629 }
1630
1631
1632 /**
1633  * We have completed our (attempt) to transmit a message that had to
1634  * be fragmented -- either because we got an ACK saying that all
1635  * fragments were received, or because of timeout / disconnect.  Clean
1636  * up our state.
1637  *
1638  * @param frag_ctx fragmentation context to clean up
1639  * @param result #GNUNET_OK if we succeeded (got ACK),
1640  *               #GNUNET_SYSERR if the transmission failed
1641  */
1642 static void
1643 fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1644                          int result)
1645 {
1646   struct Plugin *plugin = frag_ctx->plugin;
1647   struct GNUNET_ATS_Session *s = frag_ctx->session;
1648   struct UDP_MessageWrapper *udpw;
1649   struct UDP_MessageWrapper *tmp;
1650   size_t overhead;
1651   struct GNUNET_TIME_Relative delay;
1652
1653   LOG (GNUNET_ERROR_TYPE_DEBUG,
1654        "%p: Fragmented message removed with result %s\n",
1655        frag_ctx,
1656        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1657   /* Call continuation for fragmented message */
1658   if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1659     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1660   else
1661     overhead = frag_ctx->on_wire_size;
1662   delay = GNUNET_TIME_absolute_get_duration (frag_ctx->start_time);
1663   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1664   {
1665     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1666                 "Fragmented message acknowledged after %s\n",
1667                 GNUNET_STRINGS_relative_time_to_string (delay,
1668                                                         GNUNET_YES));
1669   }
1670   else
1671   {
1672     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673                 "Fragmented message acknowledged after %s\n",
1674                 GNUNET_STRINGS_relative_time_to_string (delay,
1675                                                         GNUNET_YES));
1676   }
1677
1678   if (NULL != frag_ctx->cont)
1679     frag_ctx->cont (frag_ctx->cont_cls,
1680                     &s->target,
1681                     result,
1682                     s->frag_ctx->payload_size,
1683                     frag_ctx->on_wire_size);
1684   GNUNET_STATISTICS_update (plugin->env->stats,
1685                             "# UDP, fragmented messages active",
1686                             -1,
1687                             GNUNET_NO);
1688
1689   if (GNUNET_OK == result)
1690   {
1691     GNUNET_STATISTICS_update (plugin->env->stats,
1692                               "# UDP, fragmented msgs, messages, sent, success",
1693                               1,
1694                               GNUNET_NO);
1695     GNUNET_STATISTICS_update (plugin->env->stats,
1696                               "# UDP, fragmented msgs, bytes payload, sent, success",
1697                               s->frag_ctx->payload_size,
1698                               GNUNET_NO);
1699     GNUNET_STATISTICS_update (plugin->env->stats,
1700                               "# UDP, fragmented msgs, bytes overhead, sent, success",
1701                               overhead,
1702                               GNUNET_NO);
1703     GNUNET_STATISTICS_update (plugin->env->stats,
1704                               "# UDP, total, bytes overhead, sent",
1705                               overhead,
1706                               GNUNET_NO);
1707     GNUNET_STATISTICS_update (plugin->env->stats,
1708                               "# UDP, total, bytes payload, sent",
1709                               s->frag_ctx->payload_size,
1710                               GNUNET_NO);
1711   }
1712   else
1713   {
1714     GNUNET_STATISTICS_update (plugin->env->stats,
1715                               "# UDP, fragmented msgs, messages, sent, failure",
1716                               1,
1717                               GNUNET_NO);
1718     GNUNET_STATISTICS_update (plugin->env->stats,
1719                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1720                               s->frag_ctx->payload_size,
1721                               GNUNET_NO);
1722     GNUNET_STATISTICS_update (plugin->env->stats,
1723                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1724                               overhead,
1725                               GNUNET_NO);
1726     GNUNET_STATISTICS_update (plugin->env->stats,
1727                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1728                               overhead,
1729                               GNUNET_NO);
1730   }
1731
1732   /* Remove remaining fragments from queue, no need to transmit those
1733      any longer. */
1734   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1735   {
1736     udpw = plugin->ipv6_queue_head;
1737     while (NULL != udpw)
1738     {
1739       tmp = udpw->next;
1740       if ( (udpw->frag_ctx != NULL) &&
1741            (udpw->frag_ctx == frag_ctx) )
1742       {
1743         dequeue (plugin,
1744                  udpw);
1745         GNUNET_free (udpw);
1746       }
1747       udpw = tmp;
1748     }
1749   }
1750   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1751   {
1752     udpw = plugin->ipv4_queue_head;
1753     while (NULL != udpw)
1754     {
1755       tmp = udpw->next;
1756       if ( (NULL != udpw->frag_ctx) &&
1757            (udpw->frag_ctx == frag_ctx) )
1758       {
1759         dequeue (plugin,
1760                  udpw);
1761         GNUNET_free (udpw);
1762       }
1763       udpw = tmp;
1764     }
1765   }
1766   notify_session_monitor (s->plugin,
1767                           s,
1768                           GNUNET_TRANSPORT_SS_UPDATE);
1769   GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1770                                    &s->last_expected_msg_delay,
1771                                    &s->last_expected_ack_delay);
1772   s->frag_ctx = NULL;
1773   GNUNET_free (frag_ctx);
1774 }
1775
1776
1777 /**
1778  * We are finished with a fragment in the message queue.
1779  * Notify the continuation and update statistics.
1780  *
1781  * @param cls the `struct Plugin *`
1782  * @param udpw the queue entry
1783  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1784  */
1785 static void
1786 qc_fragment_sent (void *cls,
1787                   struct UDP_MessageWrapper *udpw,
1788                   int result)
1789 {
1790   struct Plugin *plugin = cls;
1791
1792   GNUNET_assert (NULL != udpw->frag_ctx);
1793   if (GNUNET_OK == result)
1794   {
1795     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1796     GNUNET_STATISTICS_update (plugin->env->stats,
1797                               "# UDP, fragmented msgs, fragments, sent, success",
1798                               1,
1799                               GNUNET_NO);
1800     GNUNET_STATISTICS_update (plugin->env->stats,
1801                               "# UDP, fragmented msgs, fragments bytes, sent, success",
1802                               udpw->msg_size,
1803                               GNUNET_NO);
1804   }
1805   else
1806   {
1807     fragmented_message_done (udpw->frag_ctx,
1808                              GNUNET_SYSERR);
1809     GNUNET_STATISTICS_update (plugin->env->stats,
1810                               "# UDP, fragmented msgs, fragments, sent, failure",
1811                               1,
1812                               GNUNET_NO);
1813     GNUNET_STATISTICS_update (plugin->env->stats,
1814                               "# UDP, fragmented msgs, fragments bytes, sent, failure",
1815                               udpw->msg_size,
1816                               GNUNET_NO);
1817   }
1818 }
1819
1820
1821 /**
1822  * Function that is called with messages created by the fragmentation
1823  * module.  In the case of the `proc` callback of the
1824  * #GNUNET_FRAGMENT_context_create() function, this function must
1825  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1826  *
1827  * @param cls closure, the `struct UDP_FragmentationContext`
1828  * @param msg the message that was created
1829  */
1830 static void
1831 enqueue_fragment (void *cls,
1832                   const struct GNUNET_MessageHeader *msg)
1833 {
1834   struct UDP_FragmentationContext *frag_ctx = cls;
1835   struct Plugin *plugin = frag_ctx->plugin;
1836   struct UDP_MessageWrapper *udpw;
1837   struct GNUNET_ATS_Session *session = frag_ctx->session;
1838   size_t msg_len = ntohs (msg->size);
1839
1840   LOG (GNUNET_ERROR_TYPE_DEBUG,
1841        "Enqueuing fragment with %u bytes\n",
1842        msg_len);
1843   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1844   udpw->session = session;
1845   udpw->msg_buf = (char *) &udpw[1];
1846   udpw->msg_size = msg_len;
1847   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1848   udpw->timeout = frag_ctx->timeout;
1849   udpw->start_time = frag_ctx->start_time;
1850   udpw->transmission_time = frag_ctx->next_frag_time;
1851   frag_ctx->next_frag_time
1852     = GNUNET_TIME_absolute_add (frag_ctx->next_frag_time,
1853                                 session->flow_delay_from_other_peer);
1854   udpw->frag_ctx = frag_ctx;
1855   udpw->qc = &qc_fragment_sent;
1856   udpw->qc_cls = plugin;
1857   memcpy (udpw->msg_buf,
1858           msg,
1859           msg_len);
1860   enqueue (plugin,
1861            udpw);
1862 }
1863
1864
1865 /**
1866  * We are finished with a message from the message queue.
1867  * Notify the continuation and update statistics.
1868  *
1869  * @param cls the `struct Plugin *`
1870  * @param udpw the queue entry
1871  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1872  */
1873 static void
1874 qc_message_sent (void *cls,
1875                  struct UDP_MessageWrapper *udpw,
1876                  int result)
1877 {
1878   struct Plugin *plugin = cls;
1879   size_t overhead;
1880   struct GNUNET_TIME_Relative delay;
1881
1882   if (udpw->msg_size >= udpw->payload_size)
1883     overhead = udpw->msg_size - udpw->payload_size;
1884   else
1885     overhead = udpw->msg_size;
1886
1887   if (NULL != udpw->cont)
1888   {
1889     delay = GNUNET_TIME_absolute_get_duration (udpw->start_time);
1890     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1891     {
1892       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1893                   "Message sent via UDP with delay of %s\n",
1894                   GNUNET_STRINGS_relative_time_to_string (delay,
1895                                                           GNUNET_YES));
1896     }
1897     else
1898     {
1899       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1900                   "Message sent via UDP with delay of %s\n",
1901                   GNUNET_STRINGS_relative_time_to_string (delay,
1902                                                           GNUNET_YES));
1903     }
1904     udpw->cont (udpw->cont_cls,
1905                 &udpw->session->target,
1906                 result,
1907                 udpw->payload_size,
1908                 overhead);
1909   }
1910   if (GNUNET_OK == result)
1911   {
1912     GNUNET_STATISTICS_update (plugin->env->stats,
1913                               "# UDP, unfragmented msgs, messages, sent, success",
1914                               1,
1915                               GNUNET_NO);
1916     GNUNET_STATISTICS_update (plugin->env->stats,
1917                               "# UDP, unfragmented msgs, bytes payload, sent, success",
1918                               udpw->payload_size,
1919                               GNUNET_NO);
1920     GNUNET_STATISTICS_update (plugin->env->stats,
1921                               "# UDP, unfragmented msgs, bytes overhead, sent, success",
1922                               overhead,
1923                               GNUNET_NO);
1924     GNUNET_STATISTICS_update (plugin->env->stats,
1925                               "# UDP, total, bytes overhead, sent",
1926                               overhead,
1927                               GNUNET_NO);
1928     GNUNET_STATISTICS_update (plugin->env->stats,
1929                               "# UDP, total, bytes payload, sent",
1930                               udpw->payload_size,
1931                               GNUNET_NO);
1932   }
1933   else
1934   {
1935     GNUNET_STATISTICS_update (plugin->env->stats,
1936                               "# UDP, unfragmented msgs, messages, sent, failure",
1937                               1,
1938                               GNUNET_NO);
1939     GNUNET_STATISTICS_update (plugin->env->stats,
1940                               "# UDP, unfragmented msgs, bytes payload, sent, failure",
1941                               udpw->payload_size,
1942                               GNUNET_NO);
1943     GNUNET_STATISTICS_update (plugin->env->stats,
1944                               "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1945                               overhead,
1946                               GNUNET_NO);
1947   }
1948 }
1949
1950
1951 /**
1952  * Function that can be used by the transport service to transmit a
1953  * message using the plugin.  Note that in the case of a peer
1954  * disconnecting, the continuation MUST be called prior to the
1955  * disconnect notification itself.  This function will be called with
1956  * this peer's HELLO message to initiate a fresh connection to another
1957  * peer.
1958  *
1959  * @param cls closure
1960  * @param s which session must be used
1961  * @param msgbuf the message to transmit
1962  * @param msgbuf_size number of bytes in @a msgbuf
1963  * @param priority how important is the message (most plugins will
1964  *                 ignore message priority and just FIFO)
1965  * @param to how long to wait at most for the transmission (does not
1966  *                require plugins to discard the message after the timeout,
1967  *                just advisory for the desired delay; most plugins will ignore
1968  *                this as well)
1969  * @param cont continuation to call once the message has
1970  *        been transmitted (or if the transport is ready
1971  *        for the next transmission call; or if the
1972  *        peer disconnected...); can be NULL
1973  * @param cont_cls closure for @a cont
1974  * @return number of bytes used (on the physical network, with overheads);
1975  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1976  *         and does NOT mean that the message was not transmitted (DV)
1977  */
1978 static ssize_t
1979 udp_plugin_send (void *cls,
1980                  struct GNUNET_ATS_Session *s,
1981                  const char *msgbuf,
1982                  size_t msgbuf_size,
1983                  unsigned int priority,
1984                  struct GNUNET_TIME_Relative to,
1985                  GNUNET_TRANSPORT_TransmitContinuation cont,
1986                  void *cont_cls)
1987 {
1988   struct Plugin *plugin = cls;
1989   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
1990   struct UDP_FragmentationContext *frag_ctx;
1991   struct UDP_MessageWrapper *udpw;
1992   struct UDPMessage *udp;
1993   char mbuf[udpmlen] GNUNET_ALIGN;
1994
1995   if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
1996        (NULL == plugin->sockv6) )
1997     return GNUNET_SYSERR;
1998   if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
1999        (NULL == plugin->sockv4) )
2000     return GNUNET_SYSERR;
2001   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2002   {
2003     GNUNET_break (0);
2004     return GNUNET_SYSERR;
2005   }
2006   if (GNUNET_YES !=
2007       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2008                                                     &s->target,
2009                                                     s))
2010   {
2011     GNUNET_break (0);
2012     return GNUNET_SYSERR;
2013   }
2014   LOG (GNUNET_ERROR_TYPE_DEBUG,
2015        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2016        udpmlen,
2017        GNUNET_i2s (&s->target),
2018        udp_address_to_string (plugin,
2019                               s->address->address,
2020                               s->address->address_length));
2021
2022   udp = (struct UDPMessage *) mbuf;
2023   udp->header.size = htons (udpmlen);
2024   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2025   udp->reserved = htonl (0);
2026   udp->sender = *plugin->env->my_identity;
2027
2028   /* We do not update the session time out here!  Otherwise this
2029    * session will not timeout since we send keep alive before session
2030    * can timeout.
2031    *
2032    * For UDP we update session timeout only on receive, this will
2033    * cover keep alives, since remote peer will reply with keep alive
2034    * responses!
2035    */
2036   if (udpmlen <= UDP_MTU)
2037   {
2038     /* unfragmented message */
2039     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2040     udpw->session = s;
2041     udpw->msg_buf = (char *) &udpw[1];
2042     udpw->msg_size = udpmlen; /* message size with UDP overhead */
2043     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2044     udpw->start_time = GNUNET_TIME_absolute_get ();
2045     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
2046     udpw->transmission_time = s->last_transmit_time;
2047     s->last_transmit_time
2048       = GNUNET_TIME_absolute_add (s->last_transmit_time,
2049                                   s->flow_delay_from_other_peer);
2050     udpw->cont = cont;
2051     udpw->cont_cls = cont_cls;
2052     udpw->frag_ctx = NULL;
2053     udpw->qc = &qc_message_sent;
2054     udpw->qc_cls = plugin;
2055     memcpy (udpw->msg_buf,
2056             udp,
2057             sizeof (struct UDPMessage));
2058     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
2059             msgbuf,
2060             msgbuf_size);
2061     enqueue (plugin,
2062              udpw);
2063     GNUNET_STATISTICS_update (plugin->env->stats,
2064                               "# UDP, unfragmented messages queued total",
2065                               1,
2066                               GNUNET_NO);
2067     GNUNET_STATISTICS_update (plugin->env->stats,
2068                               "# UDP, unfragmented bytes payload queued total",
2069                               msgbuf_size,
2070                               GNUNET_NO);
2071   }
2072   else
2073   {
2074     /* fragmented message */
2075     if (NULL != s->frag_ctx)
2076       return GNUNET_SYSERR;
2077     memcpy (&udp[1],
2078             msgbuf,
2079             msgbuf_size);
2080     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2081     frag_ctx->plugin = plugin;
2082     frag_ctx->session = s;
2083     frag_ctx->cont = cont;
2084     frag_ctx->cont_cls = cont_cls;
2085     frag_ctx->start_time = GNUNET_TIME_absolute_get ();
2086     frag_ctx->next_frag_time = s->last_transmit_time;
2087     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
2088     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2089     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2090     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2091                                                      UDP_MTU,
2092                                                      &plugin->tracker,
2093                                                      s->last_expected_msg_delay,
2094                                                      s->last_expected_ack_delay,
2095                                                      &udp->header,
2096                                                      &enqueue_fragment,
2097                                                      frag_ctx);
2098     s->frag_ctx = frag_ctx;
2099     s->last_transmit_time = frag_ctx->next_frag_time;
2100     if (sizeof (struct IPv4UdpAddress) == s->address->address_length)
2101       schedule_select_v4 (plugin);
2102     else
2103       schedule_select_v6 (plugin);
2104     GNUNET_STATISTICS_update (plugin->env->stats,
2105                               "# UDP, fragmented messages active",
2106                               1,
2107                               GNUNET_NO);
2108     GNUNET_STATISTICS_update (plugin->env->stats,
2109                               "# UDP, fragmented messages, total",
2110                               1,
2111                               GNUNET_NO);
2112     GNUNET_STATISTICS_update (plugin->env->stats,
2113                               "# UDP, fragmented bytes (payload)",
2114                               frag_ctx->payload_size,
2115                               GNUNET_NO);
2116   }
2117   notify_session_monitor (s->plugin,
2118                           s,
2119                           GNUNET_TRANSPORT_SS_UPDATE);
2120   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2121     schedule_select_v4 (plugin);
2122   else
2123     schedule_select_v6 (plugin);
2124   return udpmlen;
2125 }
2126
2127
2128 /**
2129  * Handle an ACK message.
2130  *
2131  * @param plugin the UDP plugin
2132  * @param msg the (presumed) UDP ACK message
2133  * @param udp_addr sender address
2134  * @param udp_addr_len number of bytes in @a udp_addr
2135  */
2136 static void
2137 read_process_ack (struct Plugin *plugin,
2138                   const struct GNUNET_MessageHeader *msg,
2139                   const union UdpAddress *udp_addr,
2140                   socklen_t udp_addr_len)
2141 {
2142   const struct GNUNET_MessageHeader *ack;
2143   const struct UDP_ACK_Message *udp_ack;
2144   struct GNUNET_HELLO_Address *address;
2145   struct GNUNET_ATS_Session *s;
2146   struct GNUNET_TIME_Relative flow_delay;
2147
2148   if (ntohs (msg->size)
2149       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2150   {
2151     GNUNET_break_op (0);
2152     return;
2153   }
2154   udp_ack = (const struct UDP_ACK_Message *) msg;
2155   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2156   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2157   {
2158     GNUNET_break_op(0);
2159     return;
2160   }
2161   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2162                                            PLUGIN_NAME,
2163                                            udp_addr,
2164                                            udp_addr_len,
2165                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2166   s = udp_plugin_lookup_session (plugin,
2167                                  address);
2168   if (NULL == s)
2169   {
2170     LOG (GNUNET_ERROR_TYPE_WARNING,
2171          "UDP session of address %s for ACK not found\n",
2172          udp_address_to_string (plugin,
2173                                 address->address,
2174                                 address->address_length));
2175     GNUNET_HELLO_address_free (address);
2176     return;
2177   }
2178   if (NULL == s->frag_ctx)
2179   {
2180     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2181          "Fragmentation context of address %s for ACK (%s) not found\n",
2182          udp_address_to_string (plugin,
2183                                 address->address,
2184                                 address->address_length),
2185          GNUNET_FRAGMENT_print_ack (ack));
2186     GNUNET_HELLO_address_free (address);
2187     return;
2188   }
2189   GNUNET_HELLO_address_free (address);
2190
2191   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2192   LOG (GNUNET_ERROR_TYPE_DEBUG,
2193        "We received a sending delay of %s for %s\n",
2194        GNUNET_STRINGS_relative_time_to_string (flow_delay,
2195                                                GNUNET_YES),
2196        GNUNET_i2s (&udp_ack->sender));
2197   /* Flow delay is for the reassembled packet, however, our delay
2198      is per packet, so we need to adjust: */
2199   flow_delay = GNUNET_TIME_relative_divide (flow_delay,
2200                                             1 + (s->frag_ctx->payload_size /
2201                                                  UDP_MTU));
2202   s->flow_delay_from_other_peer = flow_delay;
2203
2204
2205   if (GNUNET_OK !=
2206       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2207                                    ack))
2208   {
2209     LOG (GNUNET_ERROR_TYPE_DEBUG,
2210          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2211          (unsigned int) ntohs (msg->size),
2212          GNUNET_i2s (&udp_ack->sender),
2213          udp_address_to_string (plugin,
2214                                 udp_addr,
2215                                 udp_addr_len));
2216     /* Expect more ACKs to arrive */
2217     return;
2218   }
2219
2220   LOG (GNUNET_ERROR_TYPE_DEBUG,
2221        "Message from %s at %s full ACK'ed\n",
2222        GNUNET_i2s (&udp_ack->sender),
2223        udp_address_to_string (plugin,
2224                               udp_addr,
2225                               udp_addr_len));
2226
2227   /* Remove fragmented message after successful sending */
2228   fragmented_message_done (s->frag_ctx,
2229                            GNUNET_OK);
2230 }
2231
2232
2233 /* ********************** Receiving ********************** */
2234
2235
2236 /**
2237  * Closure for #find_receive_context().
2238  */
2239 struct FindReceiveContext
2240 {
2241   /**
2242    * Where to store the result.
2243    */
2244   struct DefragContext *rc;
2245
2246   /**
2247    * Session associated with this context.
2248    */
2249   struct GNUNET_ATS_Session *session;
2250
2251   /**
2252    * Address to find.
2253    */
2254   const union UdpAddress *udp_addr;
2255
2256   /**
2257    * Number of bytes in @e udp_addr.
2258    */
2259   size_t udp_addr_len;
2260
2261 };
2262
2263
2264 /**
2265  * Scan the heap for a receive context with the given address.
2266  *
2267  * @param cls the `struct FindReceiveContext`
2268  * @param node internal node of the heap
2269  * @param element value stored at the node (a `struct ReceiveContext`)
2270  * @param cost cost associated with the node
2271  * @return #GNUNET_YES if we should continue to iterate,
2272  *         #GNUNET_NO if not.
2273  */
2274 static int
2275 find_receive_context (void *cls,
2276                       struct GNUNET_CONTAINER_HeapNode *node,
2277                       void *element,
2278                       GNUNET_CONTAINER_HeapCostType cost)
2279 {
2280   struct FindReceiveContext *frc = cls;
2281   struct DefragContext *e = element;
2282
2283   if ( (frc->udp_addr_len == e->udp_addr_len) &&
2284        (0 == memcmp (frc->udp_addr,
2285                      e->udp_addr,
2286                      frc->udp_addr_len)) )
2287   {
2288     frc->rc = e;
2289     return GNUNET_NO;
2290   }
2291   return GNUNET_YES;
2292 }
2293
2294
2295 /**
2296  * Message tokenizer has broken up an incomming message. Pass it on
2297  * to the service.
2298  *
2299  * @param cls the `struct Plugin *`
2300  * @param client the `struct GNUNET_ATS_Session *`
2301  * @param hdr the actual message
2302  * @return #GNUNET_OK (always)
2303  */
2304 static int
2305 process_inbound_tokenized_messages (void *cls,
2306                                     void *client,
2307                                     const struct GNUNET_MessageHeader *hdr)
2308 {
2309   struct Plugin *plugin = cls;
2310   struct GNUNET_ATS_Session *session = client;
2311
2312   if (GNUNET_YES == session->in_destroy)
2313     return GNUNET_OK;
2314   reschedule_session_timeout (session);
2315   session->flow_delay_for_other_peer
2316     = plugin->env->receive (plugin->env->cls,
2317                             session->address,
2318                             session,
2319                             hdr);
2320   return GNUNET_OK;
2321 }
2322
2323
2324 /**
2325  * Functions with this signature are called whenever we need to close
2326  * a session due to a disconnect or failure to establish a connection.
2327  *
2328  * @param cls closure with the `struct Plugin`
2329  * @param s session to close down
2330  * @return #GNUNET_OK on success
2331  */
2332 static int
2333 udp_disconnect_session (void *cls,
2334                         struct GNUNET_ATS_Session *s)
2335 {
2336   struct Plugin *plugin = cls;
2337   struct UDP_MessageWrapper *udpw;
2338   struct UDP_MessageWrapper *next;
2339   struct FindReceiveContext frc;
2340
2341   GNUNET_assert (GNUNET_YES != s->in_destroy);
2342   LOG (GNUNET_ERROR_TYPE_DEBUG,
2343        "Session %p to peer `%s' at address %s ended\n",
2344        s,
2345        GNUNET_i2s (&s->target),
2346        udp_address_to_string (plugin,
2347                               s->address->address,
2348                               s->address->address_length));
2349   if (NULL != s->timeout_task)
2350   {
2351     GNUNET_SCHEDULER_cancel (s->timeout_task);
2352     s->timeout_task = NULL;
2353   }
2354   if (NULL != s->frag_ctx)
2355   {
2356     /* Remove fragmented message due to disconnect */
2357     fragmented_message_done (s->frag_ctx,
2358                              GNUNET_SYSERR);
2359   }
2360   GNUNET_assert (GNUNET_YES ==
2361                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
2362                                                        &s->target,
2363                                                        s));
2364   frc.rc = NULL;
2365   frc.udp_addr = s->address->address;
2366   frc.udp_addr_len = s->address->address_length;
2367   /* Lookup existing receive context for this address */
2368   if (NULL != plugin->defrag_ctxs)
2369   {
2370     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2371                                    &find_receive_context,
2372                                    &frc);
2373     if (NULL != frc.rc)
2374     {
2375       struct DefragContext *d_ctx = frc.rc;
2376
2377       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
2378       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2379       GNUNET_free (d_ctx);
2380     }
2381   }
2382   s->in_destroy = GNUNET_YES;
2383   next = plugin->ipv4_queue_head;
2384   while (NULL != (udpw = next))
2385   {
2386     next = udpw->next;
2387     if (udpw->session == s)
2388     {
2389       dequeue (plugin,
2390                udpw);
2391       udpw->qc (udpw->qc_cls,
2392                 udpw,
2393                 GNUNET_SYSERR);
2394       GNUNET_free (udpw);
2395     }
2396   }
2397   next = plugin->ipv6_queue_head;
2398   while (NULL != (udpw = next))
2399   {
2400     next = udpw->next;
2401     if (udpw->session == s)
2402     {
2403       dequeue (plugin,
2404                udpw);
2405       udpw->qc (udpw->qc_cls,
2406                 udpw,
2407                 GNUNET_SYSERR);
2408       GNUNET_free (udpw);
2409     }
2410   }
2411   if ( (NULL != s->frag_ctx) &&
2412        (NULL != s->frag_ctx->cont) )
2413   {
2414     /* The 'frag_ctx' itself will be freed in #free_session() a bit
2415        later, as it might be in use right now */
2416     LOG (GNUNET_ERROR_TYPE_DEBUG,
2417          "Calling continuation for fragemented message to `%s' with result SYSERR\n",
2418          GNUNET_i2s (&s->target));
2419     s->frag_ctx->cont (s->frag_ctx->cont_cls,
2420                        &s->target,
2421                        GNUNET_SYSERR,
2422                        s->frag_ctx->payload_size,
2423                        s->frag_ctx->on_wire_size);
2424   }
2425   notify_session_monitor (s->plugin,
2426                           s,
2427                           GNUNET_TRANSPORT_SS_DONE);
2428   plugin->env->session_end (plugin->env->cls,
2429                             s->address,
2430                             s);
2431   GNUNET_STATISTICS_set (plugin->env->stats,
2432                          "# UDP sessions active",
2433                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2434                          GNUNET_NO);
2435   if (0 == s->rc)
2436     free_session (s);
2437   return GNUNET_OK;
2438 }
2439
2440
2441 /**
2442  * Destroy a session, plugin is being unloaded.
2443  *
2444  * @param cls the `struct Plugin`
2445  * @param key hash of public key of target peer
2446  * @param value a `struct PeerSession *` to clean up
2447  * @return #GNUNET_OK (continue to iterate)
2448  */
2449 static int
2450 disconnect_and_free_it (void *cls,
2451                         const struct GNUNET_PeerIdentity *key,
2452                         void *value)
2453 {
2454   struct Plugin *plugin = cls;
2455
2456   udp_disconnect_session (plugin,
2457                           value);
2458   return GNUNET_OK;
2459 }
2460
2461
2462 /**
2463  * Disconnect from a remote node.  Clean up session if we have one for
2464  * this peer.
2465  *
2466  * @param cls closure for this call (should be handle to Plugin)
2467  * @param target the peeridentity of the peer to disconnect
2468  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
2469  */
2470 static void
2471 udp_disconnect (void *cls,
2472                 const struct GNUNET_PeerIdentity *target)
2473 {
2474   struct Plugin *plugin = cls;
2475
2476   LOG (GNUNET_ERROR_TYPE_DEBUG,
2477        "Disconnecting from peer `%s'\n",
2478        GNUNET_i2s (target));
2479   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
2480                                               target,
2481                                               &disconnect_and_free_it,
2482                                               plugin);
2483 }
2484
2485
2486 /**
2487  * Session was idle, so disconnect it.
2488  *
2489  * @param cls the `struct GNUNET_ATS_Session` to time out
2490  * @param tc scheduler context
2491  */
2492 static void
2493 session_timeout (void *cls,
2494                  const struct GNUNET_SCHEDULER_TaskContext *tc)
2495 {
2496   struct GNUNET_ATS_Session *s = cls;
2497   struct Plugin *plugin = s->plugin;
2498   struct GNUNET_TIME_Relative left;
2499
2500   s->timeout_task = NULL;
2501   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
2502   if (left.rel_value_us > 0)
2503   {
2504     /* not actually our turn yet, but let's at least update
2505        the monitor, it may think we're about to die ... */
2506     notify_session_monitor (s->plugin,
2507                             s,
2508                             GNUNET_TRANSPORT_SS_UPDATE);
2509     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
2510                                                     &session_timeout,
2511                                                     s);
2512     return;
2513   }
2514   LOG (GNUNET_ERROR_TYPE_DEBUG,
2515        "Session %p was idle for %s, disconnecting\n",
2516        s,
2517        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
2518                                                GNUNET_YES));
2519   /* call session destroy function */
2520   udp_disconnect_session (plugin,
2521                           s);
2522 }
2523
2524
2525 /**
2526  * Allocate a new session for the given endpoint address.
2527  * Note that this function does not inform the service
2528  * of the new session, this is the responsibility of the
2529  * caller (if needed).
2530  *
2531  * @param cls the `struct Plugin`
2532  * @param address address of the other peer to use
2533  * @param network_type network type the address belongs to
2534  * @return NULL on error, otherwise session handle
2535  */
2536 static struct GNUNET_ATS_Session *
2537 udp_plugin_create_session (void *cls,
2538                            const struct GNUNET_HELLO_Address *address,
2539                            enum GNUNET_ATS_Network_Type network_type)
2540 {
2541   struct Plugin *plugin = cls;
2542   struct GNUNET_ATS_Session *s;
2543
2544   s = GNUNET_new (struct GNUNET_ATS_Session);
2545   s->plugin = plugin;
2546   s->address = GNUNET_HELLO_address_copy (address);
2547   s->target = address->peer;
2548   s->last_transmit_time = GNUNET_TIME_absolute_get ();
2549   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2550                                                               250);
2551   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2552   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO;
2553   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2554   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2555   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
2556                                                   &session_timeout,
2557                                                   s);
2558   s->scope = network_type;
2559
2560   LOG (GNUNET_ERROR_TYPE_DEBUG,
2561        "Creating new session %p for peer `%s' address `%s'\n",
2562        s,
2563        GNUNET_i2s (&address->peer),
2564        udp_address_to_string (plugin,
2565                               address->address,
2566                               address->address_length));
2567   GNUNET_assert (GNUNET_OK ==
2568                  GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
2569                                                     &s->target,
2570                                                     s,
2571                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2572   GNUNET_STATISTICS_set (plugin->env->stats,
2573                          "# UDP sessions active",
2574                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2575                          GNUNET_NO);
2576   notify_session_monitor (plugin,
2577                           s,
2578                           GNUNET_TRANSPORT_SS_INIT);
2579   return s;
2580 }
2581
2582
2583 /**
2584  * Creates a new outbound session the transport service will use to
2585  * send data to the peer.
2586  *
2587  * @param cls the `struct Plugin *`
2588  * @param address the address
2589  * @return the session or NULL of max connections exceeded
2590  */
2591 static struct GNUNET_ATS_Session *
2592 udp_plugin_get_session (void *cls,
2593                         const struct GNUNET_HELLO_Address *address)
2594 {
2595   struct Plugin *plugin = cls;
2596   struct GNUNET_ATS_Session *s;
2597   enum GNUNET_ATS_Network_Type network_type = GNUNET_ATS_NET_UNSPECIFIED;
2598   const struct IPv4UdpAddress *udp_v4;
2599   const struct IPv6UdpAddress *udp_v6;
2600
2601   if (NULL == address)
2602   {
2603     GNUNET_break (0);
2604     return NULL;
2605   }
2606   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
2607        (address->address_length != sizeof(struct IPv6UdpAddress)) )
2608   {
2609     GNUNET_break_op (0);
2610     return NULL;
2611   }
2612   if (NULL != (s = udp_plugin_lookup_session (cls,
2613                                               address)))
2614     return s;
2615
2616   /* need to create new session */
2617   if (sizeof (struct IPv4UdpAddress) == address->address_length)
2618   {
2619     struct sockaddr_in v4;
2620
2621     udp_v4 = (const struct IPv4UdpAddress *) address->address;
2622     memset (&v4, '\0', sizeof (v4));
2623     v4.sin_family = AF_INET;
2624 #if HAVE_SOCKADDR_IN_SIN_LEN
2625     v4.sin_len = sizeof (struct sockaddr_in);
2626 #endif
2627     v4.sin_port = udp_v4->u4_port;
2628     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
2629     network_type = plugin->env->get_address_type (plugin->env->cls,
2630                                                   (const struct sockaddr *) &v4,
2631                                                   sizeof (v4));
2632   }
2633   if (sizeof (struct IPv6UdpAddress) == address->address_length)
2634   {
2635     struct sockaddr_in6 v6;
2636
2637     udp_v6 = (const struct IPv6UdpAddress *) address->address;
2638     memset (&v6, '\0', sizeof (v6));
2639     v6.sin6_family = AF_INET6;
2640 #if HAVE_SOCKADDR_IN_SIN_LEN
2641     v6.sin6_len = sizeof (struct sockaddr_in6);
2642 #endif
2643     v6.sin6_port = udp_v6->u6_port;
2644     v6.sin6_addr = udp_v6->ipv6_addr;
2645     network_type = plugin->env->get_address_type (plugin->env->cls,
2646                                                   (const struct sockaddr *) &v6,
2647                                                   sizeof (v6));
2648   }
2649   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2650   return udp_plugin_create_session (cls,
2651                                     address,
2652                                     network_type);
2653 }
2654
2655
2656 /**
2657  * We've received a UDP Message.  Process it (pass contents to main service).
2658  *
2659  * @param plugin plugin context
2660  * @param msg the message
2661  * @param udp_addr sender address
2662  * @param udp_addr_len number of bytes in @a udp_addr
2663  * @param network_type network type the address belongs to
2664  */
2665 static void
2666 process_udp_message (struct Plugin *plugin,
2667                      const struct UDPMessage *msg,
2668                      const union UdpAddress *udp_addr,
2669                      size_t udp_addr_len,
2670                      enum GNUNET_ATS_Network_Type network_type)
2671 {
2672   struct GNUNET_ATS_Session *s;
2673   struct GNUNET_HELLO_Address *address;
2674
2675   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2676   if (0 != ntohl (msg->reserved))
2677   {
2678     GNUNET_break_op(0);
2679     return;
2680   }
2681   if (ntohs (msg->header.size)
2682       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2683   {
2684     GNUNET_break_op(0);
2685     return;
2686   }
2687
2688   address = GNUNET_HELLO_address_allocate (&msg->sender,
2689                                            PLUGIN_NAME,
2690                                            udp_addr,
2691                                            udp_addr_len,
2692                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2693   if (NULL ==
2694       (s = udp_plugin_lookup_session (plugin,
2695                                       address)))
2696   {
2697     s = udp_plugin_create_session (plugin,
2698                                    address,
2699                                    network_type);
2700     plugin->env->session_start (plugin->env->cls,
2701                                 address,
2702                                 s,
2703                                 s->scope);
2704     notify_session_monitor (plugin,
2705                             s,
2706                             GNUNET_TRANSPORT_SS_UP);
2707   }
2708   GNUNET_free (address);
2709
2710   s->rc++;
2711   GNUNET_SERVER_mst_receive (plugin->mst,
2712                              s,
2713                              (const char *) &msg[1],
2714                              ntohs (msg->header.size) - sizeof(struct UDPMessage),
2715                              GNUNET_YES,
2716                              GNUNET_NO);
2717   s->rc--;
2718   if ( (0 == s->rc) &&
2719        (GNUNET_YES == s->in_destroy) )
2720     free_session (s);
2721 }
2722
2723
2724 /**
2725  * Process a defragmented message.
2726  *
2727  * @param cls the `struct DefragContext *`
2728  * @param msg the message
2729  */
2730 static void
2731 fragment_msg_proc (void *cls,
2732                    const struct GNUNET_MessageHeader *msg)
2733 {
2734   struct DefragContext *dc = cls;
2735   const struct UDPMessage *um;
2736
2737   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2738   {
2739     GNUNET_break_op (0);
2740     return;
2741   }
2742   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2743   {
2744     GNUNET_break_op (0);
2745     return;
2746   }
2747   um = (const struct UDPMessage *) msg;
2748   dc->sender = um->sender;
2749   dc->have_sender = GNUNET_YES;
2750   process_udp_message (dc->plugin,
2751                        um,
2752                        dc->udp_addr,
2753                        dc->udp_addr_len,
2754                        dc->network_type);
2755 }
2756
2757
2758 /**
2759  * We finished sending an acknowledgement.  Update
2760  * statistics.
2761  *
2762  * @param cls the `struct Plugin`
2763  * @param udpw message queue entry of the ACK
2764  * @param result #GNUNET_OK if the transmission worked,
2765  *               #GNUNET_SYSERR if we failed to send the ACK
2766  */
2767 static void
2768 ack_message_sent (void *cls,
2769                   struct UDP_MessageWrapper *udpw,
2770                   int result)
2771 {
2772   struct Plugin *plugin = cls;
2773
2774   if (GNUNET_OK == result)
2775   {
2776     GNUNET_STATISTICS_update (plugin->env->stats,
2777                               "# UDP, ACK messages sent",
2778                               1,
2779                               GNUNET_NO);
2780   }
2781   else
2782   {
2783     GNUNET_STATISTICS_update (plugin->env->stats,
2784                               "# UDP, ACK transmissions failed",
2785                               1,
2786                               GNUNET_NO);
2787   }
2788 }
2789
2790
2791 /**
2792  * Transmit an acknowledgement.
2793  *
2794  * @param cls the `struct DefragContext *`
2795  * @param id message ID (unused)
2796  * @param msg ack to transmit
2797  */
2798 static void
2799 ack_proc (void *cls,
2800           uint32_t id,
2801           const struct GNUNET_MessageHeader *msg)
2802 {
2803   struct DefragContext *rc = cls;
2804   struct Plugin *plugin = rc->plugin;
2805   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2806   struct UDP_ACK_Message *udp_ack;
2807   uint32_t delay;
2808   struct UDP_MessageWrapper *udpw;
2809   struct GNUNET_ATS_Session *s;
2810   struct GNUNET_HELLO_Address *address;
2811
2812   if (GNUNET_NO == rc->have_sender)
2813   {
2814     /* tried to defragment but never succeeded, hence will not ACK */
2815     /* This can happen if we just lost msgs */
2816     GNUNET_STATISTICS_update (plugin->env->stats,
2817                               "# UDP, fragments discarded without ACK",
2818                               1,
2819                               GNUNET_NO);
2820     return;
2821   }
2822   address = GNUNET_HELLO_address_allocate (&rc->sender,
2823                                            PLUGIN_NAME,
2824                                            rc->udp_addr,
2825                                            rc->udp_addr_len,
2826                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2827   s = udp_plugin_lookup_session (plugin,
2828                                  address);
2829   GNUNET_HELLO_address_free (address);
2830   if (NULL == s)
2831   {
2832     LOG (GNUNET_ERROR_TYPE_ERROR,
2833          "Trying to transmit ACK to peer `%s' but no session found!\n",
2834          udp_address_to_string (plugin,
2835                                 rc->udp_addr,
2836                                 rc->udp_addr_len));
2837     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2838     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2839     GNUNET_free (rc);
2840     GNUNET_STATISTICS_update (plugin->env->stats,
2841                               "# UDP, ACK transmissions failed",
2842                               1,
2843                               GNUNET_NO);
2844     return;
2845   }
2846   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2847     delay = s->flow_delay_for_other_peer.rel_value_us;
2848   else
2849     delay = UINT32_MAX;
2850   LOG (GNUNET_ERROR_TYPE_DEBUG,
2851        "Sending ACK to `%s' including delay of %s\n",
2852        udp_address_to_string (plugin,
2853                               rc->udp_addr,
2854                               rc->udp_addr_len),
2855        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2856                                                GNUNET_YES));
2857   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2858   udpw->msg_size = msize;
2859   udpw->payload_size = 0;
2860   udpw->session = s;
2861   udpw->start_time = GNUNET_TIME_absolute_get ();
2862   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2863   udpw->msg_buf = (char *) &udpw[1];
2864   udpw->qc = &ack_message_sent;
2865   udpw->qc_cls = plugin;
2866   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2867   udp_ack->header.size = htons ((uint16_t) msize);
2868   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2869   udp_ack->delay = htonl (delay);
2870   udp_ack->sender = *plugin->env->my_identity;
2871   memcpy (&udp_ack[1],
2872           msg,
2873           ntohs (msg->size));
2874   enqueue (plugin,
2875            udpw);
2876   notify_session_monitor (plugin,
2877                           s,
2878                           GNUNET_TRANSPORT_SS_UPDATE);
2879   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2880     schedule_select_v4 (plugin);
2881   else
2882     schedule_select_v6 (plugin);
2883 }
2884
2885
2886 /**
2887  * We received a fragment, process it.
2888  *
2889  * @param plugin our plugin
2890  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2891  * @param udp_addr sender address
2892  * @param udp_addr_len number of bytes in @a udp_addr
2893  * @param network_type network type the address belongs to
2894  */
2895 static void
2896 read_process_fragment (struct Plugin *plugin,
2897                        const struct GNUNET_MessageHeader *msg,
2898                        const union UdpAddress *udp_addr,
2899                        size_t udp_addr_len,
2900                        enum GNUNET_ATS_Network_Type network_type)
2901 {
2902   struct DefragContext *d_ctx;
2903   struct GNUNET_TIME_Absolute now;
2904   struct FindReceiveContext frc;
2905
2906   frc.rc = NULL;
2907   frc.udp_addr = udp_addr;
2908   frc.udp_addr_len = udp_addr_len;
2909
2910   /* Lookup existing receive context for this address */
2911   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2912                                  &find_receive_context,
2913                                  &frc);
2914   now = GNUNET_TIME_absolute_get ();
2915   d_ctx = frc.rc;
2916
2917   if (NULL == d_ctx)
2918   {
2919     /* Create a new defragmentation context */
2920     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
2921     memcpy (&d_ctx[1],
2922             udp_addr,
2923             udp_addr_len);
2924     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2925     d_ctx->udp_addr_len = udp_addr_len;
2926     d_ctx->network_type = network_type;
2927     d_ctx->plugin = plugin;
2928     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2929                                                       UDP_MTU,
2930                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
2931                                                       d_ctx,
2932                                                       &fragment_msg_proc,
2933                                                       &ack_proc);
2934     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2935                                                  d_ctx,
2936                                                  (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2937     LOG (GNUNET_ERROR_TYPE_DEBUG,
2938          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2939          (unsigned int) ntohs (msg->size),
2940          udp_address_to_string (plugin,
2941                                 udp_addr,
2942                                 udp_addr_len));
2943   }
2944   else
2945   {
2946     LOG (GNUNET_ERROR_TYPE_DEBUG,
2947          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2948          (unsigned int) ntohs (msg->size),
2949          udp_address_to_string (plugin,
2950                                 udp_addr,
2951                                 udp_addr_len));
2952   }
2953
2954   if (GNUNET_OK ==
2955       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag,
2956                                           msg))
2957   {
2958     /* keep this 'rc' from expiring */
2959     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2960                                        d_ctx->hnode,
2961                                        (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2962   }
2963   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2964       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2965   {
2966     /* remove 'rc' that was inactive the longest */
2967     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2968     GNUNET_assert (NULL != d_ctx);
2969     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2970     GNUNET_free (d_ctx);
2971     GNUNET_STATISTICS_update (plugin->env->stats,
2972                               "# UDP, Defragmentations aborted",
2973                               1,
2974                               GNUNET_NO);
2975   }
2976 }
2977
2978
2979 /**
2980  * Read and process a message from the given socket.
2981  *
2982  * @param plugin the overall plugin
2983  * @param rsock socket to read from
2984  */
2985 static void
2986 udp_select_read (struct Plugin *plugin,
2987                  struct GNUNET_NETWORK_Handle *rsock)
2988 {
2989   socklen_t fromlen;
2990   struct sockaddr_storage addr;
2991   char buf[65536] GNUNET_ALIGN;
2992   ssize_t size;
2993   const struct GNUNET_MessageHeader *msg;
2994   struct IPv4UdpAddress v4;
2995   struct IPv6UdpAddress v6;
2996   const struct sockaddr *sa;
2997   const struct sockaddr_in *sa4;
2998   const struct sockaddr_in6 *sa6;
2999   const union UdpAddress *int_addr;
3000   size_t int_addr_len;
3001   enum GNUNET_ATS_Network_Type network_type;
3002
3003   fromlen = sizeof (addr);
3004   memset (&addr,
3005           0,
3006           sizeof(addr));
3007   size = GNUNET_NETWORK_socket_recvfrom (rsock,
3008                                          buf,
3009                                          sizeof(buf),
3010                                          (struct sockaddr *) &addr,
3011                                          &fromlen);
3012   sa = (const struct sockaddr *) &addr;
3013 #if MINGW
3014   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
3015    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
3016    * on this socket has failed.
3017    * Quote from MSDN:
3018    *   WSAECONNRESET - The virtual circuit was reset by the remote side
3019    *   executing a hard or abortive close. The application should close
3020    *   the socket; it is no longer usable. On a UDP-datagram socket this
3021    *   error indicates a previous send operation resulted in an ICMP Port
3022    *   Unreachable message.
3023    */
3024   if ( (-1 == size) &&
3025        (ECONNRESET == errno) )
3026     return;
3027 #endif
3028   if (-1 == size)
3029   {
3030     LOG (GNUNET_ERROR_TYPE_DEBUG,
3031          "UDP failed to receive data: %s\n",
3032          STRERROR (errno));
3033     /* Connection failure or something. Not a protocol violation. */
3034     return;
3035   }
3036
3037   /* Check if this is a STUN packet */
3038   if (GNUNET_NAT_is_valid_stun_packet (plugin->nat,
3039                                        (uint8_t *)buf,
3040                                        size))
3041     return; /* was STUN, do not process further */
3042
3043   if (size < sizeof(struct GNUNET_MessageHeader))
3044   {
3045     LOG (GNUNET_ERROR_TYPE_WARNING,
3046          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
3047          (unsigned int ) size,
3048          GNUNET_a2s (sa,
3049                      fromlen));
3050     /* _MAY_ be a connection failure (got partial message) */
3051     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
3052     GNUNET_break_op (0);
3053     return;
3054   }
3055
3056
3057
3058
3059   msg = (const struct GNUNET_MessageHeader *) buf;
3060   LOG (GNUNET_ERROR_TYPE_DEBUG,
3061        "UDP received %u-byte message from `%s' type %u\n",
3062        (unsigned int) size,
3063        GNUNET_a2s (sa,
3064                    fromlen),
3065        ntohs (msg->type));
3066   if (size != ntohs (msg->size))
3067   {
3068     LOG (GNUNET_ERROR_TYPE_WARNING,
3069          "UDP malformed message header from %s\n",
3070          (unsigned int) size,
3071          GNUNET_a2s (sa,
3072                      fromlen));
3073     GNUNET_break_op (0);
3074     return;
3075   }
3076   GNUNET_STATISTICS_update (plugin->env->stats,
3077                             "# UDP, total bytes received",
3078                             size,
3079                             GNUNET_NO);
3080   network_type = plugin->env->get_address_type (plugin->env->cls,
3081                                                 sa,
3082                                                 fromlen);
3083   switch (sa->sa_family)
3084   {
3085   case AF_INET:
3086     sa4 = (const struct sockaddr_in *) &addr;
3087     v4.options = 0;
3088     v4.ipv4_addr = sa4->sin_addr.s_addr;
3089     v4.u4_port = sa4->sin_port;
3090     int_addr = (union UdpAddress *) &v4;
3091     int_addr_len = sizeof (v4);
3092     break;
3093   case AF_INET6:
3094     sa6 = (const struct sockaddr_in6 *) &addr;
3095     v6.options = 0;
3096     v6.ipv6_addr = sa6->sin6_addr;
3097     v6.u6_port = sa6->sin6_port;
3098     int_addr = (union UdpAddress *) &v6;
3099     int_addr_len = sizeof (v6);
3100     break;
3101   default:
3102     GNUNET_break (0);
3103     return;
3104   }
3105
3106   switch (ntohs (msg->type))
3107   {
3108   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
3109     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
3110       udp_broadcast_receive (plugin,
3111                              buf,
3112                              size,
3113                              int_addr,
3114                              int_addr_len,
3115                              network_type);
3116     return;
3117   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
3118     if (ntohs (msg->size) < sizeof(struct UDPMessage))
3119     {
3120       GNUNET_break_op(0);
3121       return;
3122     }
3123     process_udp_message (plugin,
3124                          (const struct UDPMessage *) msg,
3125                          int_addr,
3126                          int_addr_len,
3127                          network_type);
3128     return;
3129   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
3130     read_process_ack (plugin,
3131                       msg,
3132                       int_addr,
3133                       int_addr_len);
3134     return;
3135   case GNUNET_MESSAGE_TYPE_FRAGMENT:
3136     read_process_fragment (plugin,
3137                            msg,
3138                            int_addr,
3139                            int_addr_len,
3140                            network_type);
3141     return;
3142   default:
3143     GNUNET_break_op(0);
3144     return;
3145   }
3146 }
3147
3148
3149 /**
3150  * Removes messages from the transmission queue that have
3151  * timed out, and then selects a message that should be
3152  * transmitted next.
3153  *
3154  * @param plugin the UDP plugin
3155  * @param sock which socket should we process the queue for (v4 or v6)
3156  * @return message selected for transmission, or NULL for none
3157  */
3158 static struct UDP_MessageWrapper *
3159 remove_timeout_messages_and_select (struct Plugin *plugin,
3160                                     struct GNUNET_NETWORK_Handle *sock)
3161 {
3162   struct UDP_MessageWrapper *udpw;
3163   struct GNUNET_TIME_Relative remaining;
3164   struct GNUNET_ATS_Session *session;
3165   int removed;
3166
3167   removed = GNUNET_NO;
3168   udpw = (sock == plugin->sockv4)
3169     ? plugin->ipv4_queue_head
3170     : plugin->ipv6_queue_head;
3171   while (NULL != udpw)
3172   {
3173     session = udpw->session;
3174     /* Find messages with timeout */
3175     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
3176     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
3177     {
3178       /* Message timed out */
3179       removed = GNUNET_YES;
3180       dequeue (plugin,
3181                udpw);
3182       udpw->qc (udpw->qc_cls,
3183                 udpw,
3184                 GNUNET_SYSERR);
3185       GNUNET_free (udpw);
3186
3187       if (sock == plugin->sockv4)
3188       {
3189         udpw = plugin->ipv4_queue_head;
3190       }
3191       else if (sock == plugin->sockv6)
3192       {
3193         udpw = plugin->ipv6_queue_head;
3194       }
3195       else
3196       {
3197         GNUNET_break (0); /* should never happen */
3198         udpw = NULL;
3199       }
3200       GNUNET_STATISTICS_update (plugin->env->stats,
3201                                 "# messages discarded due to timeout",
3202                                 1,
3203                                 GNUNET_NO);
3204     }
3205     else
3206     {
3207       /* Message did not time out, check transmission time */
3208       remaining = GNUNET_TIME_absolute_get_remaining (udpw->transmission_time);
3209       if (0 == remaining.rel_value_us)
3210       {
3211         /* this message is not delayed */
3212         LOG (GNUNET_ERROR_TYPE_DEBUG,
3213              "Message for peer `%s' (%u bytes) is not delayed \n",
3214              GNUNET_i2s (&udpw->session->target),
3215              udpw->payload_size);
3216         break; /* Found message to send, break */
3217       }
3218       else
3219       {
3220         /* Message is delayed, try next */
3221         LOG (GNUNET_ERROR_TYPE_DEBUG,
3222              "Message for peer `%s' (%u bytes) is delayed for %s\n",
3223              GNUNET_i2s (&udpw->session->target),
3224              udpw->payload_size,
3225              GNUNET_STRINGS_relative_time_to_string (remaining,
3226                                                      GNUNET_YES));
3227         udpw = udpw->next;
3228       }
3229     }
3230   }
3231   if (GNUNET_YES == removed)
3232     notify_session_monitor (session->plugin,
3233                             session,
3234                             GNUNET_TRANSPORT_SS_UPDATE);
3235   return udpw;
3236 }
3237
3238
3239 /**
3240  * We failed to transmit a message via UDP. Generate
3241  * a descriptive error message.
3242  *
3243  * @param plugin our plugin
3244  * @param sa target address we were trying to reach
3245  * @param slen number of bytes in @a sa
3246  * @param error the errno value returned from the sendto() call
3247  */
3248 static void
3249 analyze_send_error (struct Plugin *plugin,
3250                     const struct sockaddr *sa,
3251                     socklen_t slen,
3252                     int error)
3253 {
3254   enum GNUNET_ATS_Network_Type type;
3255
3256   type = plugin->env->get_address_type (plugin->env->cls,
3257                                         sa,
3258                                         slen);
3259   if ( ( (GNUNET_ATS_NET_LAN == type) ||
3260          (GNUNET_ATS_NET_WAN == type) ) &&
3261        ( (ENETUNREACH == errno) ||
3262          (ENETDOWN == errno) ) )
3263   {
3264     if (slen == sizeof (struct sockaddr_in))
3265     {
3266       /* IPv4: "Network unreachable" or "Network down"
3267        *
3268        * This indicates we do not have connectivity
3269        */
3270       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3271            _("UDP could not transmit message to `%s': "
3272              "Network seems down, please check your network configuration\n"),
3273            GNUNET_a2s (sa,
3274                        slen));
3275     }
3276     if (slen == sizeof (struct sockaddr_in6))
3277     {
3278       /* IPv6: "Network unreachable" or "Network down"
3279        *
3280        * This indicates that this system is IPv6 enabled, but does not
3281        * have a valid global IPv6 address assigned or we do not have
3282        * connectivity
3283        */
3284       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3285            _("UDP could not transmit IPv6 message! "
3286              "Please check your network configuration and disable IPv6 if your "
3287              "connection does not have a global IPv6 address\n"));
3288     }
3289   }
3290   else
3291   {
3292     LOG (GNUNET_ERROR_TYPE_WARNING,
3293          "UDP could not transmit message to `%s': `%s'\n",
3294          GNUNET_a2s (sa,
3295                      slen),
3296          STRERROR (error));
3297   }
3298 }
3299
3300
3301 /**
3302  * It is time to try to transmit a UDP message.  Select one
3303  * and send.
3304  *
3305  * @param plugin the plugin
3306  * @param sock which socket (v4/v6) to send on
3307  */
3308 static void
3309 udp_select_send (struct Plugin *plugin,
3310                  struct GNUNET_NETWORK_Handle *sock)
3311 {
3312   ssize_t sent;
3313   socklen_t slen;
3314   const struct sockaddr *a;
3315   const struct IPv4UdpAddress *u4;
3316   struct sockaddr_in a4;
3317   const struct IPv6UdpAddress *u6;
3318   struct sockaddr_in6 a6;
3319   struct UDP_MessageWrapper *udpw;
3320
3321   /* Find message(s) to send */
3322   while (NULL != (udpw = remove_timeout_messages_and_select (plugin,
3323                                                              sock)))
3324   {
3325     if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3326     {
3327       u4 = udpw->session->address->address;
3328       memset (&a4,
3329               0,
3330               sizeof(a4));
3331       a4.sin_family = AF_INET;
3332 #if HAVE_SOCKADDR_IN_SIN_LEN
3333       a4.sin_len = sizeof (a4);
3334 #endif
3335       a4.sin_port = u4->u4_port;
3336       a4.sin_addr.s_addr = u4->ipv4_addr;
3337       a = (const struct sockaddr *) &a4;
3338       slen = sizeof (a4);
3339     }
3340     else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3341     {
3342       u6 = udpw->session->address->address;
3343       memset (&a6,
3344               0,
3345               sizeof(a6));
3346       a6.sin6_family = AF_INET6;
3347 #if HAVE_SOCKADDR_IN_SIN_LEN
3348       a6.sin6_len = sizeof (a6);
3349 #endif
3350       a6.sin6_port = u6->u6_port;
3351       a6.sin6_addr = u6->ipv6_addr;
3352       a = (const struct sockaddr *) &a6;
3353       slen = sizeof (a6);
3354     }
3355     else
3356     {
3357       GNUNET_break (0);
3358       dequeue (plugin,
3359                udpw);
3360       udpw->qc (udpw->qc_cls,
3361                 udpw,
3362                 GNUNET_SYSERR);
3363       notify_session_monitor (plugin,
3364                               udpw->session,
3365                               GNUNET_TRANSPORT_SS_UPDATE);
3366       GNUNET_free (udpw);
3367       continue;
3368     }
3369     sent = GNUNET_NETWORK_socket_sendto (sock,
3370                                          udpw->msg_buf,
3371                                          udpw->msg_size,
3372                                          a,
3373                                          slen);
3374     udpw->session->last_transmit_time
3375       = GNUNET_TIME_absolute_max (GNUNET_TIME_absolute_get (),
3376                                   udpw->session->last_transmit_time);
3377     dequeue (plugin,
3378              udpw);
3379     if (GNUNET_SYSERR == sent)
3380     {
3381       /* Failure */
3382       analyze_send_error (plugin,
3383                           a,
3384                           slen,
3385                           errno);
3386       udpw->qc (udpw->qc_cls,
3387                 udpw,
3388                 GNUNET_SYSERR);
3389       GNUNET_STATISTICS_update (plugin->env->stats,
3390                                 "# UDP, total, bytes, sent, failure",
3391                                 sent,
3392                                 GNUNET_NO);
3393       GNUNET_STATISTICS_update (plugin->env->stats,
3394                                 "# UDP, total, messages, sent, failure",
3395                                 1,
3396                                 GNUNET_NO);
3397     }
3398     else
3399     {
3400       /* Success */
3401       LOG (GNUNET_ERROR_TYPE_DEBUG,
3402            "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3403            (unsigned int) (udpw->msg_size),
3404            GNUNET_i2s (&udpw->session->target),
3405            GNUNET_a2s (a,
3406                        slen),
3407            (int ) sent,
3408            (sent < 0) ? STRERROR (errno) : "ok");
3409       GNUNET_STATISTICS_update (plugin->env->stats,
3410                                 "# UDP, total, bytes, sent, success",
3411                                 sent,
3412                                 GNUNET_NO);
3413       GNUNET_STATISTICS_update (plugin->env->stats,
3414                                 "# UDP, total, messages, sent, success",
3415                                 1,
3416                                 GNUNET_NO);
3417       if (NULL != udpw->frag_ctx)
3418         udpw->frag_ctx->on_wire_size += udpw->msg_size;
3419       udpw->qc (udpw->qc_cls,
3420                 udpw,
3421                 GNUNET_OK);
3422     }
3423     notify_session_monitor (plugin,
3424                             udpw->session,
3425                             GNUNET_TRANSPORT_SS_UPDATE);
3426     GNUNET_free (udpw);
3427   }
3428 }
3429
3430
3431 /* ***************** Event loop (part 2) *************** */
3432
3433
3434 /**
3435  * We have been notified that our readset has something to read.  We don't
3436  * know which socket needs to be read, so we have to check each one
3437  * Then reschedule this function to be called again once more is available.
3438  *
3439  * @param cls the plugin handle
3440  * @param tc the scheduling context
3441  */
3442 static void
3443 udp_plugin_select_v4 (void *cls,
3444                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3445 {
3446   struct Plugin *plugin = cls;
3447
3448   plugin->select_task_v4 = NULL;
3449   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3450     return;
3451   if (NULL == plugin->sockv4)
3452     return;
3453   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3454       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3455                                    plugin->sockv4)))
3456     udp_select_read (plugin,
3457                      plugin->sockv4);
3458   udp_select_send (plugin,
3459                    plugin->sockv4);
3460   schedule_select_v4 (plugin);
3461 }
3462
3463
3464 /**
3465  * We have been notified that our readset has something to read.  We don't
3466  * know which socket needs to be read, so we have to check each one
3467  * Then reschedule this function to be called again once more is available.
3468  *
3469  * @param cls the plugin handle
3470  * @param tc the scheduling context
3471  */
3472 static void
3473 udp_plugin_select_v6 (void *cls,
3474                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3475 {
3476   struct Plugin *plugin = cls;
3477
3478   plugin->select_task_v6 = NULL;
3479   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3480     return;
3481   if (NULL == plugin->sockv6)
3482     return;
3483   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3484        (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3485                                     plugin->sockv6)) )
3486     udp_select_read (plugin,
3487                      plugin->sockv6);
3488
3489   udp_select_send (plugin,
3490                    plugin->sockv6);
3491   schedule_select_v6 (plugin);
3492 }
3493
3494
3495 /* ******************* Initialization *************** */
3496
3497
3498 /**
3499  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3500  *
3501  * @param plugin the plugin to initialize
3502  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3503  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3504  * @return number of sockets that were successfully bound
3505  */
3506 static int
3507 setup_sockets (struct Plugin *plugin,
3508                const struct sockaddr_in6 *bind_v6,
3509                const struct sockaddr_in *bind_v4)
3510 {
3511   int tries;
3512   int sockets_created = 0;
3513   struct sockaddr_in6 server_addrv6;
3514   struct sockaddr_in server_addrv4;
3515   const struct sockaddr *server_addr;
3516   const struct sockaddr *addrs[2];
3517   socklen_t addrlens[2];
3518   socklen_t addrlen;
3519   int eno;
3520
3521   /* Create IPv6 socket */
3522   eno = EINVAL;
3523   if (GNUNET_YES == plugin->enable_ipv6)
3524   {
3525     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6,
3526                                                    SOCK_DGRAM,
3527                                                    0);
3528     if (NULL == plugin->sockv6)
3529     {
3530       LOG (GNUNET_ERROR_TYPE_INFO,
3531            _("Disabling IPv6 since it is not supported on this system!\n"));
3532       plugin->enable_ipv6 = GNUNET_NO;
3533     }
3534     else
3535     {
3536       memset (&server_addrv6,
3537               0,
3538               sizeof(struct sockaddr_in6));
3539 #if HAVE_SOCKADDR_IN_SIN_LEN
3540       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3541 #endif
3542       server_addrv6.sin6_family = AF_INET6;
3543       if (NULL != bind_v6)
3544         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3545       else
3546         server_addrv6.sin6_addr = in6addr_any;
3547
3548       if (0 == plugin->port) /* autodetect */
3549         server_addrv6.sin6_port
3550           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3551                                              33537)
3552                    + 32000);
3553       else
3554         server_addrv6.sin6_port = htons (plugin->port);
3555       addrlen = sizeof (struct sockaddr_in6);
3556       server_addr = (const struct sockaddr *) &server_addrv6;
3557
3558       tries = 0;
3559       while (tries < 10)
3560       {
3561         LOG(GNUNET_ERROR_TYPE_DEBUG,
3562             "Binding to IPv6 `%s'\n",
3563             GNUNET_a2s (server_addr,
3564                         addrlen));
3565         /* binding */
3566         if (GNUNET_OK ==
3567             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3568                                         server_addr,
3569                                         addrlen))
3570           break;
3571         eno = errno;
3572         if (0 != plugin->port)
3573         {
3574           tries = 10; /* fail immediately */
3575           break; /* bind failed on specific port */
3576         }
3577         /* autodetect */
3578         server_addrv6.sin6_port
3579           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3580                                              33537)
3581                    + 32000);
3582         tries++;
3583       }
3584       if (tries >= 10)
3585       {
3586         GNUNET_NETWORK_socket_close (plugin->sockv6);
3587         plugin->enable_ipv6 = GNUNET_NO;
3588         plugin->sockv6 = NULL;
3589       }
3590       else
3591       {
3592         plugin->port = ntohs (server_addrv6.sin6_port);
3593       }
3594       if (NULL != plugin->sockv6)
3595       {
3596         LOG (GNUNET_ERROR_TYPE_DEBUG,
3597              "IPv6 UDP socket created listinging at %s\n",
3598              GNUNET_a2s (server_addr,
3599                          addrlen));
3600         addrs[sockets_created] = server_addr;
3601         addrlens[sockets_created] = addrlen;
3602         sockets_created++;
3603       }
3604       else
3605       {
3606         LOG (GNUNET_ERROR_TYPE_WARNING,
3607              _("Failed to bind UDP socket to %s: %s\n"),
3608              GNUNET_a2s (server_addr,
3609                          addrlen),
3610              STRERROR (eno));
3611       }
3612     }
3613   }
3614
3615   /* Create IPv4 socket */
3616   eno = EINVAL;
3617   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET,
3618                                                  SOCK_DGRAM,
3619                                                  0);
3620   if (NULL == plugin->sockv4)
3621   {
3622     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3623                          "socket");
3624     LOG (GNUNET_ERROR_TYPE_INFO,
3625          _("Disabling IPv4 since it is not supported on this system!\n"));
3626     plugin->enable_ipv4 = GNUNET_NO;
3627   }
3628   else
3629   {
3630     memset (&server_addrv4,
3631             0,
3632             sizeof(struct sockaddr_in));
3633 #if HAVE_SOCKADDR_IN_SIN_LEN
3634     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3635 #endif
3636     server_addrv4.sin_family = AF_INET;
3637     if (NULL != bind_v4)
3638       server_addrv4.sin_addr = bind_v4->sin_addr;
3639     else
3640       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3641
3642     if (0 == plugin->port)
3643       /* autodetect */
3644       server_addrv4.sin_port
3645         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3646                                            33537)
3647                  + 32000);
3648     else
3649       server_addrv4.sin_port = htons (plugin->port);
3650
3651     addrlen = sizeof (struct sockaddr_in);
3652     server_addr = (const struct sockaddr *) &server_addrv4;
3653
3654     tries = 0;
3655     while (tries < 10)
3656     {
3657       LOG (GNUNET_ERROR_TYPE_DEBUG,
3658            "Binding to IPv4 `%s'\n",
3659            GNUNET_a2s (server_addr,
3660                        addrlen));
3661
3662       /* binding */
3663       if (GNUNET_OK ==
3664           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3665                                       server_addr,
3666                                       addrlen))
3667         break;
3668       eno = errno;
3669       if (0 != plugin->port)
3670       {
3671         tries = 10; /* fail */
3672         break; /* bind failed on specific port */
3673       }
3674
3675       /* autodetect */
3676       server_addrv4.sin_port
3677         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3678                                            33537)
3679                  + 32000);
3680       tries++;
3681     }
3682     if (tries >= 10)
3683     {
3684       GNUNET_NETWORK_socket_close (plugin->sockv4);
3685       plugin->enable_ipv4 = GNUNET_NO;
3686       plugin->sockv4 = NULL;
3687     }
3688     else
3689     {
3690       plugin->port = ntohs (server_addrv4.sin_port);
3691     }
3692
3693     if (NULL != plugin->sockv4)
3694     {
3695       LOG (GNUNET_ERROR_TYPE_DEBUG,
3696            "IPv4 socket created on port %s\n",
3697            GNUNET_a2s (server_addr,
3698                        addrlen));
3699       addrs[sockets_created] = server_addr;
3700       addrlens[sockets_created] = addrlen;
3701       sockets_created++;
3702     }
3703     else
3704     {
3705       LOG (GNUNET_ERROR_TYPE_ERROR,
3706            _("Failed to bind UDP socket to %s: %s\n"),
3707            GNUNET_a2s (server_addr,
3708                        addrlen),
3709            STRERROR (eno));
3710     }
3711   }
3712
3713   if (0 == sockets_created)
3714   {
3715     LOG (GNUNET_ERROR_TYPE_WARNING,
3716          _("Failed to open UDP sockets\n"));
3717     return 0; /* No sockets created, return */
3718   }
3719   schedule_select_v4 (plugin);
3720   schedule_select_v6 (plugin);
3721   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3722                                      GNUNET_NO,
3723                                      plugin->port,
3724                                      sockets_created,
3725                                      addrs,
3726                                      addrlens,
3727                                      &udp_nat_port_map_callback,
3728                                      NULL,
3729                                      plugin,
3730                                      plugin->sockv4);
3731   return sockets_created;
3732 }
3733
3734
3735 /**
3736  * The exported method. Makes the core api available via a global and
3737  * returns the udp transport API.
3738  *
3739  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3740  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3741  */
3742 void *
3743 libgnunet_plugin_transport_udp_init (void *cls)
3744 {
3745   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3746   struct GNUNET_TRANSPORT_PluginFunctions *api;
3747   struct Plugin *p;
3748   unsigned long long port;
3749   unsigned long long aport;
3750   unsigned long long udp_max_bps;
3751   unsigned long long enable_v6;
3752   unsigned long long enable_broadcasting;
3753   unsigned long long enable_broadcasting_recv;
3754   char *bind4_address;
3755   char *bind6_address;
3756   struct GNUNET_TIME_Relative interval;
3757   struct sockaddr_in server_addrv4;
3758   struct sockaddr_in6 server_addrv6;
3759   int res;
3760   int have_bind4;
3761   int have_bind6;
3762
3763   if (NULL == env->receive)
3764   {
3765     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3766      initialze the plugin or the API */
3767     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3768     api->cls = NULL;
3769     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3770     api->address_to_string = &udp_address_to_string;
3771     api->string_to_address = &udp_string_to_address;
3772     return api;
3773   }
3774
3775   /* Get port number: port == 0 : autodetect a port,
3776    * > 0 : use this port, not given : 2086 default */
3777   if (GNUNET_OK !=
3778       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3779                                              "transport-udp",
3780                                              "PORT",
3781                                              &port))
3782     port = 2086;
3783   if (port > 65535)
3784   {
3785     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3786                                "transport-udp",
3787                                "PORT",
3788                                _("must be in [0,65535]"));
3789     return NULL;
3790   }
3791   if (GNUNET_OK !=
3792       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3793                                              "transport-udp",
3794                                              "ADVERTISED_PORT",
3795                                              &aport))
3796     aport = port;
3797   if (aport > 65535)
3798   {
3799     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3800                                "transport-udp",
3801                                "ADVERTISED_PORT",
3802                                _("must be in [0,65535]"));
3803     return NULL;
3804   }
3805
3806   if (GNUNET_YES ==
3807       GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3808                                             "nat",
3809                                             "DISABLEV6"))
3810     enable_v6 = GNUNET_NO;
3811   else
3812     enable_v6 = GNUNET_YES;
3813
3814   have_bind4 = GNUNET_NO;
3815   memset (&server_addrv4,
3816           0,
3817           sizeof (server_addrv4));
3818   if (GNUNET_YES ==
3819       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3820                                              "transport-udp",
3821                                              "BINDTO",
3822                                              &bind4_address))
3823   {
3824     LOG (GNUNET_ERROR_TYPE_DEBUG,
3825          "Binding UDP plugin to specific address: `%s'\n",
3826          bind4_address);
3827     if (1 != inet_pton (AF_INET,
3828                         bind4_address,
3829                         &server_addrv4.sin_addr))
3830     {
3831       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3832                                  "transport-udp",
3833                                  "BINDTO",
3834                                  _("must be valid IPv4 address"));
3835       GNUNET_free (bind4_address);
3836       return NULL;
3837     }
3838     have_bind4 = GNUNET_YES;
3839   }
3840   GNUNET_free_non_null (bind4_address);
3841   have_bind6 = GNUNET_NO;
3842   memset (&server_addrv6,
3843           0,
3844           sizeof (server_addrv6));
3845   if (GNUNET_YES ==
3846       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3847                                              "transport-udp",
3848                                              "BINDTO6",
3849                                              &bind6_address))
3850   {
3851     LOG (GNUNET_ERROR_TYPE_DEBUG,
3852          "Binding udp plugin to specific address: `%s'\n",
3853          bind6_address);
3854     if (1 != inet_pton (AF_INET6,
3855                         bind6_address,
3856                         &server_addrv6.sin6_addr))
3857     {
3858       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3859                                  "transport-udp",
3860                                  "BINDTO6",
3861                                  _("must be valid IPv6 address"));
3862       GNUNET_free (bind6_address);
3863       return NULL;
3864     }
3865     have_bind6 = GNUNET_YES;
3866   }
3867   GNUNET_free_non_null (bind6_address);
3868
3869   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3870                                                               "transport-udp",
3871                                                               "BROADCAST");
3872   if (enable_broadcasting == GNUNET_SYSERR)
3873     enable_broadcasting = GNUNET_NO;
3874
3875   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3876                                                                    "transport-udp",
3877                                                                    "BROADCAST_RECEIVE");
3878   if (enable_broadcasting_recv == GNUNET_SYSERR)
3879     enable_broadcasting_recv = GNUNET_YES;
3880
3881   if (GNUNET_SYSERR ==
3882       GNUNET_CONFIGURATION_get_value_time (env->cfg,
3883                                            "transport-udp",
3884                                            "BROADCAST_INTERVAL",
3885                                            &interval))
3886   {
3887     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
3888                                               10);
3889   }
3890   if (GNUNET_OK !=
3891       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3892                                              "transport-udp",
3893                                              "MAX_BPS",
3894                                              &udp_max_bps))
3895   {
3896     /* 50 MB/s == infinity for practical purposes */
3897     udp_max_bps = 1024 * 1024 * 50;
3898   }
3899
3900   p = GNUNET_new (struct Plugin);
3901   p->port = port;
3902   p->aport = aport;
3903   p->broadcast_interval = interval;
3904   p->enable_ipv6 = enable_v6;
3905   p->enable_ipv4 = GNUNET_YES; /* default */
3906   p->enable_broadcasting = enable_broadcasting;
3907   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3908   p->env = env;
3909   p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
3910                                                       GNUNET_NO);
3911   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3912   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3913                                      p);
3914   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3915                                  NULL,
3916                                  NULL,
3917                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3918                                  30);
3919   res = setup_sockets (p,
3920                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3921                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3922   if ( (0 == res) ||
3923        ( (NULL == p->sockv4) &&
3924          (NULL == p->sockv6) ) )
3925   {
3926     LOG (GNUNET_ERROR_TYPE_ERROR,
3927         _("Failed to create UDP network sockets\n"));
3928     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3929     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3930     GNUNET_SERVER_mst_destroy (p->mst);
3931     GNUNET_free (p);
3932     return NULL;
3933   }
3934
3935   /* Setup broadcasting and receiving beacons */
3936   setup_broadcast (p,
3937                    &server_addrv6,
3938                    &server_addrv4);
3939
3940   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3941   api->cls = p;
3942   api->disconnect_session = &udp_disconnect_session;
3943   api->query_keepalive_factor = &udp_query_keepalive_factor;
3944   api->disconnect_peer = &udp_disconnect;
3945   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3946   api->address_to_string = &udp_address_to_string;
3947   api->string_to_address = &udp_string_to_address;
3948   api->check_address = &udp_plugin_check_address;
3949   api->get_session = &udp_plugin_get_session;
3950   api->send = &udp_plugin_send;
3951   api->get_network = &udp_plugin_get_network;
3952   api->get_network_for_address = &udp_plugin_get_network_for_address;
3953   api->update_session_timeout = &udp_plugin_update_session_timeout;
3954   api->setup_monitor = &udp_plugin_setup_monitor;
3955   return api;
3956 }
3957
3958
3959 /**
3960  * Function called on each entry in the defragmentation heap to
3961  * clean it up.
3962  *
3963  * @param cls NULL
3964  * @param node node in the heap (to be removed)
3965  * @param element a `struct DefragContext` to be cleaned up
3966  * @param cost unused
3967  * @return #GNUNET_YES
3968  */
3969 static int
3970 heap_cleanup_iterator (void *cls,
3971                        struct GNUNET_CONTAINER_HeapNode *node,
3972                        void *element,
3973                        GNUNET_CONTAINER_HeapCostType cost)
3974 {
3975   struct DefragContext *d_ctx = element;
3976
3977   GNUNET_CONTAINER_heap_remove_node (node);
3978   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3979   GNUNET_free (d_ctx);
3980   return GNUNET_YES;
3981 }
3982
3983
3984 /**
3985  * The exported method. Makes the core api available via a global and
3986  * returns the udp transport API.
3987  *
3988  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3989  * @return NULL
3990  */
3991 void *
3992 libgnunet_plugin_transport_udp_done (void *cls)
3993 {
3994   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3995   struct Plugin *plugin = api->cls;
3996   struct PrettyPrinterContext *cur;
3997   struct UDP_MessageWrapper *udpw;
3998
3999   if (NULL == plugin)
4000   {
4001     GNUNET_free (api);
4002     return NULL;
4003   }
4004   stop_broadcast (plugin);
4005   if (NULL != plugin->select_task_v4)
4006   {
4007     GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
4008     plugin->select_task_v4 = NULL;
4009   }
4010   if (NULL != plugin->select_task_v6)
4011   {
4012     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
4013     plugin->select_task_v6 = NULL;
4014   }
4015   if (NULL != plugin->sockv4)
4016   {
4017     GNUNET_break (GNUNET_OK ==
4018                   GNUNET_NETWORK_socket_close (plugin->sockv4));
4019     plugin->sockv4 = NULL;
4020   }
4021   if (NULL != plugin->sockv6)
4022   {
4023     GNUNET_break (GNUNET_OK ==
4024                   GNUNET_NETWORK_socket_close (plugin->sockv6));
4025     plugin->sockv6 = NULL;
4026   }
4027   if (NULL != plugin->nat)
4028   {
4029     GNUNET_NAT_unregister (plugin->nat);
4030     plugin->nat = NULL;
4031   }
4032   if (NULL != plugin->defrag_ctxs)
4033   {
4034     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
4035                                    &heap_cleanup_iterator,
4036                                    NULL);
4037     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
4038     plugin->defrag_ctxs = NULL;
4039   }
4040   if (NULL != plugin->mst)
4041   {
4042     GNUNET_SERVER_mst_destroy (plugin->mst);
4043     plugin->mst = NULL;
4044   }
4045   while (NULL != (udpw = plugin->ipv4_queue_head))
4046   {
4047     dequeue (plugin,
4048              udpw);
4049     udpw->qc (udpw->qc_cls,
4050               udpw,
4051               GNUNET_SYSERR);
4052     GNUNET_free (udpw);
4053   }
4054   while (NULL != (udpw = plugin->ipv6_queue_head))
4055   {
4056     dequeue (plugin,
4057              udpw);
4058     udpw->qc (udpw->qc_cls,
4059               udpw,
4060               GNUNET_SYSERR);
4061     GNUNET_free (udpw);
4062   }
4063   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
4064                                          &disconnect_and_free_it,
4065                                          plugin);
4066   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
4067
4068   while (NULL != (cur = plugin->ppc_dll_head))
4069   {
4070     GNUNET_break (0);
4071     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
4072                                  plugin->ppc_dll_tail,
4073                                  cur);
4074     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
4075     if (NULL != cur->timeout_task)
4076     {
4077       GNUNET_SCHEDULER_cancel (cur->timeout_task);
4078       cur->timeout_task = NULL;
4079     }
4080     GNUNET_free (cur);
4081   }
4082   GNUNET_free (plugin);
4083   GNUNET_free (api);
4084   return NULL;
4085 }
4086
4087 /* end of plugin_transport_udp.c */