fix #3649/#3591
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 /**
46  * After how much inactivity should a UDP session time out?
47  */
48 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
49
50 /**
51  * Number of messages we can defragment in parallel.  We only really
52  * defragment 1 message at a time, but if messages get re-ordered, we
53  * may want to keep knowledge about the previous message to avoid
54  * discarding the current message in favor of a single fragment of a
55  * previous message.  3 should be good since we don't expect massive
56  * message reorderings with UDP.
57  */
58 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
59
60 /**
61  * We keep a defragmentation queue per sender address.  How many
62  * sender addresses do we support at the same time? Memory consumption
63  * is roughly a factor of 32k * #UDP_MAX_MESSAGES_IN_DEFRAG times this
64  * value. (So 128 corresponds to 12 MB and should suffice for
65  * connecting to roughly 128 peers via UDP).
66  */
67 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
68
69
70 /**
71  * UDP Message-Packet header (after defragmentation).
72  */
73 struct UDPMessage
74 {
75   /**
76    * Message header.
77    */
78   struct GNUNET_MessageHeader header;
79
80   /**
81    * Always zero for now.
82    */
83   uint32_t reserved;
84
85   /**
86    * What is the identity of the sender
87    */
88   struct GNUNET_PeerIdentity sender;
89
90 };
91
92
93 /**
94  * Closure for #append_port().
95  */
96 struct PrettyPrinterContext
97 {
98   /**
99    * DLL
100    */
101   struct PrettyPrinterContext *next;
102
103   /**
104    * DLL
105    */
106   struct PrettyPrinterContext *prev;
107
108   /**
109    * Our plugin.
110    */
111   struct Plugin *plugin;
112
113   /**
114    * Resolver handle
115    */
116   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
117
118   /**
119    * Function to call with the result.
120    */
121   GNUNET_TRANSPORT_AddressStringCallback asc;
122
123   /**
124    * Clsoure for @e asc.
125    */
126   void *asc_cls;
127
128   /**
129    * Timeout task
130    */
131   struct GNUNET_SCHEDULER_Task *timeout_task;
132
133   /**
134    * Is this an IPv6 address?
135    */
136   int ipv6;
137
138   /**
139    * Options
140    */
141   uint32_t options;
142
143   /**
144    * Port to add after the IP address.
145    */
146   uint16_t port;
147
148 };
149
150
151 /**
152  * Session with another peer.
153  */
154 struct Session
155 {
156   /**
157    * Which peer is this session for?
158    */
159   struct GNUNET_PeerIdentity target;
160
161   /**
162    * Plugin this session belongs to.
163    */
164   struct Plugin *plugin;
165
166   /**
167    * Context for dealing with fragments.
168    */
169   struct UDP_FragmentationContext *frag_ctx;
170
171   /**
172    * Desired delay for next sending we send to other peer
173    */
174   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
175
176   /**
177    * Desired delay for next sending we received from other peer
178    */
179   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
180
181   /**
182    * Session timeout task
183    */
184   struct GNUNET_SCHEDULER_Task *timeout_task;
185
186   /**
187    * When does this session time out?
188    */
189   struct GNUNET_TIME_Absolute timeout;
190
191   /**
192    * expected delay for ACKs
193    */
194   struct GNUNET_TIME_Relative last_expected_ack_delay;
195
196   /**
197    * desired delay between UDP messages
198    */
199   struct GNUNET_TIME_Relative last_expected_msg_delay;
200
201   /**
202    * Our own address.
203    */
204   struct GNUNET_HELLO_Address *address;
205
206   /**
207    * Number of bytes waiting for transmission to this peer.
208    */
209   unsigned long long bytes_in_queue;
210
211   /**
212    * Number of messages waiting for transmission to this peer.
213    */
214   unsigned int msgs_in_queue;
215
216   /**
217    * Reference counter to indicate that this session is
218    * currently being used and must not be destroyed;
219    * setting @e in_destroy will destroy it as soon as
220    * possible.
221    */
222   unsigned int rc;
223
224   /**
225    * Network type of the address.
226    */
227   enum GNUNET_ATS_Network_Type scope;
228
229   /**
230    * Is this session about to be destroyed (sometimes we cannot
231    * destroy a session immediately as below us on the stack
232    * there might be code that still uses it; in this case,
233    * @e rc is non-zero).
234    */
235   int in_destroy;
236 };
237
238
239
240 /**
241  * Data structure to track defragmentation contexts based
242  * on the source of the UDP traffic.
243  */
244 struct DefragContext
245 {
246
247   /**
248    * Defragmentation context.
249    */
250   struct GNUNET_DEFRAGMENT_Context *defrag;
251
252   /**
253    * Reference to master plugin struct.
254    */
255   struct Plugin *plugin;
256
257   /**
258    * Node in the defrag heap.
259    */
260   struct GNUNET_CONTAINER_HeapNode *hnode;
261
262   /**
263    * Source address this receive context is for (allocated at the
264    * end of the struct).
265    */
266   const union UdpAddress *udp_addr;
267
268   /**
269    * Who's message(s) are we defragmenting here?
270    * Only initialized once we succeeded and
271    * @e have_sender is set.
272    */
273   struct GNUNET_PeerIdentity sender;
274
275   /**
276    * Length of @e udp_addr.
277    */
278   size_t udp_addr_len;
279
280   /**
281    * Network type the address belongs to.
282    */
283   enum GNUNET_ATS_Network_Type network_type;
284
285   /**
286    * Has the @e sender field been initialized yet?
287    */
288   int have_sender;
289 };
290
291
292 /**
293  * Context to send fragmented messages
294  */
295 struct UDP_FragmentationContext
296 {
297   /**
298    * Next in linked list
299    */
300   struct UDP_FragmentationContext *next;
301
302   /**
303    * Previous in linked list
304    */
305   struct UDP_FragmentationContext *prev;
306
307   /**
308    * The plugin
309    */
310   struct Plugin *plugin;
311
312   /**
313    * Handle for fragmentation.
314    */
315   struct GNUNET_FRAGMENT_Context *frag;
316
317   /**
318    * The session this fragmentation context belongs to
319    */
320   struct Session *session;
321
322   /**
323    * Function to call upon completion of the transmission.
324    */
325   GNUNET_TRANSPORT_TransmitContinuation cont;
326
327   /**
328    * Closure for @e cont.
329    */
330   void *cont_cls;
331
332   /**
333    * Message timeout
334    */
335   struct GNUNET_TIME_Absolute timeout;
336
337   /**
338    * Payload size of original unfragmented message
339    */
340   size_t payload_size;
341
342   /**
343    * Bytes used to send all fragments on wire including UDP overhead
344    */
345   size_t on_wire_size;
346
347 };
348
349
350 /**
351  * Function called when a message is removed from the
352  * transmission queue.
353  *
354  * @param cls closure
355  * @param udpw message wrapper finished
356  * @param result #GNUNET_OK on success (message was sent)
357  *               #GNUNET_SYSERR if the target disconnected
358  *               or we had a timeout or other trouble sending
359  */
360 typedef void
361 (*QueueContinuation) (void *cls,
362                       struct UDP_MessageWrapper *udpw,
363                       int result);
364
365
366 /**
367  * Information we track for each message in the queue.
368  */
369 struct UDP_MessageWrapper
370 {
371   /**
372    * Session this message belongs to
373    */
374   struct Session *session;
375
376   /**
377    * DLL of messages, previous element
378    */
379   struct UDP_MessageWrapper *prev;
380
381   /**
382    * DLL of messages, next element
383    */
384   struct UDP_MessageWrapper *next;
385
386   /**
387    * Message with @e msg_size bytes including UDP-specific overhead.
388    */
389   char *msg_buf;
390
391   /**
392    * Function to call once the message wrapper is being removed
393    * from the queue (with success or failure).
394    */
395   QueueContinuation qc;
396
397   /**
398    * Closure for @e qc.
399    */
400   void *qc_cls;
401
402   /**
403    * External continuation to call upon completion of the
404    * transmission, NULL if this queue entry is not for a
405    * message from the application.
406    */
407   GNUNET_TRANSPORT_TransmitContinuation cont;
408
409   /**
410    * Closure for @e cont.
411    */
412   void *cont_cls;
413
414   /**
415    * Fragmentation context.
416    * frag_ctx == NULL if transport <= MTU
417    * frag_ctx != NULL if transport > MTU
418    */
419   struct UDP_FragmentationContext *frag_ctx;
420
421   /**
422    * Message timeout.
423    */
424   struct GNUNET_TIME_Absolute timeout;
425
426   /**
427    * Size of UDP message to send, including UDP-specific overhead.
428    */
429   size_t msg_size;
430
431   /**
432    * Payload size of original message.
433    */
434   size_t payload_size;
435
436 };
437
438
439 GNUNET_NETWORK_STRUCT_BEGIN
440
441 /**
442  * UDP ACK Message-Packet header.
443  */
444 struct UDP_ACK_Message
445 {
446   /**
447    * Message header.
448    */
449   struct GNUNET_MessageHeader header;
450
451   /**
452    * Desired delay for flow control, in us (in NBO).
453    */
454   uint32_t delay GNUNET_PACKED;
455
456   /**
457    * What is the identity of the sender
458    */
459   struct GNUNET_PeerIdentity sender;
460
461 };
462
463 GNUNET_NETWORK_STRUCT_END
464
465
466 /* ************************* Monitoring *********** */
467
468
469 /**
470  * If a session monitor is attached, notify it about the new
471  * session state.
472  *
473  * @param plugin our plugin
474  * @param session session that changed state
475  * @param state new state of the session
476  */
477 static void
478 notify_session_monitor (struct Plugin *plugin,
479                         struct Session *session,
480                         enum GNUNET_TRANSPORT_SessionState state)
481 {
482   struct GNUNET_TRANSPORT_SessionInfo info;
483
484   if (NULL == plugin->sic)
485     return;
486   if (GNUNET_YES == session->in_destroy)
487     return; /* already destroyed, just RC>0 left-over actions */
488   memset (&info,
489           0,
490           sizeof (info));
491   info.state = state;
492   info.is_inbound = GNUNET_SYSERR; /* hard to say */
493   info.num_msg_pending = session->msgs_in_queue;
494   info.num_bytes_pending = session->bytes_in_queue;
495   /* info.receive_delay remains zero as this is not supported by UDP
496      (cannot selectively not receive from 'some' peer while continuing
497      to receive from others) */
498   info.session_timeout = session->timeout;
499   info.address = session->address;
500   plugin->sic (plugin->sic_cls,
501                session,
502                &info);
503 }
504
505
506 /**
507  * Return information about the given session to the monitor callback.
508  *
509  * @param cls the `struct Plugin` with the monitor callback (`sic`)
510  * @param peer peer we send information about
511  * @param value our `struct Session` to send information about
512  * @return #GNUNET_OK (continue to iterate)
513  */
514 static int
515 send_session_info_iter (void *cls,
516                         const struct GNUNET_PeerIdentity *peer,
517                         void *value)
518 {
519   struct Plugin *plugin = cls;
520   struct Session *session = value;
521
522   notify_session_monitor (plugin,
523                           session,
524                           GNUNET_TRANSPORT_SS_INIT);
525   notify_session_monitor (plugin,
526                           session,
527                           GNUNET_TRANSPORT_SS_UP);
528   return GNUNET_OK;
529 }
530
531
532 /**
533  * Begin monitoring sessions of a plugin.  There can only
534  * be one active monitor per plugin (i.e. if there are
535  * multiple monitors, the transport service needs to
536  * multiplex the generated events over all of them).
537  *
538  * @param cls closure of the plugin
539  * @param sic callback to invoke, NULL to disable monitor;
540  *            plugin will being by iterating over all active
541  *            sessions immediately and then enter monitor mode
542  * @param sic_cls closure for @a sic
543  */
544 static void
545 udp_plugin_setup_monitor (void *cls,
546                           GNUNET_TRANSPORT_SessionInfoCallback sic,
547                           void *sic_cls)
548 {
549   struct Plugin *plugin = cls;
550
551   plugin->sic = sic;
552   plugin->sic_cls = sic_cls;
553   if (NULL != sic)
554   {
555     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
556                                            &send_session_info_iter,
557                                            plugin);
558     /* signal end of first iteration */
559     sic (sic_cls,
560          NULL,
561          NULL);
562   }
563 }
564
565
566 /* ****************** Little Helpers ****************** */
567
568
569 /**
570  * Function to free last resources associated with a session.
571  *
572  * @param s session to free
573  */
574 static void
575 free_session (struct Session *s)
576 {
577   if (NULL != s->address)
578   {
579     GNUNET_HELLO_address_free (s->address);
580     s->address = NULL;
581   }
582   if (NULL != s->frag_ctx)
583   {
584     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag,
585                                      NULL,
586                                      NULL);
587     GNUNET_free (s->frag_ctx);
588     s->frag_ctx = NULL;
589   }
590   GNUNET_free (s);
591 }
592
593
594 /**
595  * Function that is called to get the keepalive factor.
596  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
597  * calculate the interval between keepalive packets.
598  *
599  * @param cls closure with the `struct Plugin`
600  * @return keepalive factor
601  */
602 static unsigned int
603 udp_query_keepalive_factor (void *cls)
604 {
605   return 15;
606 }
607
608
609 /**
610  * Function obtain the network type for a session
611  *
612  * @param cls closure (`struct Plugin *`)
613  * @param session the session
614  * @return the network type
615  */
616 static enum GNUNET_ATS_Network_Type
617 udp_get_network (void *cls,
618                  struct Session *session)
619 {
620   return session->scope;
621 }
622
623
624 /* ******************* Event loop ******************** */
625
626 /**
627  * We have been notified that our readset has something to read.  We don't
628  * know which socket needs to be read, so we have to check each one
629  * Then reschedule this function to be called again once more is available.
630  *
631  * @param cls the plugin handle
632  * @param tc the scheduling context (for rescheduling this function again)
633  */
634 static void
635 udp_plugin_select_v4 (void *cls,
636                       const struct GNUNET_SCHEDULER_TaskContext *tc);
637
638
639 /**
640  * We have been notified that our readset has something to read.  We don't
641  * know which socket needs to be read, so we have to check each one
642  * Then reschedule this function to be called again once more is available.
643  *
644  * @param cls the plugin handle
645  * @param tc the scheduling context (for rescheduling this function again)
646  */
647 static void
648 udp_plugin_select_v6 (void *cls,
649                       const struct GNUNET_SCHEDULER_TaskContext *tc);
650
651
652 /**
653  * (re)schedule IPv4-select tasks for this plugin.
654  *
655  * @param plugin plugin to reschedule
656  */
657 static void
658 schedule_select_v4 (struct Plugin *plugin)
659 {
660   struct GNUNET_TIME_Relative min_delay;
661   struct UDP_MessageWrapper *udpw;
662
663   if ( (GNUNET_YES == plugin->enable_ipv4) &&
664        (NULL != plugin->sockv4) )
665   {
666     /* Find a message ready to send:
667      * Flow delay from other peer is expired or not set (0) */
668     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
669     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
670       min_delay = GNUNET_TIME_relative_min (min_delay,
671                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
672     if (NULL != plugin->select_task_v4)
673       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
674     plugin->select_task_v4
675       = GNUNET_SCHEDULER_add_read_net (min_delay,
676                                        plugin->sockv4,
677                                        &udp_plugin_select_v4,
678                                        plugin);
679   }
680 }
681
682
683 /**
684  * (re)schedule IPv6-select tasks for this plugin.
685  *
686  * @param plugin plugin to reschedule
687  */
688 static void
689 schedule_select_v6 (struct Plugin *plugin)
690 {
691   struct GNUNET_TIME_Relative min_delay;
692   struct UDP_MessageWrapper *udpw;
693
694   if ( (GNUNET_YES == plugin->enable_ipv6) &&
695        (NULL != plugin->sockv6) )
696   {
697     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
698     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
699       min_delay = GNUNET_TIME_relative_min (min_delay,
700                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
701     if (NULL != plugin->select_task_v6)
702       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
703     plugin->select_task_v6
704       = GNUNET_SCHEDULER_add_read_net (min_delay,
705                                        plugin->sockv6,
706                                        &udp_plugin_select_v6,
707                                        plugin);
708   }
709 }
710
711
712 /* ******************* Address to string and back ***************** */
713
714
715 /**
716  * Function called for a quick conversion of the binary address to
717  * a numeric address.  Note that the caller must not free the
718  * address and that the next call to this function is allowed
719  * to override the address again.
720  *
721  * @param cls closure
722  * @param addr binary address (a `union UdpAddress`)
723  * @param addrlen length of the @a addr
724  * @return string representing the same address
725  */
726 const char *
727 udp_address_to_string (void *cls,
728                        const void *addr,
729                        size_t addrlen)
730 {
731   static char rbuf[INET6_ADDRSTRLEN + 10];
732   char buf[INET6_ADDRSTRLEN];
733   const void *sb;
734   struct in_addr a4;
735   struct in6_addr a6;
736   const struct IPv4UdpAddress *t4;
737   const struct IPv6UdpAddress *t6;
738   int af;
739   uint16_t port;
740   uint32_t options;
741
742   if (NULL == addr)
743   {
744     GNUNET_break_op (0);
745     return NULL;
746   }
747
748   if (addrlen == sizeof(struct IPv6UdpAddress))
749   {
750     t6 = addr;
751     af = AF_INET6;
752     options = ntohl (t6->options);
753     port = ntohs (t6->u6_port);
754     a6 = t6->ipv6_addr;
755     sb = &a6;
756   }
757   else if (addrlen == sizeof(struct IPv4UdpAddress))
758   {
759     t4 = addr;
760     af = AF_INET;
761     options = ntohl (t4->options);
762     port = ntohs (t4->u4_port);
763     a4.s_addr = t4->ipv4_addr;
764     sb = &a4;
765   }
766   else
767   {
768     GNUNET_break_op (0);
769     return NULL;
770   }
771   inet_ntop (af,
772              sb,
773              buf,
774              INET6_ADDRSTRLEN);
775   GNUNET_snprintf (rbuf,
776                    sizeof(rbuf),
777                    (af == AF_INET6)
778                    ? "%s.%u.[%s]:%u"
779                    : "%s.%u.%s:%u",
780                    PLUGIN_NAME,
781                    options,
782                    buf,
783                    port);
784   return rbuf;
785 }
786
787
788 /**
789  * Function called to convert a string address to a binary address.
790  *
791  * @param cls closure (`struct Plugin *`)
792  * @param addr string address
793  * @param addrlen length of the address
794  * @param buf location to store the buffer
795  * @param added location to store the number of bytes in the buffer.
796  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
797  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
798  */
799 static int
800 udp_string_to_address (void *cls,
801                        const char *addr,
802                        uint16_t addrlen,
803                        void **buf,
804                        size_t *added)
805 {
806   struct sockaddr_storage socket_address;
807   char *address;
808   char *plugin;
809   char *optionstr;
810   uint32_t options;
811
812   /* Format tcp.options.address:port */
813   address = NULL;
814   plugin = NULL;
815   optionstr = NULL;
816
817   if ((NULL == addr) || (0 == addrlen))
818   {
819     GNUNET_break (0);
820     return GNUNET_SYSERR;
821   }
822   if ('\0' != addr[addrlen - 1])
823   {
824     GNUNET_break (0);
825     return GNUNET_SYSERR;
826   }
827   if (strlen (addr) != addrlen - 1)
828   {
829     GNUNET_break (0);
830     return GNUNET_SYSERR;
831   }
832   plugin = GNUNET_strdup (addr);
833   optionstr = strchr (plugin, '.');
834   if (NULL == optionstr)
835   {
836     GNUNET_break (0);
837     GNUNET_free (plugin);
838     return GNUNET_SYSERR;
839   }
840   optionstr[0] = '\0';
841   optionstr++;
842   options = atol (optionstr);
843   address = strchr (optionstr, '.');
844   if (NULL == address)
845   {
846     GNUNET_break (0);
847     GNUNET_free (plugin);
848     return GNUNET_SYSERR;
849   }
850   address[0] = '\0';
851   address++;
852
853   if (GNUNET_OK !=
854       GNUNET_STRINGS_to_address_ip (address,
855                                     strlen (address),
856                                     &socket_address))
857   {
858     GNUNET_break (0);
859     GNUNET_free (plugin);
860     return GNUNET_SYSERR;
861   }
862   GNUNET_free(plugin);
863
864   switch (socket_address.ss_family)
865   {
866   case AF_INET:
867     {
868       struct IPv4UdpAddress *u4;
869       const struct sockaddr_in *in4 = (const struct sockaddr_in *) &socket_address;
870
871       u4 = GNUNET_new (struct IPv4UdpAddress);
872       u4->options = htonl (options);
873       u4->ipv4_addr = in4->sin_addr.s_addr;
874       u4->u4_port = in4->sin_port;
875       *buf = u4;
876       *added = sizeof (struct IPv4UdpAddress);
877       return GNUNET_OK;
878     }
879   case AF_INET6:
880     {
881       struct IPv6UdpAddress *u6;
882       const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &socket_address;
883
884       u6 = GNUNET_new (struct IPv6UdpAddress);
885       u6->options = htonl (options);
886       u6->ipv6_addr = in6->sin6_addr;
887       u6->u6_port = in6->sin6_port;
888       *buf = u6;
889       *added = sizeof (struct IPv6UdpAddress);
890       return GNUNET_OK;
891     }
892   default:
893     GNUNET_break (0);
894     return GNUNET_SYSERR;
895   }
896 }
897
898
899 /**
900  * Append our port and forward the result.
901  *
902  * @param cls a `struct PrettyPrinterContext *`
903  * @param hostname result from DNS resolver
904  */
905 static void
906 append_port (void *cls,
907              const char *hostname)
908 {
909   struct PrettyPrinterContext *ppc = cls;
910   struct Plugin *plugin = ppc->plugin;
911   char *ret;
912
913   if (NULL == hostname)
914   {
915     /* Final call, done */
916     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
917                                  plugin->ppc_dll_tail,
918                                  ppc);
919     ppc->resolver_handle = NULL;
920     ppc->asc (ppc->asc_cls,
921               NULL,
922               GNUNET_OK);
923     GNUNET_free (ppc);
924     return;
925   }
926   if (GNUNET_YES == ppc->ipv6)
927     GNUNET_asprintf (&ret,
928                      "%s.%u.[%s]:%d",
929                      PLUGIN_NAME,
930                      ppc->options,
931                      hostname,
932                      ppc->port);
933   else
934     GNUNET_asprintf (&ret,
935                      "%s.%u.%s:%d",
936                      PLUGIN_NAME,
937                      ppc->options,
938                      hostname,
939                      ppc->port);
940   ppc->asc (ppc->asc_cls,
941             ret,
942             GNUNET_OK);
943   GNUNET_free (ret);
944 }
945
946
947 /**
948  * Convert the transports address to a nice, human-readable format.
949  *
950  * @param cls closure with the `struct Plugin *`
951  * @param type name of the transport that generated the address
952  * @param addr one of the addresses of the host, NULL for the last address
953  *        the specific address format depends on the transport;
954  *        a `union UdpAddress`
955  * @param addrlen length of the address
956  * @param numeric should (IP) addresses be displayed in numeric form?
957  * @param timeout after how long should we give up?
958  * @param asc function to call on each string
959  * @param asc_cls closure for @a asc
960  */
961 static void
962 udp_plugin_address_pretty_printer (void *cls,
963                                    const char *type,
964                                    const void *addr,
965                                    size_t addrlen,
966                                    int numeric,
967                                    struct GNUNET_TIME_Relative timeout,
968                                    GNUNET_TRANSPORT_AddressStringCallback asc,
969                                    void *asc_cls)
970 {
971   struct Plugin *plugin = cls;
972   struct PrettyPrinterContext *ppc;
973   const struct sockaddr *sb;
974   size_t sbs;
975   struct sockaddr_in a4;
976   struct sockaddr_in6 a6;
977   const struct IPv4UdpAddress *u4;
978   const struct IPv6UdpAddress *u6;
979   uint16_t port;
980   uint32_t options;
981
982   if (addrlen == sizeof(struct IPv6UdpAddress))
983   {
984     u6 = addr;
985     memset (&a6,
986             0,
987             sizeof (a6));
988     a6.sin6_family = AF_INET6;
989 #if HAVE_SOCKADDR_IN_SIN_LEN
990     a6.sin6_len = sizeof (a6);
991 #endif
992     a6.sin6_port = u6->u6_port;
993     a6.sin6_addr = u6->ipv6_addr;
994     port = ntohs (u6->u6_port);
995     options = ntohl (u6->options);
996     sb = (const struct sockaddr *) &a6;
997     sbs = sizeof (a6);
998   }
999   else if (addrlen == sizeof (struct IPv4UdpAddress))
1000   {
1001     u4 = addr;
1002     memset (&a4,
1003             0,
1004             sizeof(a4));
1005     a4.sin_family = AF_INET;
1006 #if HAVE_SOCKADDR_IN_SIN_LEN
1007     a4.sin_len = sizeof (a4);
1008 #endif
1009     a4.sin_port = u4->u4_port;
1010     a4.sin_addr.s_addr = u4->ipv4_addr;
1011     port = ntohs (u4->u4_port);
1012     options = ntohl (u4->options);
1013     sb = (const struct sockaddr *) &a4;
1014     sbs = sizeof(a4);
1015   }
1016   else
1017   {
1018     /* invalid address */
1019     GNUNET_break_op (0);
1020     asc (asc_cls,
1021          NULL,
1022          GNUNET_SYSERR);
1023     asc (asc_cls,
1024          NULL,
1025          GNUNET_OK);
1026     return;
1027   }
1028   ppc = GNUNET_new (struct PrettyPrinterContext);
1029   ppc->plugin = plugin;
1030   ppc->asc = asc;
1031   ppc->asc_cls = asc_cls;
1032   ppc->port = port;
1033   ppc->options = options;
1034   if (addrlen == sizeof (struct IPv6UdpAddress))
1035     ppc->ipv6 = GNUNET_YES;
1036   else
1037     ppc->ipv6 = GNUNET_NO;
1038   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
1039                                plugin->ppc_dll_tail,
1040                                ppc);
1041   ppc->resolver_handle
1042     = GNUNET_RESOLVER_hostname_get (sb,
1043                                     sbs,
1044                                     ! numeric,
1045                                     timeout,
1046                                     &append_port,
1047                                     ppc);
1048 }
1049
1050
1051 /**
1052  * Check if the given port is plausible (must be either our listen
1053  * port or our advertised port).  If it is neither, we return
1054  * #GNUNET_SYSERR.
1055  *
1056  * @param plugin global variables
1057  * @param in_port port number to check
1058  * @return #GNUNET_OK if port is either our open or advertised port
1059  */
1060 static int
1061 check_port (const struct Plugin *plugin,
1062             uint16_t in_port)
1063 {
1064   if ( (plugin->port == in_port) ||
1065        (plugin->aport == in_port) )
1066     return GNUNET_OK;
1067   return GNUNET_SYSERR;
1068 }
1069
1070
1071 /**
1072  * Function that will be called to check if a binary address for this
1073  * plugin is well-formed and corresponds to an address for THIS peer
1074  * (as per our configuration).  Naturally, if absolutely necessary,
1075  * plugins can be a bit conservative in their answer, but in general
1076  * plugins should make sure that the address does not redirect
1077  * traffic to a 3rd party that might try to man-in-the-middle our
1078  * traffic.
1079  *
1080  * @param cls closure, should be our handle to the Plugin
1081  * @param addr pointer to a `union UdpAddress`
1082  * @param addrlen length of @a addr
1083  * @return #GNUNET_OK if this is a plausible address for this peer
1084  *         and transport, #GNUNET_SYSERR if not
1085  */
1086 static int
1087 udp_plugin_check_address (void *cls,
1088                           const void *addr,
1089                           size_t addrlen)
1090 {
1091   struct Plugin *plugin = cls;
1092   const struct IPv4UdpAddress *v4;
1093   const struct IPv6UdpAddress *v6;
1094
1095   if (sizeof(struct IPv4UdpAddress) == addrlen)
1096   {
1097     v4 = (const struct IPv4UdpAddress *) addr;
1098     if (GNUNET_OK != check_port (plugin,
1099                                  ntohs (v4->u4_port)))
1100       return GNUNET_SYSERR;
1101     if (GNUNET_OK !=
1102         GNUNET_NAT_test_address (plugin->nat,
1103                                  &v4->ipv4_addr,
1104                                  sizeof (struct in_addr)))
1105       return GNUNET_SYSERR;
1106   }
1107   else if (sizeof(struct IPv6UdpAddress) == addrlen)
1108   {
1109     v6 = (const struct IPv6UdpAddress *) addr;
1110     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1111     {
1112       GNUNET_break_op (0);
1113       return GNUNET_SYSERR;
1114     }
1115     if (GNUNET_OK != check_port (plugin,
1116                                  ntohs (v6->u6_port)))
1117       return GNUNET_SYSERR;
1118     if (GNUNET_OK !=
1119         GNUNET_NAT_test_address (plugin->nat,
1120                                  &v6->ipv6_addr,
1121                                  sizeof (struct in6_addr)))
1122       return GNUNET_SYSERR;
1123   }
1124   else
1125   {
1126     GNUNET_break_op (0);
1127     return GNUNET_SYSERR;
1128   }
1129   return GNUNET_OK;
1130 }
1131
1132
1133 /**
1134  * Our external IP address/port mapping has changed.
1135  *
1136  * @param cls closure, the `struct Plugin`
1137  * @param add_remove #GNUNET_YES to mean the new public IP address,
1138  *                   #GNUNET_NO to mean the previous (now invalid) one
1139  * @param addr either the previous or the new public IP address
1140  * @param addrlen actual length of the @a addr
1141  */
1142 static void
1143 udp_nat_port_map_callback (void *cls,
1144                            int add_remove,
1145                            const struct sockaddr *addr,
1146                            socklen_t addrlen)
1147 {
1148   struct Plugin *plugin = cls;
1149   struct GNUNET_HELLO_Address *address;
1150   struct IPv4UdpAddress u4;
1151   struct IPv6UdpAddress u6;
1152   void *arg;
1153   size_t args;
1154
1155   LOG (GNUNET_ERROR_TYPE_DEBUG,
1156        (GNUNET_YES == add_remove)
1157        ? "NAT notification to add address `%s'\n"
1158        : "NAT notification to remove address `%s'\n",
1159        GNUNET_a2s (addr,
1160                    addrlen));
1161   /* convert 'address' to our internal format */
1162   switch (addr->sa_family)
1163   {
1164   case AF_INET:
1165     {
1166       const struct sockaddr_in *i4;
1167
1168       GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1169       i4 = (const struct sockaddr_in *) addr;
1170       if (0 == ntohs (i4->sin_port))
1171       {
1172         GNUNET_break (0);
1173         return;
1174       }
1175       memset (&u4,
1176               0,
1177               sizeof(u4));
1178       u4.options = htonl (plugin->myoptions);
1179       u4.ipv4_addr = i4->sin_addr.s_addr;
1180       u4.u4_port = i4->sin_port;
1181       arg = &u4;
1182       args = sizeof (struct IPv4UdpAddress);
1183       break;
1184     }
1185   case AF_INET6:
1186     {
1187       const struct sockaddr_in6 *i6;
1188
1189       GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1190       i6 = (const struct sockaddr_in6 *) addr;
1191       if (0 == ntohs (i6->sin6_port))
1192       {
1193         GNUNET_break (0);
1194         return;
1195       }
1196       memset (&u6,
1197               0,
1198               sizeof(u6));
1199       u6.options = htonl (plugin->myoptions);
1200       u6.ipv6_addr = i6->sin6_addr;
1201       u6.u6_port = i6->sin6_port;
1202       arg = &u6;
1203       args = sizeof (struct IPv6UdpAddress);
1204       break;
1205     }
1206   default:
1207     GNUNET_break (0);
1208     return;
1209   }
1210   /* modify our published address list */
1211   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1212                                            PLUGIN_NAME,
1213                                            arg,
1214                                            args,
1215                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1216   plugin->env->notify_address (plugin->env->cls,
1217                                add_remove,
1218                                address);
1219   GNUNET_HELLO_address_free (address);
1220 }
1221
1222
1223 /* ********************* Finding sessions ******************* */
1224
1225
1226 /**
1227  * Closure for #session_cmp_it().
1228  */
1229 struct SessionCompareContext
1230 {
1231   /**
1232    * Set to session matching the address.
1233    */
1234   struct Session *res;
1235
1236   /**
1237    * Address we are looking for.
1238    */
1239   const struct GNUNET_HELLO_Address *address;
1240 };
1241
1242
1243 /**
1244  * Find a session with a matching address.
1245  *
1246  * @param cls the `struct SessionCompareContext *`
1247  * @param key peer identity (unused)
1248  * @param value the `struct Session *`
1249  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1250  */
1251 static int
1252 session_cmp_it (void *cls,
1253                 const struct GNUNET_PeerIdentity *key,
1254                 void *value)
1255 {
1256   struct SessionCompareContext *cctx = cls;
1257   struct Session *s = value;
1258
1259   if (0 == GNUNET_HELLO_address_cmp (s->address,
1260                                      cctx->address))
1261   {
1262     GNUNET_assert (GNUNET_NO == s->in_destroy);
1263     cctx->res = s;
1264     return GNUNET_NO;
1265   }
1266   return GNUNET_OK;
1267 }
1268
1269
1270 /**
1271  * Locate an existing session the transport service is using to
1272  * send data to another peer.  Performs some basic sanity checks
1273  * on the address and then tries to locate a matching session.
1274  *
1275  * @param cls the plugin
1276  * @param address the address we should locate the session by
1277  * @return the session if it exists, or NULL if it is not found
1278  */
1279 static struct Session *
1280 udp_plugin_lookup_session (void *cls,
1281                            const struct GNUNET_HELLO_Address *address)
1282 {
1283   struct Plugin *plugin = cls;
1284   const struct IPv6UdpAddress *udp_a6;
1285   const struct IPv4UdpAddress *udp_a4;
1286   struct SessionCompareContext cctx;
1287
1288   if (NULL == address->address)
1289   {
1290     GNUNET_break (0);
1291     return NULL;
1292   }
1293   if (sizeof(struct IPv4UdpAddress) == address->address_length)
1294   {
1295     if (NULL == plugin->sockv4)
1296       return NULL;
1297     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1298     if (0 == udp_a4->u4_port)
1299     {
1300       GNUNET_break (0);
1301       return NULL;
1302     }
1303   }
1304   else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1305   {
1306     if (NULL == plugin->sockv6)
1307       return NULL;
1308     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1309     if (0 == udp_a6->u6_port)
1310     {
1311       GNUNET_break (0);
1312       return NULL;
1313     }
1314   }
1315   else
1316   {
1317     GNUNET_break (0);
1318     return NULL;
1319   }
1320
1321   /* check if session already exists */
1322   cctx.address = address;
1323   cctx.res = NULL;
1324   LOG (GNUNET_ERROR_TYPE_DEBUG,
1325        "Looking for existing session for peer `%s' with address `%s'\n",
1326        GNUNET_i2s (&address->peer),
1327        udp_address_to_string (plugin,
1328                               address->address,
1329                               address->address_length));
1330   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1331                                               &address->peer,
1332                                               &session_cmp_it,
1333                                               &cctx);
1334   if (NULL == cctx.res)
1335     return NULL;
1336   LOG (GNUNET_ERROR_TYPE_DEBUG,
1337        "Found existing session %p\n",
1338        cctx.res);
1339   return cctx.res;
1340 }
1341
1342
1343 /* ********************** Timeout ****************** */
1344
1345
1346 /**
1347  * Increment session timeout due to activity.
1348  *
1349  * @param s session to reschedule timeout activity for
1350  */
1351 static void
1352 reschedule_session_timeout (struct Session *s)
1353 {
1354   if (GNUNET_YES == s->in_destroy)
1355     return;
1356   GNUNET_assert (NULL != s->timeout_task);
1357   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1358 }
1359
1360
1361
1362 /**
1363  * Function that will be called whenever the transport service wants to
1364  * notify the plugin that a session is still active and in use and
1365  * therefore the session timeout for this session has to be updated
1366  *
1367  * @param cls closure with the `struct Plugin`
1368  * @param peer which peer was the session for
1369  * @param session which session is being updated
1370  */
1371 static void
1372 udp_plugin_update_session_timeout (void *cls,
1373                                    const struct GNUNET_PeerIdentity *peer,
1374                                    struct Session *session)
1375 {
1376   struct Plugin *plugin = cls;
1377
1378   if (GNUNET_YES !=
1379       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1380                                                     peer,
1381                                                     session))
1382   {
1383     GNUNET_break (0);
1384     return;
1385   }
1386   /* Reschedule session timeout */
1387   reschedule_session_timeout (session);
1388 }
1389
1390
1391 /* ************************* Sending ************************ */
1392
1393
1394 /**
1395  * Remove the given message from the transmission queue and
1396  * update all applicable statistics.
1397  *
1398  * @param plugin the UDP plugin
1399  * @param udpw message wrapper to dequeue
1400  */
1401 static void
1402 dequeue (struct Plugin *plugin,
1403          struct UDP_MessageWrapper *udpw)
1404 {
1405   struct Session *session = udpw->session;
1406
1407   if (plugin->bytes_in_buffer < udpw->msg_size)
1408   {
1409     GNUNET_break (0);
1410   }
1411   else
1412   {
1413     GNUNET_STATISTICS_update (plugin->env->stats,
1414                               "# UDP, total bytes in send buffers",
1415                               - (long long) udpw->msg_size,
1416                               GNUNET_NO);
1417     plugin->bytes_in_buffer -= udpw->msg_size;
1418   }
1419   GNUNET_STATISTICS_update (plugin->env->stats,
1420                             "# UDP, total messages in send buffers",
1421                             -1,
1422                             GNUNET_NO);
1423   if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1424   {
1425     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1426                                  plugin->ipv4_queue_tail,
1427                                  udpw);
1428   }
1429   else if (sizeof(struct IPv6UdpAddress) == udpw->session->address->address_length)
1430   {
1431     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1432                                  plugin->ipv6_queue_tail,
1433                                  udpw);
1434   }
1435   else
1436   {
1437     GNUNET_break (0);
1438     return;
1439   }
1440   GNUNET_assert (session->msgs_in_queue > 0);
1441   session->msgs_in_queue--;
1442   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1443   session->bytes_in_queue -= udpw->msg_size;
1444 }
1445
1446
1447 /**
1448  * Enqueue a message for transmission and update statistics.
1449  *
1450  * @param plugin the UDP plugin
1451  * @param udpw message wrapper to queue
1452  */
1453 static void
1454 enqueue (struct Plugin *plugin,
1455          struct UDP_MessageWrapper *udpw)
1456 {
1457   struct Session *session = udpw->session;
1458
1459   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1460   {
1461     GNUNET_break (0);
1462   }
1463   else
1464   {
1465     GNUNET_STATISTICS_update (plugin->env->stats,
1466                               "# UDP, total bytes in send buffers",
1467                               udpw->msg_size,
1468                               GNUNET_NO);
1469     plugin->bytes_in_buffer += udpw->msg_size;
1470   }
1471   GNUNET_STATISTICS_update (plugin->env->stats,
1472                             "# UDP, total messages in send buffers",
1473                             1,
1474                             GNUNET_NO);
1475   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
1476   {
1477     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1478                                 plugin->ipv4_queue_tail,
1479                                 udpw);
1480   }
1481   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
1482   {
1483     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1484                                  plugin->ipv6_queue_tail,
1485                                  udpw);
1486   }
1487   else
1488   {
1489     GNUNET_break (0);
1490     udpw->cont (udpw->cont_cls,
1491                 &session->target,
1492                 GNUNET_SYSERR,
1493                 udpw->msg_size,
1494                 0);
1495     GNUNET_free (udpw);
1496     return;
1497   }
1498   session->msgs_in_queue++;
1499   session->bytes_in_queue += udpw->msg_size;
1500 }
1501
1502
1503 /**
1504  * We have completed our (attempt) to transmit a message that had to
1505  * be fragmented -- either because we got an ACK saying that all
1506  * fragments were received, or because of timeout / disconnect.  Clean
1507  * up our state.
1508  *
1509  * @param frag_ctx fragmentation context to clean up
1510  * @param result #GNUNET_OK if we succeeded (got ACK),
1511  *               #GNUNET_SYSERR if the transmission failed
1512  */
1513 static void
1514 fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1515                          int result)
1516 {
1517   struct Plugin *plugin = frag_ctx->plugin;
1518   struct Session *s = frag_ctx->session;
1519   struct UDP_MessageWrapper *udpw;
1520   struct UDP_MessageWrapper *tmp;
1521   size_t overhead;
1522
1523   LOG (GNUNET_ERROR_TYPE_DEBUG,
1524        "%p: Fragmented message removed with result %s\n",
1525        frag_ctx,
1526        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1527   /* Call continuation for fragmented message */
1528   if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1529     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1530   else
1531     overhead = frag_ctx->on_wire_size;
1532   if (NULL != frag_ctx->cont)
1533     frag_ctx->cont (frag_ctx->cont_cls,
1534                     &s->target,
1535                     result,
1536                     s->frag_ctx->payload_size,
1537                     frag_ctx->on_wire_size);
1538   GNUNET_STATISTICS_update (plugin->env->stats,
1539                             "# UDP, fragmented messages active",
1540                             -1,
1541                             GNUNET_NO);
1542
1543   if (GNUNET_OK == result)
1544   {
1545     GNUNET_STATISTICS_update (plugin->env->stats,
1546                               "# UDP, fragmented msgs, messages, sent, success",
1547                               1,
1548                               GNUNET_NO);
1549     GNUNET_STATISTICS_update (plugin->env->stats,
1550                               "# UDP, fragmented msgs, bytes payload, sent, success",
1551                               s->frag_ctx->payload_size,
1552                               GNUNET_NO);
1553     GNUNET_STATISTICS_update (plugin->env->stats,
1554                               "# UDP, fragmented msgs, bytes overhead, sent, success",
1555                               overhead,
1556                               GNUNET_NO);
1557     GNUNET_STATISTICS_update (plugin->env->stats,
1558                               "# UDP, total, bytes overhead, sent",
1559                               overhead,
1560                               GNUNET_NO);
1561     GNUNET_STATISTICS_update (plugin->env->stats,
1562                               "# UDP, total, bytes payload, sent",
1563                               s->frag_ctx->payload_size,
1564                               GNUNET_NO);
1565   }
1566   else
1567   {
1568     GNUNET_STATISTICS_update (plugin->env->stats,
1569                               "# UDP, fragmented msgs, messages, sent, failure",
1570                               1,
1571                               GNUNET_NO);
1572     GNUNET_STATISTICS_update (plugin->env->stats,
1573                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1574                               s->frag_ctx->payload_size,
1575                               GNUNET_NO);
1576     GNUNET_STATISTICS_update (plugin->env->stats,
1577                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1578                               overhead,
1579                               GNUNET_NO);
1580     GNUNET_STATISTICS_update (plugin->env->stats,
1581                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1582                               overhead,
1583                               GNUNET_NO);
1584   }
1585
1586   /* Remove remaining fragments from queue, no need to transmit those
1587      any longer. */
1588   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1589   {
1590     udpw = plugin->ipv6_queue_head;
1591     while (NULL != udpw)
1592     {
1593       tmp = udpw->next;
1594       if ( (udpw->frag_ctx != NULL) &&
1595            (udpw->frag_ctx == frag_ctx) )
1596       {
1597         dequeue (plugin,
1598                  udpw);
1599         GNUNET_free (udpw);
1600       }
1601       udpw = tmp;
1602     }
1603   }
1604   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1605   {
1606     udpw = plugin->ipv4_queue_head;
1607     while (NULL != udpw)
1608     {
1609       tmp = udpw->next;
1610       if ( (NULL != udpw->frag_ctx) &&
1611            (udpw->frag_ctx == frag_ctx) )
1612       {
1613         dequeue (plugin,
1614                  udpw);
1615         GNUNET_free (udpw);
1616       }
1617       udpw = tmp;
1618     }
1619   }
1620   notify_session_monitor (s->plugin,
1621                           s,
1622                           GNUNET_TRANSPORT_SS_UPDATE);
1623   GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1624                                    &s->last_expected_msg_delay,
1625                                    &s->last_expected_ack_delay);
1626   s->frag_ctx = NULL;
1627   GNUNET_free (frag_ctx);
1628 }
1629
1630
1631 /**
1632  * We are finished with a fragment in the message queue.
1633  * Notify the continuation and update statistics.
1634  *
1635  * @param cls the `struct Plugin *`
1636  * @param udpw the queue entry
1637  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1638  */
1639 static void
1640 qc_fragment_sent (void *cls,
1641                   struct UDP_MessageWrapper *udpw,
1642                   int result)
1643 {
1644   struct Plugin *plugin = cls;
1645
1646   GNUNET_assert (NULL != udpw->frag_ctx);
1647   if (GNUNET_OK == result)
1648   {
1649     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1650     GNUNET_STATISTICS_update (plugin->env->stats,
1651                               "# UDP, fragmented msgs, fragments, sent, success",
1652                               1,
1653                               GNUNET_NO);
1654     GNUNET_STATISTICS_update (plugin->env->stats,
1655                               "# UDP, fragmented msgs, fragments bytes, sent, success",
1656                               udpw->msg_size,
1657                               GNUNET_NO);
1658   }
1659   else
1660   {
1661     fragmented_message_done (udpw->frag_ctx,
1662                              GNUNET_SYSERR);
1663     GNUNET_STATISTICS_update (plugin->env->stats,
1664                               "# UDP, fragmented msgs, fragments, sent, failure",
1665                               1,
1666                               GNUNET_NO);
1667     GNUNET_STATISTICS_update (plugin->env->stats,
1668                               "# UDP, fragmented msgs, fragments bytes, sent, failure",
1669                               udpw->msg_size,
1670                               GNUNET_NO);
1671   }
1672 }
1673
1674
1675 /**
1676  * Function that is called with messages created by the fragmentation
1677  * module.  In the case of the `proc` callback of the
1678  * #GNUNET_FRAGMENT_context_create() function, this function must
1679  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1680  *
1681  * @param cls closure, the `struct UDP_FragmentationContext`
1682  * @param msg the message that was created
1683  */
1684 static void
1685 enqueue_fragment (void *cls,
1686                   const struct GNUNET_MessageHeader *msg)
1687 {
1688   struct UDP_FragmentationContext *frag_ctx = cls;
1689   struct Plugin *plugin = frag_ctx->plugin;
1690   struct UDP_MessageWrapper *udpw;
1691   struct Session *session = frag_ctx->session;
1692   size_t msg_len = ntohs (msg->size);
1693
1694   LOG (GNUNET_ERROR_TYPE_DEBUG,
1695        "Enqueuing fragment with %u bytes\n",
1696        msg_len);
1697   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1698   udpw->session = session;
1699   udpw->msg_buf = (char *) &udpw[1];
1700   udpw->msg_size = msg_len;
1701   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1702   udpw->timeout = frag_ctx->timeout;
1703   udpw->frag_ctx = frag_ctx;
1704   udpw->qc = &qc_fragment_sent;
1705   udpw->qc_cls = plugin;
1706   memcpy (udpw->msg_buf,
1707           msg,
1708           msg_len);
1709   enqueue (plugin,
1710            udpw);
1711   if (sizeof (struct IPv4UdpAddress) == session->address->address_length)
1712     schedule_select_v4 (plugin);
1713   else
1714     schedule_select_v6 (plugin);
1715 }
1716
1717
1718 /**
1719  * We are finished with a message from the message queue.
1720  * Notify the continuation and update statistics.
1721  *
1722  * @param cls the `struct Plugin *`
1723  * @param udpw the queue entry
1724  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1725  */
1726 static void
1727 qc_message_sent (void *cls,
1728                  struct UDP_MessageWrapper *udpw,
1729                  int result)
1730 {
1731   struct Plugin *plugin = cls;
1732   size_t overhead;
1733
1734   if (udpw->msg_size >= udpw->payload_size)
1735     overhead = udpw->msg_size - udpw->payload_size;
1736   else
1737     overhead = udpw->msg_size;
1738
1739   if (NULL != udpw->cont)
1740     udpw->cont (udpw->cont_cls,
1741                 &udpw->session->target,
1742                 result,
1743                 udpw->payload_size,
1744                 overhead);
1745   if (GNUNET_OK == result)
1746   {
1747     GNUNET_STATISTICS_update (plugin->env->stats,
1748                               "# UDP, unfragmented msgs, messages, sent, success",
1749                               1,
1750                               GNUNET_NO);
1751     GNUNET_STATISTICS_update (plugin->env->stats,
1752                               "# UDP, unfragmented msgs, bytes payload, sent, success",
1753                               udpw->payload_size,
1754                               GNUNET_NO);
1755     GNUNET_STATISTICS_update (plugin->env->stats,
1756                               "# UDP, unfragmented msgs, bytes overhead, sent, success",
1757                               overhead,
1758                               GNUNET_NO);
1759     GNUNET_STATISTICS_update (plugin->env->stats,
1760                               "# UDP, total, bytes overhead, sent",
1761                               overhead,
1762                               GNUNET_NO);
1763     GNUNET_STATISTICS_update (plugin->env->stats,
1764                               "# UDP, total, bytes payload, sent",
1765                               udpw->payload_size,
1766                               GNUNET_NO);
1767   }
1768   else
1769   {
1770     GNUNET_STATISTICS_update (plugin->env->stats,
1771                               "# UDP, unfragmented msgs, messages, sent, failure",
1772                               1,
1773                               GNUNET_NO);
1774     GNUNET_STATISTICS_update (plugin->env->stats,
1775                               "# UDP, unfragmented msgs, bytes payload, sent, failure",
1776                               udpw->payload_size,
1777                               GNUNET_NO);
1778     GNUNET_STATISTICS_update (plugin->env->stats,
1779                               "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1780                               overhead,
1781                               GNUNET_NO);
1782   }
1783 }
1784
1785
1786 /**
1787  * Function that can be used by the transport service to transmit a
1788  * message using the plugin.  Note that in the case of a peer
1789  * disconnecting, the continuation MUST be called prior to the
1790  * disconnect notification itself.  This function will be called with
1791  * this peer's HELLO message to initiate a fresh connection to another
1792  * peer.
1793  *
1794  * @param cls closure
1795  * @param s which session must be used
1796  * @param msgbuf the message to transmit
1797  * @param msgbuf_size number of bytes in @a msgbuf
1798  * @param priority how important is the message (most plugins will
1799  *                 ignore message priority and just FIFO)
1800  * @param to how long to wait at most for the transmission (does not
1801  *                require plugins to discard the message after the timeout,
1802  *                just advisory for the desired delay; most plugins will ignore
1803  *                this as well)
1804  * @param cont continuation to call once the message has
1805  *        been transmitted (or if the transport is ready
1806  *        for the next transmission call; or if the
1807  *        peer disconnected...); can be NULL
1808  * @param cont_cls closure for @a cont
1809  * @return number of bytes used (on the physical network, with overheads);
1810  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1811  *         and does NOT mean that the message was not transmitted (DV)
1812  */
1813 static ssize_t
1814 udp_plugin_send (void *cls,
1815                  struct Session *s,
1816                  const char *msgbuf,
1817                  size_t msgbuf_size,
1818                  unsigned int priority,
1819                  struct GNUNET_TIME_Relative to,
1820                  GNUNET_TRANSPORT_TransmitContinuation cont,
1821                  void *cont_cls)
1822 {
1823   struct Plugin *plugin = cls;
1824   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
1825   struct UDP_FragmentationContext *frag_ctx;
1826   struct UDP_MessageWrapper *udpw;
1827   struct UDPMessage *udp;
1828   char mbuf[udpmlen] GNUNET_ALIGN;
1829
1830   if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
1831        (NULL == plugin->sockv6) )
1832     return GNUNET_SYSERR;
1833   if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
1834        (NULL == plugin->sockv4) )
1835     return GNUNET_SYSERR;
1836   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1837   {
1838     GNUNET_break (0);
1839     return GNUNET_SYSERR;
1840   }
1841   if (GNUNET_YES !=
1842       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1843                                                     &s->target,
1844                                                     s))
1845   {
1846     GNUNET_break (0);
1847     return GNUNET_SYSERR;
1848   }
1849   LOG (GNUNET_ERROR_TYPE_DEBUG,
1850        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1851        udpmlen,
1852        GNUNET_i2s (&s->target),
1853        udp_address_to_string (plugin,
1854                               s->address->address,
1855                               s->address->address_length));
1856
1857   udp = (struct UDPMessage *) mbuf;
1858   udp->header.size = htons (udpmlen);
1859   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1860   udp->reserved = htonl (0);
1861   udp->sender = *plugin->env->my_identity;
1862
1863   /* We do not update the session time out here!  Otherwise this
1864    * session will not timeout since we send keep alive before session
1865    * can timeout.
1866    *
1867    * For UDP we update session timeout only on receive, this will
1868    * cover keep alives, since remote peer will reply with keep alive
1869    * responses!
1870    */
1871   if (udpmlen <= UDP_MTU)
1872   {
1873     /* unfragmented message */
1874     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1875     udpw->session = s;
1876     udpw->msg_buf = (char *) &udpw[1];
1877     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1878     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1879     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
1880     udpw->cont = cont;
1881     udpw->cont_cls = cont_cls;
1882     udpw->frag_ctx = NULL;
1883     udpw->qc = &qc_message_sent;
1884     udpw->qc_cls = plugin;
1885     memcpy (udpw->msg_buf,
1886             udp,
1887             sizeof (struct UDPMessage));
1888     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
1889             msgbuf,
1890             msgbuf_size);
1891     enqueue (plugin,
1892              udpw);
1893     GNUNET_STATISTICS_update (plugin->env->stats,
1894                               "# UDP, unfragmented messages queued total",
1895                               1,
1896                               GNUNET_NO);
1897     GNUNET_STATISTICS_update (plugin->env->stats,
1898                               "# UDP, unfragmented bytes payload queued total",
1899                               udpw->payload_size,
1900                               GNUNET_NO);
1901   }
1902   else
1903   {
1904     /* fragmented message */
1905     if (NULL != s->frag_ctx)
1906       return GNUNET_SYSERR;
1907     memcpy (&udp[1],
1908             msgbuf,
1909             msgbuf_size);
1910     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
1911     frag_ctx->plugin = plugin;
1912     frag_ctx->session = s;
1913     frag_ctx->cont = cont;
1914     frag_ctx->cont_cls = cont_cls;
1915     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
1916     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1917     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1918     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1919                                                      UDP_MTU,
1920                                                      &plugin->tracker,
1921                                                      s->last_expected_msg_delay,
1922                                                      s->last_expected_ack_delay,
1923                                                      &udp->header,
1924                                                      &enqueue_fragment,
1925                                                      frag_ctx);
1926     s->frag_ctx = frag_ctx;
1927     GNUNET_STATISTICS_update (plugin->env->stats,
1928                               "# UDP, fragmented messages active",
1929                               1,
1930                               GNUNET_NO);
1931     GNUNET_STATISTICS_update (plugin->env->stats,
1932                               "# UDP, fragmented messages, total",
1933                               1,
1934                               GNUNET_NO);
1935     GNUNET_STATISTICS_update (plugin->env->stats,
1936                               "# UDP, fragmented bytes (payload)",
1937                               frag_ctx->payload_size,
1938                               GNUNET_NO);
1939   }
1940   notify_session_monitor (s->plugin,
1941                           s,
1942                           GNUNET_TRANSPORT_SS_UPDATE);
1943   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
1944     schedule_select_v4 (plugin);
1945   else
1946     schedule_select_v6 (plugin);
1947   return udpmlen;
1948 }
1949
1950
1951 /**
1952  * Handle an ACK message.
1953  *
1954  * @param plugin the UDP plugin
1955  * @param msg the (presumed) UDP ACK message
1956  * @param udp_addr sender address
1957  * @param udp_addr_len number of bytes in @a udp_addr
1958  */
1959 static void
1960 read_process_ack (struct Plugin *plugin,
1961                   const struct GNUNET_MessageHeader *msg,
1962                   const union UdpAddress *udp_addr,
1963                   socklen_t udp_addr_len)
1964 {
1965   const struct GNUNET_MessageHeader *ack;
1966   const struct UDP_ACK_Message *udp_ack;
1967   struct GNUNET_HELLO_Address *address;
1968   struct Session *s;
1969   struct GNUNET_TIME_Relative flow_delay;
1970
1971   if (ntohs (msg->size)
1972       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
1973   {
1974     GNUNET_break_op (0);
1975     return;
1976   }
1977   udp_ack = (const struct UDP_ACK_Message *) msg;
1978   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
1979                                            PLUGIN_NAME,
1980                                            udp_addr,
1981                                            udp_addr_len,
1982                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1983   s = udp_plugin_lookup_session (plugin,
1984                                  address);
1985   if (NULL == s)
1986   {
1987     LOG (GNUNET_ERROR_TYPE_WARNING,
1988          "UDP session of address %s for ACK not found\n",
1989          udp_address_to_string (plugin,
1990                                 address->address,
1991                                 address->address_length));
1992     GNUNET_HELLO_address_free (address);
1993     return;
1994   }
1995   if (NULL == s->frag_ctx)
1996   {
1997     LOG (GNUNET_ERROR_TYPE_WARNING,
1998          "Fragmentation context of address %s for ACK not found\n",
1999          udp_address_to_string (plugin,
2000                                 address->address,
2001                                 address->address_length));
2002     GNUNET_HELLO_address_free (address);
2003     return;
2004   }
2005   GNUNET_HELLO_address_free (address);
2006
2007   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2008   LOG (GNUNET_ERROR_TYPE_DEBUG,
2009        "We received a sending delay of %s for %s\n",
2010        GNUNET_STRINGS_relative_time_to_string (flow_delay,
2011                                                GNUNET_YES),
2012        GNUNET_i2s (&udp_ack->sender));
2013   s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2014
2015   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2016   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2017   {
2018     GNUNET_break_op(0);
2019     return;
2020   }
2021
2022   if (GNUNET_OK !=
2023       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2024                                    ack))
2025   {
2026     LOG (GNUNET_ERROR_TYPE_DEBUG,
2027          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2028          (unsigned int) ntohs (msg->size),
2029          GNUNET_i2s (&udp_ack->sender),
2030          udp_address_to_string (plugin,
2031                                 udp_addr,
2032                                 udp_addr_len));
2033     /* Expect more ACKs to arrive */
2034     return;
2035   }
2036
2037   LOG (GNUNET_ERROR_TYPE_DEBUG,
2038        "Message from %s at %s full ACK'ed\n",
2039        GNUNET_i2s (&udp_ack->sender),
2040        udp_address_to_string (plugin,
2041                               udp_addr,
2042                               udp_addr_len));
2043
2044   /* Remove fragmented message after successful sending */
2045   fragmented_message_done (s->frag_ctx,
2046                            GNUNET_OK);
2047 }
2048
2049
2050 /* ********************** Receiving ********************** */
2051
2052
2053 /**
2054  * Closure for #find_receive_context().
2055  */
2056 struct FindReceiveContext
2057 {
2058   /**
2059    * Where to store the result.
2060    */
2061   struct DefragContext *rc;
2062
2063   /**
2064    * Session associated with this context.
2065    */
2066   struct Session *session;
2067
2068   /**
2069    * Address to find.
2070    */
2071   const union UdpAddress *udp_addr;
2072
2073   /**
2074    * Number of bytes in @e udp_addr.
2075    */
2076   size_t udp_addr_len;
2077
2078 };
2079
2080
2081 /**
2082  * Scan the heap for a receive context with the given address.
2083  *
2084  * @param cls the `struct FindReceiveContext`
2085  * @param node internal node of the heap
2086  * @param element value stored at the node (a `struct ReceiveContext`)
2087  * @param cost cost associated with the node
2088  * @return #GNUNET_YES if we should continue to iterate,
2089  *         #GNUNET_NO if not.
2090  */
2091 static int
2092 find_receive_context (void *cls,
2093                       struct GNUNET_CONTAINER_HeapNode *node,
2094                       void *element,
2095                       GNUNET_CONTAINER_HeapCostType cost)
2096 {
2097   struct FindReceiveContext *frc = cls;
2098   struct DefragContext *e = element;
2099
2100   if ( (frc->udp_addr_len == e->udp_addr_len) &&
2101        (0 == memcmp (frc->udp_addr,
2102                      e->udp_addr,
2103                      frc->udp_addr_len)) )
2104   {
2105     frc->rc = e;
2106     return GNUNET_NO;
2107   }
2108   return GNUNET_YES;
2109 }
2110
2111
2112 /**
2113  * Message tokenizer has broken up an incomming message. Pass it on
2114  * to the service.
2115  *
2116  * @param cls the `struct Plugin *`
2117  * @param client the `struct Session *`
2118  * @param hdr the actual message
2119  * @return #GNUNET_OK (always)
2120  */
2121 static int
2122 process_inbound_tokenized_messages (void *cls,
2123                                     void *client,
2124                                     const struct GNUNET_MessageHeader *hdr)
2125 {
2126   struct Plugin *plugin = cls;
2127   struct Session *session = client;
2128
2129   if (GNUNET_YES == session->in_destroy)
2130     return GNUNET_OK;
2131   reschedule_session_timeout (session);
2132   session->flow_delay_for_other_peer
2133     = plugin->env->receive (plugin->env->cls,
2134                             session->address,
2135                             session,
2136                             hdr);
2137   return GNUNET_OK;
2138 }
2139
2140
2141 /**
2142  * Functions with this signature are called whenever we need to close
2143  * a session due to a disconnect or failure to establish a connection.
2144  *
2145  * @param cls closure with the `struct Plugin`
2146  * @param s session to close down
2147  * @return #GNUNET_OK on success
2148  */
2149 static int
2150 udp_disconnect_session (void *cls,
2151                         struct Session *s)
2152 {
2153   struct Plugin *plugin = cls;
2154   struct UDP_MessageWrapper *udpw;
2155   struct UDP_MessageWrapper *next;
2156   struct FindReceiveContext frc;
2157
2158   GNUNET_assert (GNUNET_YES != s->in_destroy);
2159   LOG (GNUNET_ERROR_TYPE_DEBUG,
2160        "Session %p to peer `%s' at address %s ended\n",
2161        s,
2162        GNUNET_i2s (&s->target),
2163        udp_address_to_string (plugin,
2164                               s->address->address,
2165                               s->address->address_length));
2166   if (NULL != s->timeout_task)
2167   {
2168     GNUNET_SCHEDULER_cancel (s->timeout_task);
2169     s->timeout_task = NULL;
2170   }
2171   if (NULL != s->frag_ctx)
2172   {
2173     /* Remove fragmented message due to disconnect */
2174     fragmented_message_done (s->frag_ctx,
2175                              GNUNET_SYSERR);
2176   }
2177
2178   frc.rc = NULL;
2179   frc.udp_addr = s->address->address;
2180   frc.udp_addr_len = s->address->address_length;
2181   /* Lookup existing receive context for this address */
2182   if (NULL != plugin->defrag_ctxs)
2183   {
2184     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2185                                    &find_receive_context,
2186                                    &frc);
2187     if (NULL != frc.rc)
2188     {
2189       struct DefragContext *d_ctx = frc.rc;
2190
2191       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
2192       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2193       GNUNET_free (d_ctx);
2194     }
2195   }
2196   next = plugin->ipv4_queue_head;
2197   while (NULL != (udpw = next))
2198   {
2199     next = udpw->next;
2200     if (udpw->session == s)
2201     {
2202       dequeue (plugin,
2203                udpw);
2204       udpw->qc (udpw->qc_cls,
2205                 udpw,
2206                 GNUNET_SYSERR);
2207       GNUNET_free (udpw);
2208     }
2209   }
2210   next = plugin->ipv6_queue_head;
2211   while (NULL != (udpw = next))
2212   {
2213     next = udpw->next;
2214     if (udpw->session == s)
2215     {
2216       dequeue (plugin,
2217                udpw);
2218       udpw->qc (udpw->qc_cls,
2219                 udpw,
2220                 GNUNET_SYSERR);
2221       GNUNET_free (udpw);
2222     }
2223   }
2224   notify_session_monitor (s->plugin,
2225                           s,
2226                           GNUNET_TRANSPORT_SS_DONE);
2227   plugin->env->session_end (plugin->env->cls,
2228                             s->address,
2229                             s);
2230
2231   if ( (NULL != s->frag_ctx) &&
2232        (NULL != s->frag_ctx->cont) )
2233   {
2234     /* The 'frag_ctx' itself will be freed in #free_session() a bit
2235        later, as it might be in use right now */
2236     LOG (GNUNET_ERROR_TYPE_DEBUG,
2237          "Calling continuation for fragemented message to `%s' with result SYSERR\n",
2238          GNUNET_i2s (&s->target));
2239     s->frag_ctx->cont (s->frag_ctx->cont_cls,
2240                        &s->target,
2241                        GNUNET_SYSERR,
2242                        s->frag_ctx->payload_size,
2243                        s->frag_ctx->on_wire_size);
2244   }
2245
2246   GNUNET_assert (GNUNET_YES ==
2247                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
2248                                                        &s->target,
2249                                                        s));
2250   GNUNET_STATISTICS_set (plugin->env->stats,
2251                          "# UDP sessions active",
2252                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2253                          GNUNET_NO);
2254   if (s->rc > 0)
2255   {
2256     s->in_destroy = GNUNET_YES;
2257   }
2258   else
2259   {
2260     free_session (s);
2261   }
2262   return GNUNET_OK;
2263 }
2264
2265
2266 /**
2267  * Destroy a session, plugin is being unloaded.
2268  *
2269  * @param cls the `struct Plugin`
2270  * @param key hash of public key of target peer
2271  * @param value a `struct PeerSession *` to clean up
2272  * @return #GNUNET_OK (continue to iterate)
2273  */
2274 static int
2275 disconnect_and_free_it (void *cls,
2276                         const struct GNUNET_PeerIdentity *key,
2277                         void *value)
2278 {
2279   struct Plugin *plugin = cls;
2280
2281   udp_disconnect_session (plugin,
2282                           value);
2283   return GNUNET_OK;
2284 }
2285
2286
2287 /**
2288  * Disconnect from a remote node.  Clean up session if we have one for
2289  * this peer.
2290  *
2291  * @param cls closure for this call (should be handle to Plugin)
2292  * @param target the peeridentity of the peer to disconnect
2293  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
2294  */
2295 static void
2296 udp_disconnect (void *cls,
2297                 const struct GNUNET_PeerIdentity *target)
2298 {
2299   struct Plugin *plugin = cls;
2300
2301   LOG (GNUNET_ERROR_TYPE_DEBUG,
2302        "Disconnecting from peer `%s'\n",
2303        GNUNET_i2s (target));
2304   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
2305                                               target,
2306                                               &disconnect_and_free_it,
2307                                               plugin);
2308 }
2309
2310
2311 /**
2312  * Session was idle, so disconnect it.
2313  *
2314  * @param cls the `struct Session` to time out
2315  * @param tc scheduler context
2316  */
2317 static void
2318 session_timeout (void *cls,
2319                  const struct GNUNET_SCHEDULER_TaskContext *tc)
2320 {
2321   struct Session *s = cls;
2322   struct Plugin *plugin = s->plugin;
2323   struct GNUNET_TIME_Relative left;
2324
2325   s->timeout_task = NULL;
2326   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
2327   if (left.rel_value_us > 0)
2328   {
2329     /* not actually our turn yet, but let's at least update
2330        the monitor, it may think we're about to die ... */
2331     notify_session_monitor (s->plugin,
2332                             s,
2333                             GNUNET_TRANSPORT_SS_UPDATE);
2334     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
2335                                                     &session_timeout,
2336                                                     s);
2337     return;
2338   }
2339   LOG (GNUNET_ERROR_TYPE_DEBUG,
2340        "Session %p was idle for %s, disconnecting\n",
2341        s,
2342        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
2343                                                GNUNET_YES));
2344   /* call session destroy function */
2345   udp_disconnect_session (plugin,
2346                           s);
2347 }
2348
2349
2350 /**
2351  * Allocate a new session for the given endpoint address.
2352  * Note that this function does not inform the service
2353  * of the new session, this is the responsibility of the
2354  * caller (if needed).
2355  *
2356  * @param cls the `struct Plugin`
2357  * @param address address of the other peer to use
2358  * @param network_type network type the address belongs to
2359  * @return NULL on error, otherwise session handle
2360  */
2361 static struct Session *
2362 udp_plugin_create_session (void *cls,
2363                            const struct GNUNET_HELLO_Address *address,
2364                            enum GNUNET_ATS_Network_Type network_type)
2365 {
2366   struct Plugin *plugin = cls;
2367   struct Session *s;
2368
2369   s = GNUNET_new (struct Session);
2370   s->plugin = plugin;
2371   s->address = GNUNET_HELLO_address_copy (address);
2372   s->target = address->peer;
2373   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2374                                                               250);
2375   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2376   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
2377   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2378   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2379   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
2380                                                   &session_timeout,
2381                                                   s);
2382   s->scope = network_type;
2383
2384   LOG (GNUNET_ERROR_TYPE_DEBUG,
2385        "Creating new session %p for peer `%s' address `%s'\n",
2386        s,
2387        GNUNET_i2s (&address->peer),
2388        udp_address_to_string (plugin,
2389                               address->address,
2390                               address->address_length));
2391   GNUNET_assert (GNUNET_OK ==
2392                  GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
2393                                                     &s->target,
2394                                                     s,
2395                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2396   GNUNET_STATISTICS_set (plugin->env->stats,
2397                          "# UDP sessions active",
2398                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2399                          GNUNET_NO);
2400   notify_session_monitor (plugin,
2401                           s,
2402                           GNUNET_TRANSPORT_SS_INIT);
2403   return s;
2404 }
2405
2406
2407 /**
2408  * Creates a new outbound session the transport service will use to
2409  * send data to the peer.
2410  *
2411  * @param cls the `struct Plugin *`
2412  * @param address the address
2413  * @return the session or NULL of max connections exceeded
2414  */
2415 static struct Session *
2416 udp_plugin_get_session (void *cls,
2417                         const struct GNUNET_HELLO_Address *address)
2418 {
2419   struct Plugin *plugin = cls;
2420   struct Session *s;
2421   enum GNUNET_ATS_Network_Type network_type;
2422   const struct IPv4UdpAddress *udp_v4;
2423   const struct IPv6UdpAddress *udp_v6;
2424
2425   if (NULL == address)
2426   {
2427     GNUNET_break (0);
2428     return NULL;
2429   }
2430   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
2431        (address->address_length != sizeof(struct IPv6UdpAddress)) )
2432   {
2433     GNUNET_break_op (0);
2434     return NULL;
2435   }
2436   if (NULL != (s = udp_plugin_lookup_session (cls,
2437                                               address)))
2438     return s;
2439
2440   /* need to create new session */
2441   if (sizeof (struct IPv4UdpAddress) == address->address_length)
2442   {
2443     struct sockaddr_in v4;
2444
2445     udp_v4 = (const struct IPv4UdpAddress *) address->address;
2446     memset (&v4, '\0', sizeof (v4));
2447     v4.sin_family = AF_INET;
2448 #if HAVE_SOCKADDR_IN_SIN_LEN
2449     v4.sin_len = sizeof (struct sockaddr_in);
2450 #endif
2451     v4.sin_port = udp_v4->u4_port;
2452     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
2453     network_type = plugin->env->get_address_type (plugin->env->cls,
2454                                                   (const struct sockaddr *) &v4,
2455                                                   sizeof (v4));
2456   }
2457   if (sizeof (struct IPv6UdpAddress) == address->address_length)
2458   {
2459     struct sockaddr_in6 v6;
2460
2461     udp_v6 = (const struct IPv6UdpAddress *) address->address;
2462     memset (&v6, '\0', sizeof (v6));
2463     v6.sin6_family = AF_INET6;
2464 #if HAVE_SOCKADDR_IN_SIN_LEN
2465     v6.sin6_len = sizeof (struct sockaddr_in6);
2466 #endif
2467     v6.sin6_port = udp_v6->u6_port;
2468     v6.sin6_addr = udp_v6->ipv6_addr;
2469     network_type = plugin->env->get_address_type (plugin->env->cls,
2470                                                   (const struct sockaddr *) &v6,
2471                                                   sizeof (v6));
2472   }
2473   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2474   return udp_plugin_create_session (cls,
2475                                     address,
2476                                     network_type);
2477 }
2478
2479
2480 /**
2481  * We've received a UDP Message.  Process it (pass contents to main service).
2482  *
2483  * @param plugin plugin context
2484  * @param msg the message
2485  * @param udp_addr sender address
2486  * @param udp_addr_len number of bytes in @a udp_addr
2487  * @param network_type network type the address belongs to
2488  */
2489 static void
2490 process_udp_message (struct Plugin *plugin,
2491                      const struct UDPMessage *msg,
2492                      const union UdpAddress *udp_addr,
2493                      size_t udp_addr_len,
2494                      enum GNUNET_ATS_Network_Type network_type)
2495 {
2496   struct Session *s;
2497   struct GNUNET_HELLO_Address *address;
2498
2499   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2500   if (0 != ntohl (msg->reserved))
2501   {
2502     GNUNET_break_op(0);
2503     return;
2504   }
2505   if (ntohs (msg->header.size)
2506       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2507   {
2508     GNUNET_break_op(0);
2509     return;
2510   }
2511
2512   address = GNUNET_HELLO_address_allocate (&msg->sender,
2513                                            PLUGIN_NAME,
2514                                            udp_addr,
2515                                            udp_addr_len,
2516                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2517   if (NULL ==
2518       (s = udp_plugin_lookup_session (plugin,
2519                                       address)))
2520   {
2521     s = udp_plugin_create_session (plugin,
2522                                    address,
2523                                    network_type);
2524     plugin->env->session_start (plugin->env->cls,
2525                                 address,
2526                                 s,
2527                                 s->scope);
2528     notify_session_monitor (plugin,
2529                             s,
2530                             GNUNET_TRANSPORT_SS_UP);
2531   }
2532   GNUNET_free (address);
2533
2534   s->rc++;
2535   GNUNET_SERVER_mst_receive (plugin->mst,
2536                              s,
2537                              (const char *) &msg[1],
2538                              ntohs (msg->header.size) - sizeof(struct UDPMessage),
2539                              GNUNET_YES,
2540                              GNUNET_NO);
2541   s->rc--;
2542   if ( (0 == s->rc) &&
2543        (GNUNET_YES == s->in_destroy) )
2544     free_session (s);
2545 }
2546
2547
2548 /**
2549  * Process a defragmented message.
2550  *
2551  * @param cls the `struct DefragContext *`
2552  * @param msg the message
2553  */
2554 static void
2555 fragment_msg_proc (void *cls,
2556                    const struct GNUNET_MessageHeader *msg)
2557 {
2558   struct DefragContext *dc = cls;
2559   const struct UDPMessage *um;
2560
2561   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2562   {
2563     GNUNET_break_op (0);
2564     return;
2565   }
2566   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2567   {
2568     GNUNET_break_op (0);
2569     return;
2570   }
2571   um = (const struct UDPMessage *) msg;
2572   dc->sender = um->sender;
2573   dc->have_sender = GNUNET_YES;
2574   process_udp_message (dc->plugin,
2575                        um,
2576                        dc->udp_addr,
2577                        dc->udp_addr_len,
2578                        dc->network_type);
2579 }
2580
2581
2582 /**
2583  * We finished sending an acknowledgement.  Update
2584  * statistics.
2585  *
2586  * @param cls the `struct Plugin`
2587  * @param udpw message queue entry of the ACK
2588  * @param result #GNUNET_OK if the transmission worked,
2589  *               #GNUNET_SYSERR if we failed to send the ACK
2590  */
2591 static void
2592 ack_message_sent (void *cls,
2593                   struct UDP_MessageWrapper *udpw,
2594                   int result)
2595 {
2596   struct Plugin *plugin = cls;
2597
2598   if (GNUNET_OK == result)
2599   {
2600     GNUNET_STATISTICS_update (plugin->env->stats,
2601                               "# UDP, ACK messages sent",
2602                               1,
2603                               GNUNET_NO);
2604   }
2605   else
2606   {
2607     GNUNET_STATISTICS_update (plugin->env->stats,
2608                               "# UDP, ACK transmissions failed",
2609                               1,
2610                               GNUNET_NO);
2611   }
2612 }
2613
2614
2615 /**
2616  * Transmit an acknowledgement.
2617  *
2618  * @param cls the `struct DefragContext *`
2619  * @param id message ID (unused)
2620  * @param msg ack to transmit
2621  */
2622 static void
2623 ack_proc (void *cls,
2624           uint32_t id,
2625           const struct GNUNET_MessageHeader *msg)
2626 {
2627   struct DefragContext *rc = cls;
2628   struct Plugin *plugin = rc->plugin;
2629   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2630   struct UDP_ACK_Message *udp_ack;
2631   uint32_t delay;
2632   struct UDP_MessageWrapper *udpw;
2633   struct Session *s;
2634   struct GNUNET_HELLO_Address *address;
2635
2636   if (GNUNET_NO == rc->have_sender)
2637   {
2638     /* tried to defragment but never succeeded, hence will not ACK */
2639     GNUNET_break_op (0);
2640     return;
2641   }
2642   address = GNUNET_HELLO_address_allocate (&rc->sender,
2643                                            PLUGIN_NAME,
2644                                            rc->udp_addr,
2645                                            rc->udp_addr_len,
2646                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2647   s = udp_plugin_lookup_session (plugin,
2648                                  address);
2649   GNUNET_HELLO_address_free (address);
2650   if (NULL == s)
2651   {
2652     LOG (GNUNET_ERROR_TYPE_ERROR,
2653          "Trying to transmit ACK to peer `%s' but no session found!\n",
2654          udp_address_to_string (plugin,
2655                                 rc->udp_addr,
2656                                 rc->udp_addr_len));
2657     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2658     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2659     GNUNET_free (rc);
2660     GNUNET_STATISTICS_update (plugin->env->stats,
2661                               "# UDP, ACK transmissions failed",
2662                               1,
2663                               GNUNET_NO);
2664     return;
2665   }
2666   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2667     delay = s->flow_delay_for_other_peer.rel_value_us;
2668   else
2669     delay = UINT32_MAX;
2670
2671   LOG (GNUNET_ERROR_TYPE_DEBUG,
2672        "Sending ACK to `%s' including delay of %s\n",
2673        udp_address_to_string (plugin,
2674                               rc->udp_addr,
2675                               rc->udp_addr_len),
2676        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2677                                                GNUNET_YES));
2678   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2679   udpw->msg_size = msize;
2680   udpw->payload_size = 0;
2681   udpw->session = s;
2682   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2683   udpw->msg_buf = (char *) &udpw[1];
2684   udpw->qc = &ack_message_sent;
2685   udpw->qc_cls = plugin;
2686   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2687   udp_ack->header.size = htons ((uint16_t) msize);
2688   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2689   udp_ack->delay = htonl (delay);
2690   udp_ack->sender = *plugin->env->my_identity;
2691   memcpy (&udp_ack[1],
2692           msg,
2693           ntohs (msg->size));
2694   enqueue (plugin,
2695            udpw);
2696   notify_session_monitor (plugin,
2697                           s,
2698                           GNUNET_TRANSPORT_SS_UPDATE);
2699   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2700     schedule_select_v4 (plugin);
2701   else
2702     schedule_select_v6 (plugin);
2703 }
2704
2705
2706 /**
2707  * We received a fragment, process it.
2708  *
2709  * @param plugin our plugin
2710  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2711  * @param udp_addr sender address
2712  * @param udp_addr_len number of bytes in @a udp_addr
2713  * @param network_type network type the address belongs to
2714  */
2715 static void
2716 read_process_fragment (struct Plugin *plugin,
2717                        const struct GNUNET_MessageHeader *msg,
2718                        const union UdpAddress *udp_addr,
2719                        size_t udp_addr_len,
2720                        enum GNUNET_ATS_Network_Type network_type)
2721 {
2722   struct DefragContext *d_ctx;
2723   struct GNUNET_TIME_Absolute now;
2724   struct FindReceiveContext frc;
2725
2726   frc.rc = NULL;
2727   frc.udp_addr = udp_addr;
2728   frc.udp_addr_len = udp_addr_len;
2729
2730   /* Lookup existing receive context for this address */
2731   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2732                                  &find_receive_context,
2733                                  &frc);
2734   now = GNUNET_TIME_absolute_get ();
2735   d_ctx = frc.rc;
2736
2737   if (NULL == d_ctx)
2738   {
2739     /* Create a new defragmentation context */
2740     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
2741     memcpy (&d_ctx[1],
2742             udp_addr,
2743             udp_addr_len);
2744     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2745     d_ctx->udp_addr_len = udp_addr_len;
2746     d_ctx->network_type = network_type;
2747     d_ctx->plugin = plugin;
2748     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2749                                                       UDP_MTU,
2750                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
2751                                                       d_ctx,
2752                                                       &fragment_msg_proc,
2753                                                       &ack_proc);
2754     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2755                                                  d_ctx,
2756                                                  (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2757     LOG (GNUNET_ERROR_TYPE_DEBUG,
2758          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2759          (unsigned int) ntohs (msg->size),
2760          udp_address_to_string (plugin,
2761                                 udp_addr,
2762                                 udp_addr_len));
2763   }
2764   else
2765   {
2766     LOG (GNUNET_ERROR_TYPE_DEBUG,
2767          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2768          (unsigned int) ntohs (msg->size),
2769          udp_address_to_string (plugin,
2770                                 udp_addr,
2771                                 udp_addr_len));
2772   }
2773
2774   if (GNUNET_OK ==
2775       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag,
2776                                           msg))
2777   {
2778     /* keep this 'rc' from expiring */
2779     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2780                                        d_ctx->hnode,
2781                                        (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2782   }
2783   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2784       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2785   {
2786     /* remove 'rc' that was inactive the longest */
2787     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2788     GNUNET_assert (NULL != d_ctx);
2789     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2790     GNUNET_free (d_ctx);
2791     GNUNET_STATISTICS_update (plugin->env->stats,
2792                               "# UDP, Defragmentations aborted",
2793                               1,
2794                               GNUNET_NO);
2795   }
2796 }
2797
2798
2799 /**
2800  * Read and process a message from the given socket.
2801  *
2802  * @param plugin the overall plugin
2803  * @param rsock socket to read from
2804  */
2805 static void
2806 udp_select_read (struct Plugin *plugin,
2807                  struct GNUNET_NETWORK_Handle *rsock)
2808 {
2809   socklen_t fromlen;
2810   struct sockaddr_storage addr;
2811   char buf[65536] GNUNET_ALIGN;
2812   ssize_t size;
2813   const struct GNUNET_MessageHeader *msg;
2814   struct IPv4UdpAddress v4;
2815   struct IPv6UdpAddress v6;
2816   const struct sockaddr *sa;
2817   const struct sockaddr_in *sa4;
2818   const struct sockaddr_in6 *sa6;
2819   const union UdpAddress *int_addr;
2820   size_t int_addr_len;
2821   enum GNUNET_ATS_Network_Type network_type;
2822
2823   fromlen = sizeof (addr);
2824   memset (&addr,
2825           0,
2826           sizeof(addr));
2827   size = GNUNET_NETWORK_socket_recvfrom (rsock,
2828                                          buf,
2829                                          sizeof(buf),
2830                                          (struct sockaddr *) &addr,
2831                                          &fromlen);
2832   sa = (const struct sockaddr *) &addr;
2833 #if MINGW
2834   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2835    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2836    * on this socket has failed.
2837    * Quote from MSDN:
2838    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2839    *   executing a hard or abortive close. The application should close
2840    *   the socket; it is no longer usable. On a UDP-datagram socket this
2841    *   error indicates a previous send operation resulted in an ICMP Port
2842    *   Unreachable message.
2843    */
2844   if ( (-1 == size) &&
2845        (ECONNRESET == errno) )
2846     return;
2847 #endif
2848   if (-1 == size)
2849   {
2850     LOG (GNUNET_ERROR_TYPE_DEBUG,
2851          "UDP failed to receive data: %s\n",
2852          STRERROR (errno));
2853     /* Connection failure or something. Not a protocol violation. */
2854     return;
2855   }
2856   if (size < sizeof(struct GNUNET_MessageHeader))
2857   {
2858     LOG (GNUNET_ERROR_TYPE_WARNING,
2859          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
2860          (unsigned int ) size,
2861          GNUNET_a2s (sa,
2862                      fromlen));
2863     /* _MAY_ be a connection failure (got partial message) */
2864     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2865     GNUNET_break_op (0);
2866     return;
2867   }
2868   msg = (const struct GNUNET_MessageHeader *) buf;
2869   LOG (GNUNET_ERROR_TYPE_DEBUG,
2870        "UDP received %u-byte message from `%s' type %u\n",
2871        (unsigned int) size,
2872        GNUNET_a2s (sa,
2873                    fromlen),
2874        ntohs (msg->type));
2875   if (size != ntohs (msg->size))
2876   {
2877     LOG (GNUNET_ERROR_TYPE_WARNING,
2878          "UDP malformed message header from %s\n",
2879          (unsigned int) size,
2880          GNUNET_a2s (sa,
2881                      fromlen));
2882     GNUNET_break_op (0);
2883     return;
2884   }
2885   GNUNET_STATISTICS_update (plugin->env->stats,
2886                             "# UDP, total bytes received",
2887                             size,
2888                             GNUNET_NO);
2889   network_type = plugin->env->get_address_type (plugin->env->cls,
2890                                                 sa,
2891                                                 fromlen);
2892   switch (sa->sa_family)
2893   {
2894   case AF_INET:
2895     sa4 = (const struct sockaddr_in *) &addr;
2896     v4.options = 0;
2897     v4.ipv4_addr = sa4->sin_addr.s_addr;
2898     v4.u4_port = sa4->sin_port;
2899     int_addr = (union UdpAddress *) &v4;
2900     int_addr_len = sizeof (v4);
2901     break;
2902   case AF_INET6:
2903     sa6 = (const struct sockaddr_in6 *) &addr;
2904     v6.options = 0;
2905     v6.ipv6_addr = sa6->sin6_addr;
2906     v6.u6_port = sa6->sin6_port;
2907     int_addr = (union UdpAddress *) &v6;
2908     int_addr_len = sizeof (v6);
2909     break;
2910   default:
2911     GNUNET_break (0);
2912     return;
2913   }
2914
2915   switch (ntohs (msg->type))
2916   {
2917   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2918     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
2919       udp_broadcast_receive (plugin,
2920                              buf,
2921                              size,
2922                              int_addr,
2923                              int_addr_len,
2924                              network_type);
2925     return;
2926   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2927     if (ntohs (msg->size) < sizeof(struct UDPMessage))
2928     {
2929       GNUNET_break_op(0);
2930       return;
2931     }
2932     process_udp_message (plugin,
2933                          (const struct UDPMessage *) msg,
2934                          int_addr,
2935                          int_addr_len,
2936                          network_type);
2937     return;
2938   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2939     read_process_ack (plugin,
2940                       msg,
2941                       int_addr,
2942                       int_addr_len);
2943     return;
2944   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2945     read_process_fragment (plugin,
2946                            msg,
2947                            int_addr,
2948                            int_addr_len,
2949                            network_type);
2950     return;
2951   default:
2952     GNUNET_break_op(0);
2953     return;
2954   }
2955 }
2956
2957
2958 /**
2959  * Removes messages from the transmission queue that have
2960  * timed out, and then selects a message that should be
2961  * transmitted next.
2962  *
2963  * @param plugin the UDP plugin
2964  * @param sock which socket should we process the queue for (v4 or v6)
2965  * @return message selected for transmission, or NULL for none
2966  */
2967 static struct UDP_MessageWrapper *
2968 remove_timeout_messages_and_select (struct Plugin *plugin,
2969                                     struct GNUNET_NETWORK_Handle *sock)
2970 {
2971   struct UDP_MessageWrapper *udpw;
2972   struct GNUNET_TIME_Relative remaining;
2973   struct Session *session;
2974   int removed;
2975
2976   removed = GNUNET_NO;
2977   udpw = (sock == plugin->sockv4)
2978     ? plugin->ipv4_queue_head
2979     : plugin->ipv6_queue_head;
2980   while (NULL != udpw)
2981   {
2982     session = udpw->session;
2983     /* Find messages with timeout */
2984     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2985     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2986     {
2987       /* Message timed out */
2988       udpw->qc (udpw->qc_cls,
2989                 udpw,
2990                 GNUNET_SYSERR);
2991       /* Remove message */
2992       removed = GNUNET_YES;
2993       dequeue (plugin,
2994                udpw);
2995       GNUNET_free (udpw);
2996
2997       if (sock == plugin->sockv4)
2998       {
2999         udpw = plugin->ipv4_queue_head;
3000       }
3001       else if (sock == plugin->sockv6)
3002       {
3003         udpw = plugin->ipv6_queue_head;
3004       }
3005       else
3006       {
3007         GNUNET_break (0); /* should never happen */
3008         udpw = NULL;
3009       }
3010       GNUNET_STATISTICS_update (plugin->env->stats,
3011                                 "# messages discarded due to timeout",
3012                                 1,
3013                                 GNUNET_NO);
3014     }
3015     else
3016     {
3017       /* Message did not time out, check flow delay */
3018       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
3019       if (0 == remaining.rel_value_us)
3020       {
3021         /* this message is not delayed */
3022         LOG (GNUNET_ERROR_TYPE_DEBUG,
3023              "Message for peer `%s' (%u bytes) is not delayed \n",
3024              GNUNET_i2s (&udpw->session->target),
3025              udpw->payload_size);
3026         break; /* Found message to send, break */
3027       }
3028       else
3029       {
3030         /* Message is delayed, try next */
3031         LOG (GNUNET_ERROR_TYPE_DEBUG,
3032              "Message for peer `%s' (%u bytes) is delayed for %s\n",
3033              GNUNET_i2s (&udpw->session->target),
3034              udpw->payload_size,
3035              GNUNET_STRINGS_relative_time_to_string (remaining,
3036                                                      GNUNET_YES));
3037         udpw = udpw->next;
3038       }
3039     }
3040   }
3041   if (GNUNET_YES == removed)
3042     notify_session_monitor (session->plugin,
3043                             session,
3044                             GNUNET_TRANSPORT_SS_UPDATE);
3045   return udpw;
3046 }
3047
3048
3049 /**
3050  * We failed to transmit a message via UDP. Generate
3051  * a descriptive error message.
3052  *
3053  * @param plugin our plugin
3054  * @param sa target address we were trying to reach
3055  * @param slen number of bytes in @a sa
3056  * @param error the errno value returned from the sendto() call
3057  */
3058 static void
3059 analyze_send_error (struct Plugin *plugin,
3060                     const struct sockaddr *sa,
3061                     socklen_t slen,
3062                     int error)
3063 {
3064   enum GNUNET_ATS_Network_Type type;
3065
3066   type = plugin->env->get_address_type (plugin->env->cls,
3067                                         sa,
3068                                         slen);
3069   if ( ( (GNUNET_ATS_NET_LAN == type) ||
3070          (GNUNET_ATS_NET_WAN == type) ) &&
3071        ( (ENETUNREACH == errno) ||
3072          (ENETDOWN == errno) ) )
3073   {
3074     if (slen == sizeof (struct sockaddr_in))
3075     {
3076       /* IPv4: "Network unreachable" or "Network down"
3077        *
3078        * This indicates we do not have connectivity
3079        */
3080       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3081            _("UDP could not transmit message to `%s': "
3082              "Network seems down, please check your network configuration\n"),
3083            GNUNET_a2s (sa,
3084                        slen));
3085     }
3086     if (slen == sizeof (struct sockaddr_in6))
3087     {
3088       /* IPv6: "Network unreachable" or "Network down"
3089        *
3090        * This indicates that this system is IPv6 enabled, but does not
3091        * have a valid global IPv6 address assigned or we do not have
3092        * connectivity
3093        */
3094       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3095            _("UDP could not transmit IPv6 message! "
3096              "Please check your network configuration and disable IPv6 if your "
3097              "connection does not have a global IPv6 address\n"));
3098     }
3099   }
3100   else
3101   {
3102     LOG (GNUNET_ERROR_TYPE_WARNING,
3103          "UDP could not transmit message to `%s': `%s'\n",
3104          GNUNET_a2s (sa,
3105                      slen),
3106          STRERROR (error));
3107   }
3108 }
3109
3110
3111 /**
3112  * It is time to try to transmit a UDP message.  Select one
3113  * and send.
3114  *
3115  * @param plugin the plugin
3116  * @param sock which socket (v4/v6) to send on
3117  */
3118 static void
3119 udp_select_send (struct Plugin *plugin,
3120                  struct GNUNET_NETWORK_Handle *sock)
3121 {
3122   ssize_t sent;
3123   socklen_t slen;
3124   const struct sockaddr *a;
3125   const struct IPv4UdpAddress *u4;
3126   struct sockaddr_in a4;
3127   const struct IPv6UdpAddress *u6;
3128   struct sockaddr_in6 a6;
3129   struct UDP_MessageWrapper *udpw;
3130
3131   /* Find message(s) to send */
3132   while (NULL != (udpw = remove_timeout_messages_and_select (plugin,
3133                                                              sock)))
3134   {
3135     if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3136     {
3137       u4 = udpw->session->address->address;
3138       memset (&a4,
3139               0,
3140               sizeof(a4));
3141       a4.sin_family = AF_INET;
3142 #if HAVE_SOCKADDR_IN_SIN_LEN
3143       a4.sin_len = sizeof (a4);
3144 #endif
3145       a4.sin_port = u4->u4_port;
3146       a4.sin_addr.s_addr = u4->ipv4_addr;
3147       a = (const struct sockaddr *) &a4;
3148       slen = sizeof (a4);
3149     }
3150     else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3151     {
3152       u6 = udpw->session->address->address;
3153       memset (&a6,
3154               0,
3155               sizeof(a6));
3156       a6.sin6_family = AF_INET6;
3157 #if HAVE_SOCKADDR_IN_SIN_LEN
3158       a6.sin6_len = sizeof (a6);
3159 #endif
3160       a6.sin6_port = u6->u6_port;
3161       a6.sin6_addr = u6->ipv6_addr;
3162       a = (const struct sockaddr *) &a6;
3163       slen = sizeof (a6);
3164     }
3165     else
3166     {
3167       GNUNET_break (0);
3168       udpw->qc (udpw->qc_cls,
3169                 udpw,
3170                 GNUNET_SYSERR);
3171       dequeue (plugin,
3172                udpw);
3173       notify_session_monitor (plugin,
3174                               udpw->session,
3175                               GNUNET_TRANSPORT_SS_UPDATE);
3176       GNUNET_free (udpw);
3177       continue;
3178     }
3179     sent = GNUNET_NETWORK_socket_sendto (sock,
3180                                          udpw->msg_buf,
3181                                          udpw->msg_size,
3182                                          a,
3183                                          slen);
3184     if (GNUNET_SYSERR == sent)
3185     {
3186       /* Failure */
3187       analyze_send_error (plugin,
3188                           a,
3189                           slen,
3190                           errno);
3191       udpw->qc (udpw->qc_cls,
3192                 udpw,
3193                 GNUNET_SYSERR);
3194       GNUNET_STATISTICS_update (plugin->env->stats,
3195                                 "# UDP, total, bytes, sent, failure",
3196                                 sent,
3197                                 GNUNET_NO);
3198       GNUNET_STATISTICS_update (plugin->env->stats,
3199                                 "# UDP, total, messages, sent, failure",
3200                                 1,
3201                                 GNUNET_NO);
3202     }
3203     else
3204     {
3205       /* Success */
3206       LOG (GNUNET_ERROR_TYPE_DEBUG,
3207            "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3208            (unsigned int) (udpw->msg_size),
3209            GNUNET_i2s (&udpw->session->target),
3210            GNUNET_a2s (a,
3211                        slen),
3212            (int ) sent,
3213            (sent < 0) ? STRERROR (errno) : "ok");
3214       GNUNET_STATISTICS_update (plugin->env->stats,
3215                                 "# UDP, total, bytes, sent, success",
3216                                 sent,
3217                                 GNUNET_NO);
3218       GNUNET_STATISTICS_update (plugin->env->stats,
3219                                 "# UDP, total, messages, sent, success",
3220                                 1,
3221                                 GNUNET_NO);
3222       if (NULL != udpw->frag_ctx)
3223         udpw->frag_ctx->on_wire_size += udpw->msg_size;
3224       udpw->qc (udpw->qc_cls,
3225                 udpw,
3226                 GNUNET_OK);
3227     }
3228     dequeue (plugin,
3229              udpw);
3230     notify_session_monitor (plugin,
3231                             udpw->session,
3232                             GNUNET_TRANSPORT_SS_UPDATE);
3233     GNUNET_free (udpw);
3234   }
3235 }
3236
3237
3238 /* ***************** Event loop (part 2) *************** */
3239
3240
3241 /**
3242  * We have been notified that our readset has something to read.  We don't
3243  * know which socket needs to be read, so we have to check each one
3244  * Then reschedule this function to be called again once more is available.
3245  *
3246  * @param cls the plugin handle
3247  * @param tc the scheduling context
3248  */
3249 static void
3250 udp_plugin_select_v4 (void *cls,
3251                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3252 {
3253   struct Plugin *plugin = cls;
3254
3255   plugin->select_task_v4 = NULL;
3256   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3257     return;
3258   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3259       (NULL != plugin->sockv4) &&
3260       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3261                                    plugin->sockv4)))
3262     udp_select_read (plugin,
3263                      plugin->sockv4);
3264   udp_select_send (plugin,
3265                    plugin->sockv4);
3266   schedule_select_v4 (plugin);
3267 }
3268
3269
3270 /**
3271  * We have been notified that our readset has something to read.  We don't
3272  * know which socket needs to be read, so we have to check each one
3273  * Then reschedule this function to be called again once more is available.
3274  *
3275  * @param cls the plugin handle
3276  * @param tc the scheduling context
3277  */
3278 static void
3279 udp_plugin_select_v6 (void *cls,
3280                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3281 {
3282   struct Plugin *plugin = cls;
3283
3284   plugin->select_task_v6 = NULL;
3285   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3286     return;
3287   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3288        (NULL != plugin->sockv6) &&
3289        (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3290                                     plugin->sockv6)) )
3291     udp_select_read (plugin,
3292                      plugin->sockv6);
3293   udp_select_send (plugin,
3294                    plugin->sockv6);
3295   schedule_select_v6 (plugin);
3296 }
3297
3298
3299 /* ******************* Initialization *************** */
3300
3301
3302 /**
3303  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3304  *
3305  * @param plugin the plugin to initialize
3306  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3307  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3308  * @return number of sockets that were successfully bound
3309  */
3310 static int
3311 setup_sockets (struct Plugin *plugin,
3312                const struct sockaddr_in6 *bind_v6,
3313                const struct sockaddr_in *bind_v4)
3314 {
3315   int tries;
3316   int sockets_created = 0;
3317   struct sockaddr_in6 server_addrv6;
3318   struct sockaddr_in server_addrv4;
3319   const struct sockaddr *server_addr;
3320   const struct sockaddr *addrs[2];
3321   socklen_t addrlens[2];
3322   socklen_t addrlen;
3323   int eno;
3324
3325   /* Create IPv6 socket */
3326   eno = EINVAL;
3327   if (GNUNET_YES == plugin->enable_ipv6)
3328   {
3329     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6,
3330                                                    SOCK_DGRAM,
3331                                                    0);
3332     if (NULL == plugin->sockv6)
3333     {
3334       LOG (GNUNET_ERROR_TYPE_INFO,
3335            _("Disabling IPv6 since it is not supported on this system!\n"));
3336       plugin->enable_ipv6 = GNUNET_NO;
3337     }
3338     else
3339     {
3340       memset (&server_addrv6,
3341               0,
3342               sizeof(struct sockaddr_in6));
3343 #if HAVE_SOCKADDR_IN_SIN_LEN
3344       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3345 #endif
3346       server_addrv6.sin6_family = AF_INET6;
3347       if (NULL != bind_v6)
3348         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3349       else
3350         server_addrv6.sin6_addr = in6addr_any;
3351
3352       if (0 == plugin->port) /* autodetect */
3353         server_addrv6.sin6_port
3354           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3355                                              33537)
3356                    + 32000);
3357       else
3358         server_addrv6.sin6_port = htons (plugin->port);
3359       addrlen = sizeof (struct sockaddr_in6);
3360       server_addr = (const struct sockaddr *) &server_addrv6;
3361
3362       tries = 0;
3363       while (tries < 10)
3364       {
3365         LOG(GNUNET_ERROR_TYPE_DEBUG,
3366             "Binding to IPv6 `%s'\n",
3367             GNUNET_a2s (server_addr,
3368                         addrlen));
3369         /* binding */
3370         if (GNUNET_OK ==
3371             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3372                                         server_addr,
3373                                         addrlen))
3374           break;
3375         eno = errno;
3376         if (0 != plugin->port)
3377         {
3378           tries = 10; /* fail immediately */
3379           break; /* bind failed on specific port */
3380         }
3381         /* autodetect */
3382         server_addrv6.sin6_port
3383           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3384                                              33537)
3385                    + 32000);
3386         tries++;
3387       }
3388       if (tries >= 10)
3389       {
3390         GNUNET_NETWORK_socket_close (plugin->sockv6);
3391         plugin->enable_ipv6 = GNUNET_NO;
3392         plugin->sockv6 = NULL;
3393       }
3394       else
3395       {
3396         plugin->port = ntohs (server_addrv6.sin6_port);
3397       }
3398       if (NULL != plugin->sockv6)
3399       {
3400         LOG (GNUNET_ERROR_TYPE_DEBUG,
3401              "IPv6 UDP socket created listinging at %s\n",
3402              GNUNET_a2s (server_addr,
3403                          addrlen));
3404         addrs[sockets_created] = server_addr;
3405         addrlens[sockets_created] = addrlen;
3406         sockets_created++;
3407       }
3408       else
3409       {
3410         LOG (GNUNET_ERROR_TYPE_WARNING,
3411              _("Failed to bind UDP socket to %s: %s\n"),
3412              GNUNET_a2s (server_addr,
3413                          addrlen),
3414              STRERROR (eno));
3415       }
3416     }
3417   }
3418
3419   /* Create IPv4 socket */
3420   eno = EINVAL;
3421   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET,
3422                                                  SOCK_DGRAM,
3423                                                  0);
3424   if (NULL == plugin->sockv4)
3425   {
3426     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3427                          "socket");
3428     LOG (GNUNET_ERROR_TYPE_INFO,
3429          _("Disabling IPv4 since it is not supported on this system!\n"));
3430     plugin->enable_ipv4 = GNUNET_NO;
3431   }
3432   else
3433   {
3434     memset (&server_addrv4,
3435             0,
3436             sizeof(struct sockaddr_in));
3437 #if HAVE_SOCKADDR_IN_SIN_LEN
3438     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3439 #endif
3440     server_addrv4.sin_family = AF_INET;
3441     if (NULL != bind_v4)
3442       server_addrv4.sin_addr = bind_v4->sin_addr;
3443     else
3444       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3445
3446     if (0 == plugin->port)
3447       /* autodetect */
3448       server_addrv4.sin_port
3449         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3450                                            33537)
3451                  + 32000);
3452     else
3453       server_addrv4.sin_port = htons (plugin->port);
3454
3455     addrlen = sizeof (struct sockaddr_in);
3456     server_addr = (const struct sockaddr *) &server_addrv4;
3457
3458     tries = 0;
3459     while (tries < 10)
3460     {
3461       LOG (GNUNET_ERROR_TYPE_DEBUG,
3462            "Binding to IPv4 `%s'\n",
3463            GNUNET_a2s (server_addr,
3464                        addrlen));
3465
3466       /* binding */
3467       if (GNUNET_OK ==
3468           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3469                                       server_addr,
3470                                       addrlen))
3471         break;
3472       eno = errno;
3473       if (0 != plugin->port)
3474       {
3475         tries = 10; /* fail */
3476         break; /* bind failed on specific port */
3477       }
3478
3479       /* autodetect */
3480       server_addrv4.sin_port
3481         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3482                                            33537)
3483                  + 32000);
3484       tries++;
3485     }
3486     if (tries >= 10)
3487     {
3488       GNUNET_NETWORK_socket_close (plugin->sockv4);
3489       plugin->enable_ipv4 = GNUNET_NO;
3490       plugin->sockv4 = NULL;
3491     }
3492     else
3493     {
3494       plugin->port = ntohs (server_addrv4.sin_port);
3495     }
3496
3497     if (NULL != plugin->sockv4)
3498     {
3499       LOG (GNUNET_ERROR_TYPE_DEBUG,
3500            "IPv4 socket created on port %s\n",
3501            GNUNET_a2s (server_addr,
3502                        addrlen));
3503       addrs[sockets_created] = server_addr;
3504       addrlens[sockets_created] = addrlen;
3505       sockets_created++;
3506     }
3507     else
3508     {
3509       LOG (GNUNET_ERROR_TYPE_ERROR,
3510            _("Failed to bind UDP socket to %s: %s\n"),
3511            GNUNET_a2s (server_addr,
3512                        addrlen),
3513            STRERROR (eno));
3514     }
3515   }
3516
3517   if (0 == sockets_created)
3518   {
3519     LOG (GNUNET_ERROR_TYPE_WARNING,
3520          _("Failed to open UDP sockets\n"));
3521     return 0; /* No sockets created, return */
3522   }
3523   schedule_select_v4 (plugin);
3524   schedule_select_v6 (plugin);
3525   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3526                                      GNUNET_NO,
3527                                      plugin->port,
3528                                      sockets_created,
3529                                      addrs,
3530                                      addrlens,
3531                                      &udp_nat_port_map_callback,
3532                                      NULL,
3533                                      plugin);
3534   return sockets_created;
3535 }
3536
3537
3538 /**
3539  * The exported method. Makes the core api available via a global and
3540  * returns the udp transport API.
3541  *
3542  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3543  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3544  */
3545 void *
3546 libgnunet_plugin_transport_udp_init (void *cls)
3547 {
3548   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3549   struct GNUNET_TRANSPORT_PluginFunctions *api;
3550   struct Plugin *p;
3551   unsigned long long port;
3552   unsigned long long aport;
3553   unsigned long long udp_max_bps;
3554   unsigned long long enable_v6;
3555   unsigned long long enable_broadcasting;
3556   unsigned long long enable_broadcasting_recv;
3557   char *bind4_address;
3558   char *bind6_address;
3559   struct GNUNET_TIME_Relative interval;
3560   struct sockaddr_in server_addrv4;
3561   struct sockaddr_in6 server_addrv6;
3562   int res;
3563   int have_bind4;
3564   int have_bind6;
3565
3566   if (NULL == env->receive)
3567   {
3568     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3569      initialze the plugin or the API */
3570     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3571     api->cls = NULL;
3572     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3573     api->address_to_string = &udp_address_to_string;
3574     api->string_to_address = &udp_string_to_address;
3575     return api;
3576   }
3577
3578   /* Get port number: port == 0 : autodetect a port,
3579    * > 0 : use this port, not given : 2086 default */
3580   if (GNUNET_OK !=
3581       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3582                                              "transport-udp",
3583                                              "PORT",
3584                                              &port))
3585     port = 2086;
3586   if (port > 65535)
3587   {
3588     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3589                                "transport-udp",
3590                                "PORT",
3591                                _("must be in [0,65535]"));
3592     return NULL;
3593   }
3594   if (GNUNET_OK !=
3595       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3596                                              "transport-udp",
3597                                              "ADVERTISED_PORT",
3598                                              &aport))
3599     aport = port;
3600   if (aport > 65535)
3601   {
3602     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3603                                "transport-udp",
3604                                "ADVERTISED_PORT",
3605                                _("must be in [0,65535]"));
3606     return NULL;
3607   }
3608
3609   if (GNUNET_YES ==
3610       GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3611                                             "nat",
3612                                             "DISABLEV6"))
3613     enable_v6 = GNUNET_NO;
3614   else
3615     enable_v6 = GNUNET_YES;
3616
3617   have_bind4 = GNUNET_NO;
3618   memset (&server_addrv4,
3619           0,
3620           sizeof (server_addrv4));
3621   if (GNUNET_YES ==
3622       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3623                                              "transport-udp",
3624                                              "BINDTO",
3625                                              &bind4_address))
3626   {
3627     LOG (GNUNET_ERROR_TYPE_DEBUG,
3628          "Binding UDP plugin to specific address: `%s'\n",
3629          bind4_address);
3630     if (1 != inet_pton (AF_INET,
3631                         bind4_address,
3632                         &server_addrv4.sin_addr))
3633     {
3634       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3635                                  "transport-udp",
3636                                  "BINDTO",
3637                                  _("must be valid IPv4 address"));
3638       GNUNET_free (bind4_address);
3639       return NULL;
3640     }
3641     have_bind4 = GNUNET_YES;
3642   }
3643   GNUNET_free_non_null (bind4_address);
3644   have_bind6 = GNUNET_NO;
3645   memset (&server_addrv6,
3646           0,
3647           sizeof (server_addrv6));
3648   if (GNUNET_YES ==
3649       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3650                                              "transport-udp",
3651                                              "BINDTO6",
3652                                              &bind6_address))
3653   {
3654     LOG (GNUNET_ERROR_TYPE_DEBUG,
3655          "Binding udp plugin to specific address: `%s'\n",
3656          bind6_address);
3657     if (1 != inet_pton (AF_INET6,
3658                         bind6_address,
3659                         &server_addrv6.sin6_addr))
3660     {
3661       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3662                                  "transport-udp",
3663                                  "BINDTO6",
3664                                  _("must be valid IPv6 address"));
3665       GNUNET_free (bind6_address);
3666       return NULL;
3667     }
3668     have_bind6 = GNUNET_YES;
3669   }
3670   GNUNET_free_non_null (bind6_address);
3671
3672   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3673                                                               "transport-udp",
3674                                                               "BROADCAST");
3675   if (enable_broadcasting == GNUNET_SYSERR)
3676     enable_broadcasting = GNUNET_NO;
3677
3678   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3679                                                                    "transport-udp",
3680                                                                    "BROADCAST_RECEIVE");
3681   if (enable_broadcasting_recv == GNUNET_SYSERR)
3682     enable_broadcasting_recv = GNUNET_YES;
3683
3684   if (GNUNET_SYSERR ==
3685       GNUNET_CONFIGURATION_get_value_time (env->cfg,
3686                                            "transport-udp",
3687                                            "BROADCAST_INTERVAL",
3688                                            &interval))
3689   {
3690     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
3691                                               10);
3692   }
3693   if (GNUNET_OK !=
3694       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3695                                              "transport-udp",
3696                                              "MAX_BPS",
3697                                              &udp_max_bps))
3698   {
3699     /* 50 MB/s == infinity for practical purposes */
3700     udp_max_bps = 1024 * 1024 * 50;
3701   }
3702
3703   p = GNUNET_new (struct Plugin);
3704   p->port = port;
3705   p->aport = aport;
3706   p->broadcast_interval = interval;
3707   p->enable_ipv6 = enable_v6;
3708   p->enable_ipv4 = GNUNET_YES; /* default */
3709   p->enable_broadcasting = enable_broadcasting;
3710   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3711   p->env = env;
3712   p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
3713                                                       GNUNET_NO);
3714   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3715   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3716                                      p);
3717   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3718                                  NULL,
3719                                  NULL,
3720                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3721                                  30);
3722   res = setup_sockets (p,
3723                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3724                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3725   if ( (0 == res) ||
3726        ( (NULL == p->sockv4) &&
3727          (NULL == p->sockv6) ) )
3728   {
3729     LOG (GNUNET_ERROR_TYPE_ERROR,
3730         _("Failed to create UDP network sockets\n"));
3731     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3732     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3733     GNUNET_SERVER_mst_destroy (p->mst);
3734     GNUNET_free (p);
3735     return NULL;
3736   }
3737
3738   /* Setup broadcasting and receiving beacons */
3739   setup_broadcast (p,
3740                    &server_addrv6,
3741                    &server_addrv4);
3742
3743   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3744   api->cls = p;
3745   api->disconnect_session = &udp_disconnect_session;
3746   api->query_keepalive_factor = &udp_query_keepalive_factor;
3747   api->disconnect_peer = &udp_disconnect;
3748   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3749   api->address_to_string = &udp_address_to_string;
3750   api->string_to_address = &udp_string_to_address;
3751   api->check_address = &udp_plugin_check_address;
3752   api->get_session = &udp_plugin_get_session;
3753   api->send = &udp_plugin_send;
3754   api->get_network = &udp_get_network;
3755   api->update_session_timeout = &udp_plugin_update_session_timeout;
3756   api->setup_monitor = &udp_plugin_setup_monitor;
3757   return api;
3758 }
3759
3760
3761 /**
3762  * Function called on each entry in the defragmentation heap to
3763  * clean it up.
3764  *
3765  * @param cls NULL
3766  * @param node node in the heap (to be removed)
3767  * @param element a `struct DefragContext` to be cleaned up
3768  * @param cost unused
3769  * @return #GNUNET_YES
3770  */
3771 static int
3772 heap_cleanup_iterator (void *cls,
3773                        struct GNUNET_CONTAINER_HeapNode *node,
3774                        void *element,
3775                        GNUNET_CONTAINER_HeapCostType cost)
3776 {
3777   struct DefragContext *d_ctx = element;
3778
3779   GNUNET_CONTAINER_heap_remove_node (node);
3780   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3781   GNUNET_free (d_ctx);
3782   return GNUNET_YES;
3783 }
3784
3785
3786 /**
3787  * The exported method. Makes the core api available via a global and
3788  * returns the udp transport API.
3789  *
3790  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3791  * @return NULL
3792  */
3793 void *
3794 libgnunet_plugin_transport_udp_done (void *cls)
3795 {
3796   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3797   struct Plugin *plugin = api->cls;
3798   struct PrettyPrinterContext *cur;
3799   struct UDP_MessageWrapper *udpw;
3800
3801   if (NULL == plugin)
3802   {
3803     GNUNET_free (api);
3804     return NULL;
3805   }
3806   stop_broadcast (plugin);
3807   if (NULL != plugin->select_task_v4)
3808   {
3809     GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
3810     plugin->select_task_v4 = NULL;
3811   }
3812   if (NULL != plugin->select_task_v6)
3813   {
3814     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3815     plugin->select_task_v6 = NULL;
3816   }
3817   if (NULL != plugin->sockv4)
3818   {
3819     GNUNET_break (GNUNET_OK ==
3820                   GNUNET_NETWORK_socket_close (plugin->sockv4));
3821     plugin->sockv4 = NULL;
3822   }
3823   if (NULL != plugin->sockv6)
3824   {
3825     GNUNET_break (GNUNET_OK ==
3826                   GNUNET_NETWORK_socket_close (plugin->sockv6));
3827     plugin->sockv6 = NULL;
3828   }
3829   if (NULL != plugin->nat)
3830   {
3831     GNUNET_NAT_unregister (plugin->nat);
3832     plugin->nat = NULL;
3833   }
3834   if (NULL != plugin->defrag_ctxs)
3835   {
3836     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3837                                    &heap_cleanup_iterator,
3838                                    NULL);
3839     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3840     plugin->defrag_ctxs = NULL;
3841   }
3842   if (NULL != plugin->mst)
3843   {
3844     GNUNET_SERVER_mst_destroy (plugin->mst);
3845     plugin->mst = NULL;
3846   }
3847   while (NULL != (udpw = plugin->ipv4_queue_head))
3848   {
3849     dequeue (plugin,
3850              udpw);
3851     udpw->qc (udpw->qc_cls,
3852               udpw,
3853               GNUNET_SYSERR);
3854     GNUNET_free (udpw);
3855   }
3856   while (NULL != (udpw = plugin->ipv6_queue_head))
3857   {
3858     dequeue (plugin,
3859              udpw);
3860     udpw->qc (udpw->qc_cls,
3861               udpw,
3862               GNUNET_SYSERR);
3863     GNUNET_free (udpw);
3864   }
3865   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3866                                          &disconnect_and_free_it,
3867                                          plugin);
3868   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3869
3870   while (NULL != (cur = plugin->ppc_dll_head))
3871   {
3872     GNUNET_break (0);
3873     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3874                                  plugin->ppc_dll_tail,
3875                                  cur);
3876     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3877     GNUNET_free (cur);
3878   }
3879   GNUNET_free (plugin);
3880   GNUNET_free (api);
3881   return NULL;
3882 }
3883
3884 /* end of plugin_transport_udp.c */