-this does not help #3719, likely diagnosis was wrong as well, undo
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 /**
46  * After how much inactivity should a UDP session time out?
47  */
48 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
49
50 /**
51  * Number of messages we can defragment in parallel.  We only really
52  * defragment 1 message at a time, but if messages get re-ordered, we
53  * may want to keep knowledge about the previous message to avoid
54  * discarding the current message in favor of a single fragment of a
55  * previous message.  3 should be good since we don't expect massive
56  * message reorderings with UDP.
57  */
58 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
59
60 /**
61  * We keep a defragmentation queue per sender address.  How many
62  * sender addresses do we support at the same time? Memory consumption
63  * is roughly a factor of 32k * #UDP_MAX_MESSAGES_IN_DEFRAG times this
64  * value. (So 128 corresponds to 12 MB and should suffice for
65  * connecting to roughly 128 peers via UDP).
66  */
67 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
68
69
70 /**
71  * UDP Message-Packet header (after defragmentation).
72  */
73 struct UDPMessage
74 {
75   /**
76    * Message header.
77    */
78   struct GNUNET_MessageHeader header;
79
80   /**
81    * Always zero for now.
82    */
83   uint32_t reserved;
84
85   /**
86    * What is the identity of the sender
87    */
88   struct GNUNET_PeerIdentity sender;
89
90 };
91
92
93 /**
94  * Closure for #append_port().
95  */
96 struct PrettyPrinterContext
97 {
98   /**
99    * DLL
100    */
101   struct PrettyPrinterContext *next;
102
103   /**
104    * DLL
105    */
106   struct PrettyPrinterContext *prev;
107
108   /**
109    * Our plugin.
110    */
111   struct Plugin *plugin;
112
113   /**
114    * Resolver handle
115    */
116   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
117
118   /**
119    * Function to call with the result.
120    */
121   GNUNET_TRANSPORT_AddressStringCallback asc;
122
123   /**
124    * Clsoure for @e asc.
125    */
126   void *asc_cls;
127
128   /**
129    * Timeout task
130    */
131   struct GNUNET_SCHEDULER_Task *timeout_task;
132
133   /**
134    * Is this an IPv6 address?
135    */
136   int ipv6;
137
138   /**
139    * Options
140    */
141   uint32_t options;
142
143   /**
144    * Port to add after the IP address.
145    */
146   uint16_t port;
147
148 };
149
150
151 /**
152  * Session with another peer.
153  */
154 struct Session
155 {
156   /**
157    * Which peer is this session for?
158    */
159   struct GNUNET_PeerIdentity target;
160
161   /**
162    * Plugin this session belongs to.
163    */
164   struct Plugin *plugin;
165
166   /**
167    * Context for dealing with fragments.
168    */
169   struct UDP_FragmentationContext *frag_ctx;
170
171   /**
172    * Desired delay for next sending we send to other peer
173    */
174   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
175
176   /**
177    * Desired delay for next sending we received from other peer
178    */
179   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
180
181   /**
182    * Session timeout task
183    */
184   struct GNUNET_SCHEDULER_Task *timeout_task;
185
186   /**
187    * When does this session time out?
188    */
189   struct GNUNET_TIME_Absolute timeout;
190
191   /**
192    * expected delay for ACKs
193    */
194   struct GNUNET_TIME_Relative last_expected_ack_delay;
195
196   /**
197    * desired delay between UDP messages
198    */
199   struct GNUNET_TIME_Relative last_expected_msg_delay;
200
201   /**
202    * Our own address.
203    */
204   struct GNUNET_HELLO_Address *address;
205
206   /**
207    * Number of bytes waiting for transmission to this peer.
208    */
209   unsigned long long bytes_in_queue;
210
211   /**
212    * Number of messages waiting for transmission to this peer.
213    */
214   unsigned int msgs_in_queue;
215
216   /**
217    * Reference counter to indicate that this session is
218    * currently being used and must not be destroyed;
219    * setting @e in_destroy will destroy it as soon as
220    * possible.
221    */
222   unsigned int rc;
223
224   /**
225    * Network type of the address.
226    */
227   enum GNUNET_ATS_Network_Type scope;
228
229   /**
230    * Is this session about to be destroyed (sometimes we cannot
231    * destroy a session immediately as below us on the stack
232    * there might be code that still uses it; in this case,
233    * @e rc is non-zero).
234    */
235   int in_destroy;
236 };
237
238
239
240 /**
241  * Data structure to track defragmentation contexts based
242  * on the source of the UDP traffic.
243  */
244 struct DefragContext
245 {
246
247   /**
248    * Defragmentation context.
249    */
250   struct GNUNET_DEFRAGMENT_Context *defrag;
251
252   /**
253    * Reference to master plugin struct.
254    */
255   struct Plugin *plugin;
256
257   /**
258    * Node in the defrag heap.
259    */
260   struct GNUNET_CONTAINER_HeapNode *hnode;
261
262   /**
263    * Source address this receive context is for (allocated at the
264    * end of the struct).
265    */
266   const union UdpAddress *udp_addr;
267
268   /**
269    * Who's message(s) are we defragmenting here?
270    * Only initialized once we succeeded and
271    * @e have_sender is set.
272    */
273   struct GNUNET_PeerIdentity sender;
274
275   /**
276    * Length of @e udp_addr.
277    */
278   size_t udp_addr_len;
279
280   /**
281    * Network type the address belongs to.
282    */
283   enum GNUNET_ATS_Network_Type network_type;
284
285   /**
286    * Has the @e sender field been initialized yet?
287    */
288   int have_sender;
289 };
290
291
292 /**
293  * Context to send fragmented messages
294  */
295 struct UDP_FragmentationContext
296 {
297   /**
298    * Next in linked list
299    */
300   struct UDP_FragmentationContext *next;
301
302   /**
303    * Previous in linked list
304    */
305   struct UDP_FragmentationContext *prev;
306
307   /**
308    * The plugin
309    */
310   struct Plugin *plugin;
311
312   /**
313    * Handle for fragmentation.
314    */
315   struct GNUNET_FRAGMENT_Context *frag;
316
317   /**
318    * The session this fragmentation context belongs to
319    */
320   struct Session *session;
321
322   /**
323    * Function to call upon completion of the transmission.
324    */
325   GNUNET_TRANSPORT_TransmitContinuation cont;
326
327   /**
328    * Closure for @e cont.
329    */
330   void *cont_cls;
331
332   /**
333    * Message timeout
334    */
335   struct GNUNET_TIME_Absolute timeout;
336
337   /**
338    * Payload size of original unfragmented message
339    */
340   size_t payload_size;
341
342   /**
343    * Bytes used to send all fragments on wire including UDP overhead
344    */
345   size_t on_wire_size;
346
347 };
348
349
350 /**
351  * Function called when a message is removed from the
352  * transmission queue.
353  *
354  * @param cls closure
355  * @param udpw message wrapper finished
356  * @param result #GNUNET_OK on success (message was sent)
357  *               #GNUNET_SYSERR if the target disconnected
358  *               or we had a timeout or other trouble sending
359  */
360 typedef void
361 (*QueueContinuation) (void *cls,
362                       struct UDP_MessageWrapper *udpw,
363                       int result);
364
365
366 /**
367  * Information we track for each message in the queue.
368  */
369 struct UDP_MessageWrapper
370 {
371   /**
372    * Session this message belongs to
373    */
374   struct Session *session;
375
376   /**
377    * DLL of messages, previous element
378    */
379   struct UDP_MessageWrapper *prev;
380
381   /**
382    * DLL of messages, next element
383    */
384   struct UDP_MessageWrapper *next;
385
386   /**
387    * Message with @e msg_size bytes including UDP-specific overhead.
388    */
389   char *msg_buf;
390
391   /**
392    * Function to call once the message wrapper is being removed
393    * from the queue (with success or failure).
394    */
395   QueueContinuation qc;
396
397   /**
398    * Closure for @e qc.
399    */
400   void *qc_cls;
401
402   /**
403    * External continuation to call upon completion of the
404    * transmission, NULL if this queue entry is not for a
405    * message from the application.
406    */
407   GNUNET_TRANSPORT_TransmitContinuation cont;
408
409   /**
410    * Closure for @e cont.
411    */
412   void *cont_cls;
413
414   /**
415    * Fragmentation context.
416    * frag_ctx == NULL if transport <= MTU
417    * frag_ctx != NULL if transport > MTU
418    */
419   struct UDP_FragmentationContext *frag_ctx;
420
421   /**
422    * Message timeout.
423    */
424   struct GNUNET_TIME_Absolute timeout;
425
426   /**
427    * Size of UDP message to send, including UDP-specific overhead.
428    */
429   size_t msg_size;
430
431   /**
432    * Payload size of original message.
433    */
434   size_t payload_size;
435
436 };
437
438
439 GNUNET_NETWORK_STRUCT_BEGIN
440
441 /**
442  * UDP ACK Message-Packet header.
443  */
444 struct UDP_ACK_Message
445 {
446   /**
447    * Message header.
448    */
449   struct GNUNET_MessageHeader header;
450
451   /**
452    * Desired delay for flow control, in us (in NBO).
453    */
454   uint32_t delay GNUNET_PACKED;
455
456   /**
457    * What is the identity of the sender
458    */
459   struct GNUNET_PeerIdentity sender;
460
461 };
462
463 GNUNET_NETWORK_STRUCT_END
464
465
466 /* ************************* Monitoring *********** */
467
468
469 /**
470  * If a session monitor is attached, notify it about the new
471  * session state.
472  *
473  * @param plugin our plugin
474  * @param session session that changed state
475  * @param state new state of the session
476  */
477 static void
478 notify_session_monitor (struct Plugin *plugin,
479                         struct Session *session,
480                         enum GNUNET_TRANSPORT_SessionState state)
481 {
482   struct GNUNET_TRANSPORT_SessionInfo info;
483
484   if (NULL == plugin->sic)
485     return;
486   if (GNUNET_YES == session->in_destroy)
487     return; /* already destroyed, just RC>0 left-over actions */
488   memset (&info,
489           0,
490           sizeof (info));
491   info.state = state;
492   info.is_inbound = GNUNET_SYSERR; /* hard to say */
493   info.num_msg_pending = session->msgs_in_queue;
494   info.num_bytes_pending = session->bytes_in_queue;
495   /* info.receive_delay remains zero as this is not supported by UDP
496      (cannot selectively not receive from 'some' peer while continuing
497      to receive from others) */
498   info.session_timeout = session->timeout;
499   info.address = session->address;
500   plugin->sic (plugin->sic_cls,
501                session,
502                &info);
503 }
504
505
506 /**
507  * Return information about the given session to the monitor callback.
508  *
509  * @param cls the `struct Plugin` with the monitor callback (`sic`)
510  * @param peer peer we send information about
511  * @param value our `struct Session` to send information about
512  * @return #GNUNET_OK (continue to iterate)
513  */
514 static int
515 send_session_info_iter (void *cls,
516                         const struct GNUNET_PeerIdentity *peer,
517                         void *value)
518 {
519   struct Plugin *plugin = cls;
520   struct Session *session = value;
521
522   notify_session_monitor (plugin,
523                           session,
524                           GNUNET_TRANSPORT_SS_INIT);
525   notify_session_monitor (plugin,
526                           session,
527                           GNUNET_TRANSPORT_SS_UP);
528   return GNUNET_OK;
529 }
530
531
532 /**
533  * Begin monitoring sessions of a plugin.  There can only
534  * be one active monitor per plugin (i.e. if there are
535  * multiple monitors, the transport service needs to
536  * multiplex the generated events over all of them).
537  *
538  * @param cls closure of the plugin
539  * @param sic callback to invoke, NULL to disable monitor;
540  *            plugin will being by iterating over all active
541  *            sessions immediately and then enter monitor mode
542  * @param sic_cls closure for @a sic
543  */
544 static void
545 udp_plugin_setup_monitor (void *cls,
546                           GNUNET_TRANSPORT_SessionInfoCallback sic,
547                           void *sic_cls)
548 {
549   struct Plugin *plugin = cls;
550
551   plugin->sic = sic;
552   plugin->sic_cls = sic_cls;
553   if (NULL != sic)
554   {
555     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
556                                            &send_session_info_iter,
557                                            plugin);
558     /* signal end of first iteration */
559     sic (sic_cls,
560          NULL,
561          NULL);
562   }
563 }
564
565
566 /* ****************** Little Helpers ****************** */
567
568
569 /**
570  * Function to free last resources associated with a session.
571  *
572  * @param s session to free
573  */
574 static void
575 free_session (struct Session *s)
576 {
577   if (NULL != s->address)
578   {
579     GNUNET_HELLO_address_free (s->address);
580     s->address = NULL;
581   }
582   if (NULL != s->frag_ctx)
583   {
584     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag,
585                                      NULL,
586                                      NULL);
587     GNUNET_free (s->frag_ctx);
588     s->frag_ctx = NULL;
589   }
590   GNUNET_free (s);
591 }
592
593
594 /**
595  * Function that is called to get the keepalive factor.
596  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
597  * calculate the interval between keepalive packets.
598  *
599  * @param cls closure with the `struct Plugin`
600  * @return keepalive factor
601  */
602 static unsigned int
603 udp_query_keepalive_factor (void *cls)
604 {
605   return 15;
606 }
607
608
609 /**
610  * Function obtain the network type for a session
611  *
612  * @param cls closure (`struct Plugin *`)
613  * @param session the session
614  * @return the network type
615  */
616 static enum GNUNET_ATS_Network_Type
617 udp_get_network (void *cls,
618                  struct Session *session)
619 {
620   return session->scope;
621 }
622
623
624 /* ******************* Event loop ******************** */
625
626 /**
627  * We have been notified that our readset has something to read.  We don't
628  * know which socket needs to be read, so we have to check each one
629  * Then reschedule this function to be called again once more is available.
630  *
631  * @param cls the plugin handle
632  * @param tc the scheduling context (for rescheduling this function again)
633  */
634 static void
635 udp_plugin_select_v4 (void *cls,
636                       const struct GNUNET_SCHEDULER_TaskContext *tc);
637
638
639 /**
640  * We have been notified that our readset has something to read.  We don't
641  * know which socket needs to be read, so we have to check each one
642  * Then reschedule this function to be called again once more is available.
643  *
644  * @param cls the plugin handle
645  * @param tc the scheduling context (for rescheduling this function again)
646  */
647 static void
648 udp_plugin_select_v6 (void *cls,
649                       const struct GNUNET_SCHEDULER_TaskContext *tc);
650
651
652 /**
653  * (re)schedule IPv4-select tasks for this plugin.
654  *
655  * @param plugin plugin to reschedule
656  */
657 static void
658 schedule_select_v4 (struct Plugin *plugin)
659 {
660   struct GNUNET_TIME_Relative min_delay;
661   struct UDP_MessageWrapper *udpw;
662
663   if ( (GNUNET_YES == plugin->enable_ipv4) &&
664        (NULL != plugin->sockv4) )
665   {
666     /* Find a message ready to send:
667      * Flow delay from other peer is expired or not set (0) */
668     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
669     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
670       min_delay = GNUNET_TIME_relative_min (min_delay,
671                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
672     if (NULL != plugin->select_task_v4)
673       GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
674     plugin->select_task_v4
675       = GNUNET_SCHEDULER_add_read_net (min_delay,
676                                        plugin->sockv4,
677                                        &udp_plugin_select_v4,
678                                        plugin);
679   }
680 }
681
682
683 /**
684  * (re)schedule IPv6-select tasks for this plugin.
685  *
686  * @param plugin plugin to reschedule
687  */
688 static void
689 schedule_select_v6 (struct Plugin *plugin)
690 {
691   struct GNUNET_TIME_Relative min_delay;
692   struct UDP_MessageWrapper *udpw;
693
694   if ( (GNUNET_YES == plugin->enable_ipv6) &&
695        (NULL != plugin->sockv6) )
696   {
697     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
698     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
699       min_delay = GNUNET_TIME_relative_min (min_delay,
700                                             GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer));
701     if (NULL != plugin->select_task_v6)
702       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
703     plugin->select_task_v6
704       = GNUNET_SCHEDULER_add_read_net (min_delay,
705                                        plugin->sockv6,
706                                        &udp_plugin_select_v6,
707                                        plugin);
708   }
709 }
710
711
712 /* ******************* Address to string and back ***************** */
713
714
715 /**
716  * Function called for a quick conversion of the binary address to
717  * a numeric address.  Note that the caller must not free the
718  * address and that the next call to this function is allowed
719  * to override the address again.
720  *
721  * @param cls closure
722  * @param addr binary address (a `union UdpAddress`)
723  * @param addrlen length of the @a addr
724  * @return string representing the same address
725  */
726 const char *
727 udp_address_to_string (void *cls,
728                        const void *addr,
729                        size_t addrlen)
730 {
731   static char rbuf[INET6_ADDRSTRLEN + 10];
732   char buf[INET6_ADDRSTRLEN];
733   const void *sb;
734   struct in_addr a4;
735   struct in6_addr a6;
736   const struct IPv4UdpAddress *t4;
737   const struct IPv6UdpAddress *t6;
738   int af;
739   uint16_t port;
740   uint32_t options;
741
742   if (NULL == addr)
743   {
744     GNUNET_break_op (0);
745     return NULL;
746   }
747
748   if (addrlen == sizeof(struct IPv6UdpAddress))
749   {
750     t6 = addr;
751     af = AF_INET6;
752     options = ntohl (t6->options);
753     port = ntohs (t6->u6_port);
754     a6 = t6->ipv6_addr;
755     sb = &a6;
756   }
757   else if (addrlen == sizeof(struct IPv4UdpAddress))
758   {
759     t4 = addr;
760     af = AF_INET;
761     options = ntohl (t4->options);
762     port = ntohs (t4->u4_port);
763     a4.s_addr = t4->ipv4_addr;
764     sb = &a4;
765   }
766   else
767   {
768     GNUNET_break_op (0);
769     return NULL;
770   }
771   inet_ntop (af,
772              sb,
773              buf,
774              INET6_ADDRSTRLEN);
775   GNUNET_snprintf (rbuf,
776                    sizeof(rbuf),
777                    (af == AF_INET6)
778                    ? "%s.%u.[%s]:%u"
779                    : "%s.%u.%s:%u",
780                    PLUGIN_NAME,
781                    options,
782                    buf,
783                    port);
784   return rbuf;
785 }
786
787
788 /**
789  * Function called to convert a string address to a binary address.
790  *
791  * @param cls closure (`struct Plugin *`)
792  * @param addr string address
793  * @param addrlen length of the address
794  * @param buf location to store the buffer
795  * @param added location to store the number of bytes in the buffer.
796  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
797  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
798  */
799 static int
800 udp_string_to_address (void *cls,
801                        const char *addr,
802                        uint16_t addrlen,
803                        void **buf,
804                        size_t *added)
805 {
806   struct sockaddr_storage socket_address;
807   char *address;
808   char *plugin;
809   char *optionstr;
810   uint32_t options;
811
812   /* Format tcp.options.address:port */
813   address = NULL;
814   plugin = NULL;
815   optionstr = NULL;
816
817   if ((NULL == addr) || (0 == addrlen))
818   {
819     GNUNET_break (0);
820     return GNUNET_SYSERR;
821   }
822   if ('\0' != addr[addrlen - 1])
823   {
824     GNUNET_break (0);
825     return GNUNET_SYSERR;
826   }
827   if (strlen (addr) != addrlen - 1)
828   {
829     GNUNET_break (0);
830     return GNUNET_SYSERR;
831   }
832   plugin = GNUNET_strdup (addr);
833   optionstr = strchr (plugin, '.');
834   if (NULL == optionstr)
835   {
836     GNUNET_break (0);
837     GNUNET_free (plugin);
838     return GNUNET_SYSERR;
839   }
840   optionstr[0] = '\0';
841   optionstr++;
842   options = atol (optionstr);
843   address = strchr (optionstr, '.');
844   if (NULL == address)
845   {
846     GNUNET_break (0);
847     GNUNET_free (plugin);
848     return GNUNET_SYSERR;
849   }
850   address[0] = '\0';
851   address++;
852
853   if (GNUNET_OK !=
854       GNUNET_STRINGS_to_address_ip (address,
855                                     strlen (address),
856                                     &socket_address))
857   {
858     GNUNET_break (0);
859     GNUNET_free (plugin);
860     return GNUNET_SYSERR;
861   }
862   GNUNET_free(plugin);
863
864   switch (socket_address.ss_family)
865   {
866   case AF_INET:
867     {
868       struct IPv4UdpAddress *u4;
869       const struct sockaddr_in *in4 = (const struct sockaddr_in *) &socket_address;
870
871       u4 = GNUNET_new (struct IPv4UdpAddress);
872       u4->options = htonl (options);
873       u4->ipv4_addr = in4->sin_addr.s_addr;
874       u4->u4_port = in4->sin_port;
875       *buf = u4;
876       *added = sizeof (struct IPv4UdpAddress);
877       return GNUNET_OK;
878     }
879   case AF_INET6:
880     {
881       struct IPv6UdpAddress *u6;
882       const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *) &socket_address;
883
884       u6 = GNUNET_new (struct IPv6UdpAddress);
885       u6->options = htonl (options);
886       u6->ipv6_addr = in6->sin6_addr;
887       u6->u6_port = in6->sin6_port;
888       *buf = u6;
889       *added = sizeof (struct IPv6UdpAddress);
890       return GNUNET_OK;
891     }
892   default:
893     GNUNET_break (0);
894     return GNUNET_SYSERR;
895   }
896 }
897
898
899 /**
900  * Append our port and forward the result.
901  *
902  * @param cls a `struct PrettyPrinterContext *`
903  * @param hostname result from DNS resolver
904  */
905 static void
906 append_port (void *cls,
907              const char *hostname)
908 {
909   struct PrettyPrinterContext *ppc = cls;
910   struct Plugin *plugin = ppc->plugin;
911   char *ret;
912
913   if (NULL == hostname)
914   {
915     /* Final call, done */
916     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
917                                  plugin->ppc_dll_tail,
918                                  ppc);
919     ppc->resolver_handle = NULL;
920     ppc->asc (ppc->asc_cls,
921               NULL,
922               GNUNET_OK);
923     GNUNET_free (ppc);
924     return;
925   }
926   if (GNUNET_YES == ppc->ipv6)
927     GNUNET_asprintf (&ret,
928                      "%s.%u.[%s]:%d",
929                      PLUGIN_NAME,
930                      ppc->options,
931                      hostname,
932                      ppc->port);
933   else
934     GNUNET_asprintf (&ret,
935                      "%s.%u.%s:%d",
936                      PLUGIN_NAME,
937                      ppc->options,
938                      hostname,
939                      ppc->port);
940   ppc->asc (ppc->asc_cls,
941             ret,
942             GNUNET_OK);
943   GNUNET_free (ret);
944 }
945
946
947 /**
948  * Convert the transports address to a nice, human-readable format.
949  *
950  * @param cls closure with the `struct Plugin *`
951  * @param type name of the transport that generated the address
952  * @param addr one of the addresses of the host, NULL for the last address
953  *        the specific address format depends on the transport;
954  *        a `union UdpAddress`
955  * @param addrlen length of the address
956  * @param numeric should (IP) addresses be displayed in numeric form?
957  * @param timeout after how long should we give up?
958  * @param asc function to call on each string
959  * @param asc_cls closure for @a asc
960  */
961 static void
962 udp_plugin_address_pretty_printer (void *cls,
963                                    const char *type,
964                                    const void *addr,
965                                    size_t addrlen,
966                                    int numeric,
967                                    struct GNUNET_TIME_Relative timeout,
968                                    GNUNET_TRANSPORT_AddressStringCallback asc,
969                                    void *asc_cls)
970 {
971   struct Plugin *plugin = cls;
972   struct PrettyPrinterContext *ppc;
973   const struct sockaddr *sb;
974   size_t sbs;
975   struct sockaddr_in a4;
976   struct sockaddr_in6 a6;
977   const struct IPv4UdpAddress *u4;
978   const struct IPv6UdpAddress *u6;
979   uint16_t port;
980   uint32_t options;
981
982   if (addrlen == sizeof(struct IPv6UdpAddress))
983   {
984     u6 = addr;
985     memset (&a6,
986             0,
987             sizeof (a6));
988     a6.sin6_family = AF_INET6;
989 #if HAVE_SOCKADDR_IN_SIN_LEN
990     a6.sin6_len = sizeof (a6);
991 #endif
992     a6.sin6_port = u6->u6_port;
993     a6.sin6_addr = u6->ipv6_addr;
994     port = ntohs (u6->u6_port);
995     options = ntohl (u6->options);
996     sb = (const struct sockaddr *) &a6;
997     sbs = sizeof (a6);
998   }
999   else if (addrlen == sizeof (struct IPv4UdpAddress))
1000   {
1001     u4 = addr;
1002     memset (&a4,
1003             0,
1004             sizeof(a4));
1005     a4.sin_family = AF_INET;
1006 #if HAVE_SOCKADDR_IN_SIN_LEN
1007     a4.sin_len = sizeof (a4);
1008 #endif
1009     a4.sin_port = u4->u4_port;
1010     a4.sin_addr.s_addr = u4->ipv4_addr;
1011     port = ntohs (u4->u4_port);
1012     options = ntohl (u4->options);
1013     sb = (const struct sockaddr *) &a4;
1014     sbs = sizeof(a4);
1015   }
1016   else
1017   {
1018     /* invalid address */
1019     GNUNET_break_op (0);
1020     asc (asc_cls,
1021          NULL,
1022          GNUNET_SYSERR);
1023     asc (asc_cls,
1024          NULL,
1025          GNUNET_OK);
1026     return;
1027   }
1028   ppc = GNUNET_new (struct PrettyPrinterContext);
1029   ppc->plugin = plugin;
1030   ppc->asc = asc;
1031   ppc->asc_cls = asc_cls;
1032   ppc->port = port;
1033   ppc->options = options;
1034   if (addrlen == sizeof (struct IPv6UdpAddress))
1035     ppc->ipv6 = GNUNET_YES;
1036   else
1037     ppc->ipv6 = GNUNET_NO;
1038   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
1039                                plugin->ppc_dll_tail,
1040                                ppc);
1041   ppc->resolver_handle
1042     = GNUNET_RESOLVER_hostname_get (sb,
1043                                     sbs,
1044                                     ! numeric,
1045                                     timeout,
1046                                     &append_port,
1047                                     ppc);
1048 }
1049
1050
1051 /**
1052  * Check if the given port is plausible (must be either our listen
1053  * port or our advertised port).  If it is neither, we return
1054  * #GNUNET_SYSERR.
1055  *
1056  * @param plugin global variables
1057  * @param in_port port number to check
1058  * @return #GNUNET_OK if port is either our open or advertised port
1059  */
1060 static int
1061 check_port (const struct Plugin *plugin,
1062             uint16_t in_port)
1063 {
1064   if ( (plugin->port == in_port) ||
1065        (plugin->aport == in_port) )
1066     return GNUNET_OK;
1067   return GNUNET_SYSERR;
1068 }
1069
1070
1071 /**
1072  * Function that will be called to check if a binary address for this
1073  * plugin is well-formed and corresponds to an address for THIS peer
1074  * (as per our configuration).  Naturally, if absolutely necessary,
1075  * plugins can be a bit conservative in their answer, but in general
1076  * plugins should make sure that the address does not redirect
1077  * traffic to a 3rd party that might try to man-in-the-middle our
1078  * traffic.
1079  *
1080  * @param cls closure, should be our handle to the Plugin
1081  * @param addr pointer to a `union UdpAddress`
1082  * @param addrlen length of @a addr
1083  * @return #GNUNET_OK if this is a plausible address for this peer
1084  *         and transport, #GNUNET_SYSERR if not
1085  */
1086 static int
1087 udp_plugin_check_address (void *cls,
1088                           const void *addr,
1089                           size_t addrlen)
1090 {
1091   struct Plugin *plugin = cls;
1092   const struct IPv4UdpAddress *v4;
1093   const struct IPv6UdpAddress *v6;
1094
1095   if (sizeof(struct IPv4UdpAddress) == addrlen)
1096   {
1097     v4 = (const struct IPv4UdpAddress *) addr;
1098     if (GNUNET_OK != check_port (plugin,
1099                                  ntohs (v4->u4_port)))
1100       return GNUNET_SYSERR;
1101     if (GNUNET_OK !=
1102         GNUNET_NAT_test_address (plugin->nat,
1103                                  &v4->ipv4_addr,
1104                                  sizeof (struct in_addr)))
1105       return GNUNET_SYSERR;
1106   }
1107   else if (sizeof(struct IPv6UdpAddress) == addrlen)
1108   {
1109     v6 = (const struct IPv6UdpAddress *) addr;
1110     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1111     {
1112       GNUNET_break_op (0);
1113       return GNUNET_SYSERR;
1114     }
1115     if (GNUNET_OK != check_port (plugin,
1116                                  ntohs (v6->u6_port)))
1117       return GNUNET_SYSERR;
1118     if (GNUNET_OK !=
1119         GNUNET_NAT_test_address (plugin->nat,
1120                                  &v6->ipv6_addr,
1121                                  sizeof (struct in6_addr)))
1122       return GNUNET_SYSERR;
1123   }
1124   else
1125   {
1126     GNUNET_break_op (0);
1127     return GNUNET_SYSERR;
1128   }
1129   return GNUNET_OK;
1130 }
1131
1132
1133 /**
1134  * Our external IP address/port mapping has changed.
1135  *
1136  * @param cls closure, the `struct Plugin`
1137  * @param add_remove #GNUNET_YES to mean the new public IP address,
1138  *                   #GNUNET_NO to mean the previous (now invalid) one
1139  * @param addr either the previous or the new public IP address
1140  * @param addrlen actual length of the @a addr
1141  */
1142 static void
1143 udp_nat_port_map_callback (void *cls,
1144                            int add_remove,
1145                            const struct sockaddr *addr,
1146                            socklen_t addrlen)
1147 {
1148   struct Plugin *plugin = cls;
1149   struct GNUNET_HELLO_Address *address;
1150   struct IPv4UdpAddress u4;
1151   struct IPv6UdpAddress u6;
1152   void *arg;
1153   size_t args;
1154
1155   LOG (GNUNET_ERROR_TYPE_DEBUG,
1156        (GNUNET_YES == add_remove)
1157        ? "NAT notification to add address `%s'\n"
1158        : "NAT notification to remove address `%s'\n",
1159        GNUNET_a2s (addr,
1160                    addrlen));
1161   /* convert 'address' to our internal format */
1162   switch (addr->sa_family)
1163   {
1164   case AF_INET:
1165     {
1166       const struct sockaddr_in *i4;
1167
1168       GNUNET_assert (sizeof(struct sockaddr_in) == addrlen);
1169       i4 = (const struct sockaddr_in *) addr;
1170       if (0 == ntohs (i4->sin_port))
1171       {
1172         GNUNET_break (0);
1173         return;
1174       }
1175       memset (&u4,
1176               0,
1177               sizeof(u4));
1178       u4.options = htonl (plugin->myoptions);
1179       u4.ipv4_addr = i4->sin_addr.s_addr;
1180       u4.u4_port = i4->sin_port;
1181       arg = &u4;
1182       args = sizeof (struct IPv4UdpAddress);
1183       break;
1184     }
1185   case AF_INET6:
1186     {
1187       const struct sockaddr_in6 *i6;
1188
1189       GNUNET_assert (sizeof(struct sockaddr_in6) == addrlen);
1190       i6 = (const struct sockaddr_in6 *) addr;
1191       if (0 == ntohs (i6->sin6_port))
1192       {
1193         GNUNET_break (0);
1194         return;
1195       }
1196       memset (&u6,
1197               0,
1198               sizeof(u6));
1199       u6.options = htonl (plugin->myoptions);
1200       u6.ipv6_addr = i6->sin6_addr;
1201       u6.u6_port = i6->sin6_port;
1202       arg = &u6;
1203       args = sizeof (struct IPv6UdpAddress);
1204       break;
1205     }
1206   default:
1207     GNUNET_break (0);
1208     return;
1209   }
1210   /* modify our published address list */
1211   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
1212                                            PLUGIN_NAME,
1213                                            arg,
1214                                            args,
1215                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1216   plugin->env->notify_address (plugin->env->cls,
1217                                add_remove,
1218                                address);
1219   GNUNET_HELLO_address_free (address);
1220 }
1221
1222
1223 /* ********************* Finding sessions ******************* */
1224
1225
1226 /**
1227  * Closure for #session_cmp_it().
1228  */
1229 struct SessionCompareContext
1230 {
1231   /**
1232    * Set to session matching the address.
1233    */
1234   struct Session *res;
1235
1236   /**
1237    * Address we are looking for.
1238    */
1239   const struct GNUNET_HELLO_Address *address;
1240 };
1241
1242
1243 /**
1244  * Find a session with a matching address.
1245  *
1246  * @param cls the `struct SessionCompareContext *`
1247  * @param key peer identity (unused)
1248  * @param value the `struct Session *`
1249  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1250  */
1251 static int
1252 session_cmp_it (void *cls,
1253                 const struct GNUNET_PeerIdentity *key,
1254                 void *value)
1255 {
1256   struct SessionCompareContext *cctx = cls;
1257   struct Session *s = value;
1258
1259   if (0 == GNUNET_HELLO_address_cmp (s->address,
1260                                      cctx->address))
1261   {
1262     GNUNET_assert (GNUNET_NO == s->in_destroy);
1263     cctx->res = s;
1264     return GNUNET_NO;
1265   }
1266   return GNUNET_OK;
1267 }
1268
1269
1270 /**
1271  * Locate an existing session the transport service is using to
1272  * send data to another peer.  Performs some basic sanity checks
1273  * on the address and then tries to locate a matching session.
1274  *
1275  * @param cls the plugin
1276  * @param address the address we should locate the session by
1277  * @return the session if it exists, or NULL if it is not found
1278  */
1279 static struct Session *
1280 udp_plugin_lookup_session (void *cls,
1281                            const struct GNUNET_HELLO_Address *address)
1282 {
1283   struct Plugin *plugin = cls;
1284   const struct IPv6UdpAddress *udp_a6;
1285   const struct IPv4UdpAddress *udp_a4;
1286   struct SessionCompareContext cctx;
1287
1288   if (NULL == address->address)
1289   {
1290     GNUNET_break (0);
1291     return NULL;
1292   }
1293   if (sizeof(struct IPv4UdpAddress) == address->address_length)
1294   {
1295     if (NULL == plugin->sockv4)
1296       return NULL;
1297     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1298     if (0 == udp_a4->u4_port)
1299     {
1300       GNUNET_break (0);
1301       return NULL;
1302     }
1303   }
1304   else if (sizeof(struct IPv6UdpAddress) == address->address_length)
1305   {
1306     if (NULL == plugin->sockv6)
1307       return NULL;
1308     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1309     if (0 == udp_a6->u6_port)
1310     {
1311       GNUNET_break (0);
1312       return NULL;
1313     }
1314   }
1315   else
1316   {
1317     GNUNET_break (0);
1318     return NULL;
1319   }
1320
1321   /* check if session already exists */
1322   cctx.address = address;
1323   cctx.res = NULL;
1324   LOG (GNUNET_ERROR_TYPE_DEBUG,
1325        "Looking for existing session for peer `%s' with address `%s'\n",
1326        GNUNET_i2s (&address->peer),
1327        udp_address_to_string (plugin,
1328                               address->address,
1329                               address->address_length));
1330   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1331                                               &address->peer,
1332                                               &session_cmp_it,
1333                                               &cctx);
1334   if (NULL == cctx.res)
1335     return NULL;
1336   LOG (GNUNET_ERROR_TYPE_DEBUG,
1337        "Found existing session %p\n",
1338        cctx.res);
1339   return cctx.res;
1340 }
1341
1342
1343 /* ********************** Timeout ****************** */
1344
1345
1346 /**
1347  * Increment session timeout due to activity.
1348  *
1349  * @param s session to reschedule timeout activity for
1350  */
1351 static void
1352 reschedule_session_timeout (struct Session *s)
1353 {
1354   if (GNUNET_YES == s->in_destroy)
1355     return;
1356   GNUNET_assert (NULL != s->timeout_task);
1357   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1358 }
1359
1360
1361
1362 /**
1363  * Function that will be called whenever the transport service wants to
1364  * notify the plugin that a session is still active and in use and
1365  * therefore the session timeout for this session has to be updated
1366  *
1367  * @param cls closure with the `struct Plugin`
1368  * @param peer which peer was the session for
1369  * @param session which session is being updated
1370  */
1371 static void
1372 udp_plugin_update_session_timeout (void *cls,
1373                                    const struct GNUNET_PeerIdentity *peer,
1374                                    struct Session *session)
1375 {
1376   struct Plugin *plugin = cls;
1377
1378   if (GNUNET_YES !=
1379       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1380                                                     peer,
1381                                                     session))
1382   {
1383     GNUNET_break (0);
1384     return;
1385   }
1386   /* Reschedule session timeout */
1387   reschedule_session_timeout (session);
1388 }
1389
1390
1391 /* ************************* Sending ************************ */
1392
1393
1394 /**
1395  * Remove the given message from the transmission queue and
1396  * update all applicable statistics.
1397  *
1398  * @param plugin the UDP plugin
1399  * @param udpw message wrapper to dequeue
1400  */
1401 static void
1402 dequeue (struct Plugin *plugin,
1403          struct UDP_MessageWrapper *udpw)
1404 {
1405   struct Session *session = udpw->session;
1406
1407   if (plugin->bytes_in_buffer < udpw->msg_size)
1408   {
1409     GNUNET_break (0);
1410   }
1411   else
1412   {
1413     GNUNET_STATISTICS_update (plugin->env->stats,
1414                               "# UDP, total bytes in send buffers",
1415                               - (long long) udpw->msg_size,
1416                               GNUNET_NO);
1417     plugin->bytes_in_buffer -= udpw->msg_size;
1418   }
1419   GNUNET_STATISTICS_update (plugin->env->stats,
1420                             "# UDP, total messages in send buffers",
1421                             -1,
1422                             GNUNET_NO);
1423   if (sizeof(struct IPv4UdpAddress) == udpw->session->address->address_length)
1424   {
1425     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1426                                  plugin->ipv4_queue_tail,
1427                                  udpw);
1428   }
1429   else if (sizeof(struct IPv6UdpAddress) == udpw->session->address->address_length)
1430   {
1431     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1432                                  plugin->ipv6_queue_tail,
1433                                  udpw);
1434   }
1435   else
1436   {
1437     GNUNET_break (0);
1438     return;
1439   }
1440   GNUNET_assert (session->msgs_in_queue > 0);
1441   session->msgs_in_queue--;
1442   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1443   session->bytes_in_queue -= udpw->msg_size;
1444 }
1445
1446
1447 /**
1448  * Enqueue a message for transmission and update statistics.
1449  *
1450  * @param plugin the UDP plugin
1451  * @param udpw message wrapper to queue
1452  */
1453 static void
1454 enqueue (struct Plugin *plugin,
1455          struct UDP_MessageWrapper *udpw)
1456 {
1457   struct Session *session = udpw->session;
1458
1459   if (GNUNET_YES == session->in_destroy)
1460   {
1461     GNUNET_break (0);
1462     return;
1463   }
1464   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1465   {
1466     GNUNET_break (0);
1467   }
1468   else
1469   {
1470     GNUNET_STATISTICS_update (plugin->env->stats,
1471                               "# UDP, total bytes in send buffers",
1472                               udpw->msg_size,
1473                               GNUNET_NO);
1474     plugin->bytes_in_buffer += udpw->msg_size;
1475   }
1476   GNUNET_STATISTICS_update (plugin->env->stats,
1477                             "# UDP, total messages in send buffers",
1478                             1,
1479                             GNUNET_NO);
1480   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
1481   {
1482     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1483                                 plugin->ipv4_queue_tail,
1484                                 udpw);
1485   }
1486   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
1487   {
1488     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1489                                  plugin->ipv6_queue_tail,
1490                                  udpw);
1491   }
1492   else
1493   {
1494     GNUNET_break (0);
1495     udpw->cont (udpw->cont_cls,
1496                 &session->target,
1497                 GNUNET_SYSERR,
1498                 udpw->msg_size,
1499                 0);
1500     GNUNET_free (udpw);
1501     return;
1502   }
1503   session->msgs_in_queue++;
1504   session->bytes_in_queue += udpw->msg_size;
1505 }
1506
1507
1508 /**
1509  * We have completed our (attempt) to transmit a message that had to
1510  * be fragmented -- either because we got an ACK saying that all
1511  * fragments were received, or because of timeout / disconnect.  Clean
1512  * up our state.
1513  *
1514  * @param frag_ctx fragmentation context to clean up
1515  * @param result #GNUNET_OK if we succeeded (got ACK),
1516  *               #GNUNET_SYSERR if the transmission failed
1517  */
1518 static void
1519 fragmented_message_done (struct UDP_FragmentationContext *frag_ctx,
1520                          int result)
1521 {
1522   struct Plugin *plugin = frag_ctx->plugin;
1523   struct Session *s = frag_ctx->session;
1524   struct UDP_MessageWrapper *udpw;
1525   struct UDP_MessageWrapper *tmp;
1526   size_t overhead;
1527
1528   LOG (GNUNET_ERROR_TYPE_DEBUG,
1529        "%p: Fragmented message removed with result %s\n",
1530        frag_ctx,
1531        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1532   /* Call continuation for fragmented message */
1533   if (frag_ctx->on_wire_size >= frag_ctx->payload_size)
1534     overhead = frag_ctx->on_wire_size - frag_ctx->payload_size;
1535   else
1536     overhead = frag_ctx->on_wire_size;
1537   if (NULL != frag_ctx->cont)
1538     frag_ctx->cont (frag_ctx->cont_cls,
1539                     &s->target,
1540                     result,
1541                     s->frag_ctx->payload_size,
1542                     frag_ctx->on_wire_size);
1543   GNUNET_STATISTICS_update (plugin->env->stats,
1544                             "# UDP, fragmented messages active",
1545                             -1,
1546                             GNUNET_NO);
1547
1548   if (GNUNET_OK == result)
1549   {
1550     GNUNET_STATISTICS_update (plugin->env->stats,
1551                               "# UDP, fragmented msgs, messages, sent, success",
1552                               1,
1553                               GNUNET_NO);
1554     GNUNET_STATISTICS_update (plugin->env->stats,
1555                               "# UDP, fragmented msgs, bytes payload, sent, success",
1556                               s->frag_ctx->payload_size,
1557                               GNUNET_NO);
1558     GNUNET_STATISTICS_update (plugin->env->stats,
1559                               "# UDP, fragmented msgs, bytes overhead, sent, success",
1560                               overhead,
1561                               GNUNET_NO);
1562     GNUNET_STATISTICS_update (plugin->env->stats,
1563                               "# UDP, total, bytes overhead, sent",
1564                               overhead,
1565                               GNUNET_NO);
1566     GNUNET_STATISTICS_update (plugin->env->stats,
1567                               "# UDP, total, bytes payload, sent",
1568                               s->frag_ctx->payload_size,
1569                               GNUNET_NO);
1570   }
1571   else
1572   {
1573     GNUNET_STATISTICS_update (plugin->env->stats,
1574                               "# UDP, fragmented msgs, messages, sent, failure",
1575                               1,
1576                               GNUNET_NO);
1577     GNUNET_STATISTICS_update (plugin->env->stats,
1578                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1579                               s->frag_ctx->payload_size,
1580                               GNUNET_NO);
1581     GNUNET_STATISTICS_update (plugin->env->stats,
1582                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1583                               overhead,
1584                               GNUNET_NO);
1585     GNUNET_STATISTICS_update (plugin->env->stats,
1586                               "# UDP, fragmented msgs, bytes payload, sent, failure",
1587                               overhead,
1588                               GNUNET_NO);
1589   }
1590
1591   /* Remove remaining fragments from queue, no need to transmit those
1592      any longer. */
1593   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1594   {
1595     udpw = plugin->ipv6_queue_head;
1596     while (NULL != udpw)
1597     {
1598       tmp = udpw->next;
1599       if ( (udpw->frag_ctx != NULL) &&
1600            (udpw->frag_ctx == frag_ctx) )
1601       {
1602         dequeue (plugin,
1603                  udpw);
1604         GNUNET_free (udpw);
1605       }
1606       udpw = tmp;
1607     }
1608   }
1609   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1610   {
1611     udpw = plugin->ipv4_queue_head;
1612     while (NULL != udpw)
1613     {
1614       tmp = udpw->next;
1615       if ( (NULL != udpw->frag_ctx) &&
1616            (udpw->frag_ctx == frag_ctx) )
1617       {
1618         dequeue (plugin,
1619                  udpw);
1620         GNUNET_free (udpw);
1621       }
1622       udpw = tmp;
1623     }
1624   }
1625   notify_session_monitor (s->plugin,
1626                           s,
1627                           GNUNET_TRANSPORT_SS_UPDATE);
1628   GNUNET_FRAGMENT_context_destroy (frag_ctx->frag,
1629                                    &s->last_expected_msg_delay,
1630                                    &s->last_expected_ack_delay);
1631   s->frag_ctx = NULL;
1632   GNUNET_free (frag_ctx);
1633 }
1634
1635
1636 /**
1637  * We are finished with a fragment in the message queue.
1638  * Notify the continuation and update statistics.
1639  *
1640  * @param cls the `struct Plugin *`
1641  * @param udpw the queue entry
1642  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1643  */
1644 static void
1645 qc_fragment_sent (void *cls,
1646                   struct UDP_MessageWrapper *udpw,
1647                   int result)
1648 {
1649   struct Plugin *plugin = cls;
1650
1651   GNUNET_assert (NULL != udpw->frag_ctx);
1652   if (GNUNET_OK == result)
1653   {
1654     GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1655     GNUNET_STATISTICS_update (plugin->env->stats,
1656                               "# UDP, fragmented msgs, fragments, sent, success",
1657                               1,
1658                               GNUNET_NO);
1659     GNUNET_STATISTICS_update (plugin->env->stats,
1660                               "# UDP, fragmented msgs, fragments bytes, sent, success",
1661                               udpw->msg_size,
1662                               GNUNET_NO);
1663   }
1664   else
1665   {
1666     fragmented_message_done (udpw->frag_ctx,
1667                              GNUNET_SYSERR);
1668     GNUNET_STATISTICS_update (plugin->env->stats,
1669                               "# UDP, fragmented msgs, fragments, sent, failure",
1670                               1,
1671                               GNUNET_NO);
1672     GNUNET_STATISTICS_update (plugin->env->stats,
1673                               "# UDP, fragmented msgs, fragments bytes, sent, failure",
1674                               udpw->msg_size,
1675                               GNUNET_NO);
1676   }
1677 }
1678
1679
1680 /**
1681  * Function that is called with messages created by the fragmentation
1682  * module.  In the case of the `proc` callback of the
1683  * #GNUNET_FRAGMENT_context_create() function, this function must
1684  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1685  *
1686  * @param cls closure, the `struct UDP_FragmentationContext`
1687  * @param msg the message that was created
1688  */
1689 static void
1690 enqueue_fragment (void *cls,
1691                   const struct GNUNET_MessageHeader *msg)
1692 {
1693   struct UDP_FragmentationContext *frag_ctx = cls;
1694   struct Plugin *plugin = frag_ctx->plugin;
1695   struct UDP_MessageWrapper *udpw;
1696   struct Session *session = frag_ctx->session;
1697   size_t msg_len = ntohs (msg->size);
1698
1699   LOG (GNUNET_ERROR_TYPE_DEBUG,
1700        "Enqueuing fragment with %u bytes\n",
1701        msg_len);
1702   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
1703   udpw->session = session;
1704   udpw->msg_buf = (char *) &udpw[1];
1705   udpw->msg_size = msg_len;
1706   udpw->payload_size = msg_len; /* FIXME: minus fragment overhead */
1707   udpw->timeout = frag_ctx->timeout;
1708   udpw->frag_ctx = frag_ctx;
1709   udpw->qc = &qc_fragment_sent;
1710   udpw->qc_cls = plugin;
1711   memcpy (udpw->msg_buf,
1712           msg,
1713           msg_len);
1714   enqueue (plugin,
1715            udpw);
1716   if (sizeof (struct IPv4UdpAddress) == session->address->address_length)
1717     schedule_select_v4 (plugin);
1718   else
1719     schedule_select_v6 (plugin);
1720 }
1721
1722
1723 /**
1724  * We are finished with a message from the message queue.
1725  * Notify the continuation and update statistics.
1726  *
1727  * @param cls the `struct Plugin *`
1728  * @param udpw the queue entry
1729  * @param result #GNUNET_OK on success, #GNUNET_SYSERR on failure
1730  */
1731 static void
1732 qc_message_sent (void *cls,
1733                  struct UDP_MessageWrapper *udpw,
1734                  int result)
1735 {
1736   struct Plugin *plugin = cls;
1737   size_t overhead;
1738
1739   if (udpw->msg_size >= udpw->payload_size)
1740     overhead = udpw->msg_size - udpw->payload_size;
1741   else
1742     overhead = udpw->msg_size;
1743
1744   if (NULL != udpw->cont)
1745     udpw->cont (udpw->cont_cls,
1746                 &udpw->session->target,
1747                 result,
1748                 udpw->payload_size,
1749                 overhead);
1750   if (GNUNET_OK == result)
1751   {
1752     GNUNET_STATISTICS_update (plugin->env->stats,
1753                               "# UDP, unfragmented msgs, messages, sent, success",
1754                               1,
1755                               GNUNET_NO);
1756     GNUNET_STATISTICS_update (plugin->env->stats,
1757                               "# UDP, unfragmented msgs, bytes payload, sent, success",
1758                               udpw->payload_size,
1759                               GNUNET_NO);
1760     GNUNET_STATISTICS_update (plugin->env->stats,
1761                               "# UDP, unfragmented msgs, bytes overhead, sent, success",
1762                               overhead,
1763                               GNUNET_NO);
1764     GNUNET_STATISTICS_update (plugin->env->stats,
1765                               "# UDP, total, bytes overhead, sent",
1766                               overhead,
1767                               GNUNET_NO);
1768     GNUNET_STATISTICS_update (plugin->env->stats,
1769                               "# UDP, total, bytes payload, sent",
1770                               udpw->payload_size,
1771                               GNUNET_NO);
1772   }
1773   else
1774   {
1775     GNUNET_STATISTICS_update (plugin->env->stats,
1776                               "# UDP, unfragmented msgs, messages, sent, failure",
1777                               1,
1778                               GNUNET_NO);
1779     GNUNET_STATISTICS_update (plugin->env->stats,
1780                               "# UDP, unfragmented msgs, bytes payload, sent, failure",
1781                               udpw->payload_size,
1782                               GNUNET_NO);
1783     GNUNET_STATISTICS_update (plugin->env->stats,
1784                               "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1785                               overhead,
1786                               GNUNET_NO);
1787   }
1788 }
1789
1790
1791 /**
1792  * Function that can be used by the transport service to transmit a
1793  * message using the plugin.  Note that in the case of a peer
1794  * disconnecting, the continuation MUST be called prior to the
1795  * disconnect notification itself.  This function will be called with
1796  * this peer's HELLO message to initiate a fresh connection to another
1797  * peer.
1798  *
1799  * @param cls closure
1800  * @param s which session must be used
1801  * @param msgbuf the message to transmit
1802  * @param msgbuf_size number of bytes in @a msgbuf
1803  * @param priority how important is the message (most plugins will
1804  *                 ignore message priority and just FIFO)
1805  * @param to how long to wait at most for the transmission (does not
1806  *                require plugins to discard the message after the timeout,
1807  *                just advisory for the desired delay; most plugins will ignore
1808  *                this as well)
1809  * @param cont continuation to call once the message has
1810  *        been transmitted (or if the transport is ready
1811  *        for the next transmission call; or if the
1812  *        peer disconnected...); can be NULL
1813  * @param cont_cls closure for @a cont
1814  * @return number of bytes used (on the physical network, with overheads);
1815  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1816  *         and does NOT mean that the message was not transmitted (DV)
1817  */
1818 static ssize_t
1819 udp_plugin_send (void *cls,
1820                  struct Session *s,
1821                  const char *msgbuf,
1822                  size_t msgbuf_size,
1823                  unsigned int priority,
1824                  struct GNUNET_TIME_Relative to,
1825                  GNUNET_TRANSPORT_TransmitContinuation cont,
1826                  void *cont_cls)
1827 {
1828   struct Plugin *plugin = cls;
1829   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
1830   struct UDP_FragmentationContext *frag_ctx;
1831   struct UDP_MessageWrapper *udpw;
1832   struct UDPMessage *udp;
1833   char mbuf[udpmlen] GNUNET_ALIGN;
1834
1835   if ( (sizeof(struct IPv6UdpAddress) == s->address->address_length) &&
1836        (NULL == plugin->sockv6) )
1837     return GNUNET_SYSERR;
1838   if ( (sizeof(struct IPv4UdpAddress) == s->address->address_length) &&
1839        (NULL == plugin->sockv4) )
1840     return GNUNET_SYSERR;
1841   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1842   {
1843     GNUNET_break (0);
1844     return GNUNET_SYSERR;
1845   }
1846   if (GNUNET_YES !=
1847       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1848                                                     &s->target,
1849                                                     s))
1850   {
1851     GNUNET_break (0);
1852     return GNUNET_SYSERR;
1853   }
1854   LOG (GNUNET_ERROR_TYPE_DEBUG,
1855        "UDP transmits %u-byte message to `%s' using address `%s'\n",
1856        udpmlen,
1857        GNUNET_i2s (&s->target),
1858        udp_address_to_string (plugin,
1859                               s->address->address,
1860                               s->address->address_length));
1861
1862   udp = (struct UDPMessage *) mbuf;
1863   udp->header.size = htons (udpmlen);
1864   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
1865   udp->reserved = htonl (0);
1866   udp->sender = *plugin->env->my_identity;
1867
1868   /* We do not update the session time out here!  Otherwise this
1869    * session will not timeout since we send keep alive before session
1870    * can timeout.
1871    *
1872    * For UDP we update session timeout only on receive, this will
1873    * cover keep alives, since remote peer will reply with keep alive
1874    * responses!
1875    */
1876   if (udpmlen <= UDP_MTU)
1877   {
1878     /* unfragmented message */
1879     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
1880     udpw->session = s;
1881     udpw->msg_buf = (char *) &udpw[1];
1882     udpw->msg_size = udpmlen; /* message size with UDP overhead */
1883     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
1884     udpw->timeout = GNUNET_TIME_relative_to_absolute (to);
1885     udpw->cont = cont;
1886     udpw->cont_cls = cont_cls;
1887     udpw->frag_ctx = NULL;
1888     udpw->qc = &qc_message_sent;
1889     udpw->qc_cls = plugin;
1890     memcpy (udpw->msg_buf,
1891             udp,
1892             sizeof (struct UDPMessage));
1893     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)],
1894             msgbuf,
1895             msgbuf_size);
1896     enqueue (plugin,
1897              udpw);
1898     GNUNET_STATISTICS_update (plugin->env->stats,
1899                               "# UDP, unfragmented messages queued total",
1900                               1,
1901                               GNUNET_NO);
1902     GNUNET_STATISTICS_update (plugin->env->stats,
1903                               "# UDP, unfragmented bytes payload queued total",
1904                               msgbuf_size,
1905                               GNUNET_NO);
1906   }
1907   else
1908   {
1909     /* fragmented message */
1910     if (NULL != s->frag_ctx)
1911       return GNUNET_SYSERR;
1912     memcpy (&udp[1],
1913             msgbuf,
1914             msgbuf_size);
1915     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
1916     frag_ctx->plugin = plugin;
1917     frag_ctx->session = s;
1918     frag_ctx->cont = cont;
1919     frag_ctx->cont_cls = cont_cls;
1920     frag_ctx->timeout = GNUNET_TIME_relative_to_absolute (to);
1921     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
1922     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
1923     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
1924                                                      UDP_MTU,
1925                                                      &plugin->tracker,
1926                                                      s->last_expected_msg_delay,
1927                                                      s->last_expected_ack_delay,
1928                                                      &udp->header,
1929                                                      &enqueue_fragment,
1930                                                      frag_ctx);
1931     s->frag_ctx = frag_ctx;
1932     GNUNET_STATISTICS_update (plugin->env->stats,
1933                               "# UDP, fragmented messages active",
1934                               1,
1935                               GNUNET_NO);
1936     GNUNET_STATISTICS_update (plugin->env->stats,
1937                               "# UDP, fragmented messages, total",
1938                               1,
1939                               GNUNET_NO);
1940     GNUNET_STATISTICS_update (plugin->env->stats,
1941                               "# UDP, fragmented bytes (payload)",
1942                               frag_ctx->payload_size,
1943                               GNUNET_NO);
1944   }
1945   notify_session_monitor (s->plugin,
1946                           s,
1947                           GNUNET_TRANSPORT_SS_UPDATE);
1948   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
1949     schedule_select_v4 (plugin);
1950   else
1951     schedule_select_v6 (plugin);
1952   return udpmlen;
1953 }
1954
1955
1956 /**
1957  * Handle an ACK message.
1958  *
1959  * @param plugin the UDP plugin
1960  * @param msg the (presumed) UDP ACK message
1961  * @param udp_addr sender address
1962  * @param udp_addr_len number of bytes in @a udp_addr
1963  */
1964 static void
1965 read_process_ack (struct Plugin *plugin,
1966                   const struct GNUNET_MessageHeader *msg,
1967                   const union UdpAddress *udp_addr,
1968                   socklen_t udp_addr_len)
1969 {
1970   const struct GNUNET_MessageHeader *ack;
1971   const struct UDP_ACK_Message *udp_ack;
1972   struct GNUNET_HELLO_Address *address;
1973   struct Session *s;
1974   struct GNUNET_TIME_Relative flow_delay;
1975
1976   if (ntohs (msg->size)
1977       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
1978   {
1979     GNUNET_break_op (0);
1980     return;
1981   }
1982   udp_ack = (const struct UDP_ACK_Message *) msg;
1983   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
1984   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
1985   {
1986     GNUNET_break_op(0);
1987     return;
1988   }
1989   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
1990                                            PLUGIN_NAME,
1991                                            udp_addr,
1992                                            udp_addr_len,
1993                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
1994   s = udp_plugin_lookup_session (plugin,
1995                                  address);
1996   if (NULL == s)
1997   {
1998     LOG (GNUNET_ERROR_TYPE_WARNING,
1999          "UDP session of address %s for ACK not found\n",
2000          udp_address_to_string (plugin,
2001                                 address->address,
2002                                 address->address_length));
2003     GNUNET_HELLO_address_free (address);
2004     return;
2005   }
2006   if (NULL == s->frag_ctx)
2007   {
2008     LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
2009          "Fragmentation context of address %s for ACK (%s) not found\n",
2010          udp_address_to_string (plugin,
2011                                 address->address,
2012                                 address->address_length),
2013          GNUNET_FRAGMENT_print_ack (ack));
2014     GNUNET_HELLO_address_free (address);
2015     return;
2016   }
2017   GNUNET_HELLO_address_free (address);
2018
2019   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2020   LOG (GNUNET_ERROR_TYPE_DEBUG,
2021        "We received a sending delay of %s for %s\n",
2022        GNUNET_STRINGS_relative_time_to_string (flow_delay,
2023                                                GNUNET_YES),
2024        GNUNET_i2s (&udp_ack->sender));
2025   s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2026
2027
2028   if (GNUNET_OK !=
2029       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2030                                    ack))
2031   {
2032     LOG (GNUNET_ERROR_TYPE_DEBUG,
2033          "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2034          (unsigned int) ntohs (msg->size),
2035          GNUNET_i2s (&udp_ack->sender),
2036          udp_address_to_string (plugin,
2037                                 udp_addr,
2038                                 udp_addr_len));
2039     /* Expect more ACKs to arrive */
2040     return;
2041   }
2042
2043   LOG (GNUNET_ERROR_TYPE_DEBUG,
2044        "Message from %s at %s full ACK'ed\n",
2045        GNUNET_i2s (&udp_ack->sender),
2046        udp_address_to_string (plugin,
2047                               udp_addr,
2048                               udp_addr_len));
2049
2050   /* Remove fragmented message after successful sending */
2051   fragmented_message_done (s->frag_ctx,
2052                            GNUNET_OK);
2053 }
2054
2055
2056 /* ********************** Receiving ********************** */
2057
2058
2059 /**
2060  * Closure for #find_receive_context().
2061  */
2062 struct FindReceiveContext
2063 {
2064   /**
2065    * Where to store the result.
2066    */
2067   struct DefragContext *rc;
2068
2069   /**
2070    * Session associated with this context.
2071    */
2072   struct Session *session;
2073
2074   /**
2075    * Address to find.
2076    */
2077   const union UdpAddress *udp_addr;
2078
2079   /**
2080    * Number of bytes in @e udp_addr.
2081    */
2082   size_t udp_addr_len;
2083
2084 };
2085
2086
2087 /**
2088  * Scan the heap for a receive context with the given address.
2089  *
2090  * @param cls the `struct FindReceiveContext`
2091  * @param node internal node of the heap
2092  * @param element value stored at the node (a `struct ReceiveContext`)
2093  * @param cost cost associated with the node
2094  * @return #GNUNET_YES if we should continue to iterate,
2095  *         #GNUNET_NO if not.
2096  */
2097 static int
2098 find_receive_context (void *cls,
2099                       struct GNUNET_CONTAINER_HeapNode *node,
2100                       void *element,
2101                       GNUNET_CONTAINER_HeapCostType cost)
2102 {
2103   struct FindReceiveContext *frc = cls;
2104   struct DefragContext *e = element;
2105
2106   if ( (frc->udp_addr_len == e->udp_addr_len) &&
2107        (0 == memcmp (frc->udp_addr,
2108                      e->udp_addr,
2109                      frc->udp_addr_len)) )
2110   {
2111     frc->rc = e;
2112     return GNUNET_NO;
2113   }
2114   return GNUNET_YES;
2115 }
2116
2117
2118 /**
2119  * Message tokenizer has broken up an incomming message. Pass it on
2120  * to the service.
2121  *
2122  * @param cls the `struct Plugin *`
2123  * @param client the `struct Session *`
2124  * @param hdr the actual message
2125  * @return #GNUNET_OK (always)
2126  */
2127 static int
2128 process_inbound_tokenized_messages (void *cls,
2129                                     void *client,
2130                                     const struct GNUNET_MessageHeader *hdr)
2131 {
2132   struct Plugin *plugin = cls;
2133   struct Session *session = client;
2134
2135   if (GNUNET_YES == session->in_destroy)
2136     return GNUNET_OK;
2137   reschedule_session_timeout (session);
2138   session->flow_delay_for_other_peer
2139     = plugin->env->receive (plugin->env->cls,
2140                             session->address,
2141                             session,
2142                             hdr);
2143   return GNUNET_OK;
2144 }
2145
2146
2147 /**
2148  * Functions with this signature are called whenever we need to close
2149  * a session due to a disconnect or failure to establish a connection.
2150  *
2151  * @param cls closure with the `struct Plugin`
2152  * @param s session to close down
2153  * @return #GNUNET_OK on success
2154  */
2155 static int
2156 udp_disconnect_session (void *cls,
2157                         struct Session *s)
2158 {
2159   struct Plugin *plugin = cls;
2160   struct UDP_MessageWrapper *udpw;
2161   struct UDP_MessageWrapper *next;
2162   struct FindReceiveContext frc;
2163
2164   GNUNET_assert (GNUNET_YES != s->in_destroy);
2165   LOG (GNUNET_ERROR_TYPE_DEBUG,
2166        "Session %p to peer `%s' at address %s ended\n",
2167        s,
2168        GNUNET_i2s (&s->target),
2169        udp_address_to_string (plugin,
2170                               s->address->address,
2171                               s->address->address_length));
2172   if (NULL != s->timeout_task)
2173   {
2174     GNUNET_SCHEDULER_cancel (s->timeout_task);
2175     s->timeout_task = NULL;
2176   }
2177   if (NULL != s->frag_ctx)
2178   {
2179     /* Remove fragmented message due to disconnect */
2180     fragmented_message_done (s->frag_ctx,
2181                              GNUNET_SYSERR);
2182   }
2183   GNUNET_assert (GNUNET_YES ==
2184                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
2185                                                        &s->target,
2186                                                        s));
2187   frc.rc = NULL;
2188   frc.udp_addr = s->address->address;
2189   frc.udp_addr_len = s->address->address_length;
2190   /* Lookup existing receive context for this address */
2191   if (NULL != plugin->defrag_ctxs)
2192   {
2193     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2194                                    &find_receive_context,
2195                                    &frc);
2196     if (NULL != frc.rc)
2197     {
2198       struct DefragContext *d_ctx = frc.rc;
2199
2200       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
2201       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2202       GNUNET_free (d_ctx);
2203     }
2204   }
2205   s->in_destroy = GNUNET_YES;
2206   next = plugin->ipv4_queue_head;
2207   while (NULL != (udpw = next))
2208   {
2209     next = udpw->next;
2210     if (udpw->session == s)
2211     {
2212       dequeue (plugin,
2213                udpw);
2214       udpw->qc (udpw->qc_cls,
2215                 udpw,
2216                 GNUNET_SYSERR);
2217       GNUNET_free (udpw);
2218     }
2219   }
2220   next = plugin->ipv6_queue_head;
2221   while (NULL != (udpw = next))
2222   {
2223     next = udpw->next;
2224     if (udpw->session == s)
2225     {
2226       dequeue (plugin,
2227                udpw);
2228       udpw->qc (udpw->qc_cls,
2229                 udpw,
2230                 GNUNET_SYSERR);
2231       GNUNET_free (udpw);
2232     }
2233   }
2234   if ( (NULL != s->frag_ctx) &&
2235        (NULL != s->frag_ctx->cont) )
2236   {
2237     /* The 'frag_ctx' itself will be freed in #free_session() a bit
2238        later, as it might be in use right now */
2239     LOG (GNUNET_ERROR_TYPE_DEBUG,
2240          "Calling continuation for fragemented message to `%s' with result SYSERR\n",
2241          GNUNET_i2s (&s->target));
2242     s->frag_ctx->cont (s->frag_ctx->cont_cls,
2243                        &s->target,
2244                        GNUNET_SYSERR,
2245                        s->frag_ctx->payload_size,
2246                        s->frag_ctx->on_wire_size);
2247   }
2248   notify_session_monitor (s->plugin,
2249                           s,
2250                           GNUNET_TRANSPORT_SS_DONE);
2251   plugin->env->session_end (plugin->env->cls,
2252                             s->address,
2253                             s);
2254   GNUNET_STATISTICS_set (plugin->env->stats,
2255                          "# UDP sessions active",
2256                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2257                          GNUNET_NO);
2258   if (0 == s->rc)
2259     free_session (s);
2260   return GNUNET_OK;
2261 }
2262
2263
2264 /**
2265  * Destroy a session, plugin is being unloaded.
2266  *
2267  * @param cls the `struct Plugin`
2268  * @param key hash of public key of target peer
2269  * @param value a `struct PeerSession *` to clean up
2270  * @return #GNUNET_OK (continue to iterate)
2271  */
2272 static int
2273 disconnect_and_free_it (void *cls,
2274                         const struct GNUNET_PeerIdentity *key,
2275                         void *value)
2276 {
2277   struct Plugin *plugin = cls;
2278
2279   udp_disconnect_session (plugin,
2280                           value);
2281   return GNUNET_OK;
2282 }
2283
2284
2285 /**
2286  * Disconnect from a remote node.  Clean up session if we have one for
2287  * this peer.
2288  *
2289  * @param cls closure for this call (should be handle to Plugin)
2290  * @param target the peeridentity of the peer to disconnect
2291  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
2292  */
2293 static void
2294 udp_disconnect (void *cls,
2295                 const struct GNUNET_PeerIdentity *target)
2296 {
2297   struct Plugin *plugin = cls;
2298
2299   LOG (GNUNET_ERROR_TYPE_DEBUG,
2300        "Disconnecting from peer `%s'\n",
2301        GNUNET_i2s (target));
2302   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
2303                                               target,
2304                                               &disconnect_and_free_it,
2305                                               plugin);
2306 }
2307
2308
2309 /**
2310  * Session was idle, so disconnect it.
2311  *
2312  * @param cls the `struct Session` to time out
2313  * @param tc scheduler context
2314  */
2315 static void
2316 session_timeout (void *cls,
2317                  const struct GNUNET_SCHEDULER_TaskContext *tc)
2318 {
2319   struct Session *s = cls;
2320   struct Plugin *plugin = s->plugin;
2321   struct GNUNET_TIME_Relative left;
2322
2323   s->timeout_task = NULL;
2324   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
2325   if (left.rel_value_us > 0)
2326   {
2327     /* not actually our turn yet, but let's at least update
2328        the monitor, it may think we're about to die ... */
2329     notify_session_monitor (s->plugin,
2330                             s,
2331                             GNUNET_TRANSPORT_SS_UPDATE);
2332     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
2333                                                     &session_timeout,
2334                                                     s);
2335     return;
2336   }
2337   LOG (GNUNET_ERROR_TYPE_DEBUG,
2338        "Session %p was idle for %s, disconnecting\n",
2339        s,
2340        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
2341                                                GNUNET_YES));
2342   /* call session destroy function */
2343   udp_disconnect_session (plugin,
2344                           s);
2345 }
2346
2347
2348 /**
2349  * Allocate a new session for the given endpoint address.
2350  * Note that this function does not inform the service
2351  * of the new session, this is the responsibility of the
2352  * caller (if needed).
2353  *
2354  * @param cls the `struct Plugin`
2355  * @param address address of the other peer to use
2356  * @param network_type network type the address belongs to
2357  * @return NULL on error, otherwise session handle
2358  */
2359 static struct Session *
2360 udp_plugin_create_session (void *cls,
2361                            const struct GNUNET_HELLO_Address *address,
2362                            enum GNUNET_ATS_Network_Type network_type)
2363 {
2364   struct Plugin *plugin = cls;
2365   struct Session *s;
2366
2367   s = GNUNET_new (struct Session);
2368   s->plugin = plugin;
2369   s->address = GNUNET_HELLO_address_copy (address);
2370   s->target = address->peer;
2371   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2372                                                               250);
2373   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
2374   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
2375   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
2376   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
2377   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
2378                                                   &session_timeout,
2379                                                   s);
2380   s->scope = network_type;
2381
2382   LOG (GNUNET_ERROR_TYPE_DEBUG,
2383        "Creating new session %p for peer `%s' address `%s'\n",
2384        s,
2385        GNUNET_i2s (&address->peer),
2386        udp_address_to_string (plugin,
2387                               address->address,
2388                               address->address_length));
2389   GNUNET_assert (GNUNET_OK ==
2390                  GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
2391                                                     &s->target,
2392                                                     s,
2393                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2394   GNUNET_STATISTICS_set (plugin->env->stats,
2395                          "# UDP sessions active",
2396                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
2397                          GNUNET_NO);
2398   notify_session_monitor (plugin,
2399                           s,
2400                           GNUNET_TRANSPORT_SS_INIT);
2401   return s;
2402 }
2403
2404
2405 /**
2406  * Creates a new outbound session the transport service will use to
2407  * send data to the peer.
2408  *
2409  * @param cls the `struct Plugin *`
2410  * @param address the address
2411  * @return the session or NULL of max connections exceeded
2412  */
2413 static struct Session *
2414 udp_plugin_get_session (void *cls,
2415                         const struct GNUNET_HELLO_Address *address)
2416 {
2417   struct Plugin *plugin = cls;
2418   struct Session *s;
2419   enum GNUNET_ATS_Network_Type network_type = GNUNET_ATS_NET_UNSPECIFIED;
2420   const struct IPv4UdpAddress *udp_v4;
2421   const struct IPv6UdpAddress *udp_v6;
2422
2423   if (NULL == address)
2424   {
2425     GNUNET_break (0);
2426     return NULL;
2427   }
2428   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
2429        (address->address_length != sizeof(struct IPv6UdpAddress)) )
2430   {
2431     GNUNET_break_op (0);
2432     return NULL;
2433   }
2434   if (NULL != (s = udp_plugin_lookup_session (cls,
2435                                               address)))
2436     return s;
2437
2438   /* need to create new session */
2439   if (sizeof (struct IPv4UdpAddress) == address->address_length)
2440   {
2441     struct sockaddr_in v4;
2442
2443     udp_v4 = (const struct IPv4UdpAddress *) address->address;
2444     memset (&v4, '\0', sizeof (v4));
2445     v4.sin_family = AF_INET;
2446 #if HAVE_SOCKADDR_IN_SIN_LEN
2447     v4.sin_len = sizeof (struct sockaddr_in);
2448 #endif
2449     v4.sin_port = udp_v4->u4_port;
2450     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
2451     network_type = plugin->env->get_address_type (plugin->env->cls,
2452                                                   (const struct sockaddr *) &v4,
2453                                                   sizeof (v4));
2454   }
2455   if (sizeof (struct IPv6UdpAddress) == address->address_length)
2456   {
2457     struct sockaddr_in6 v6;
2458
2459     udp_v6 = (const struct IPv6UdpAddress *) address->address;
2460     memset (&v6, '\0', sizeof (v6));
2461     v6.sin6_family = AF_INET6;
2462 #if HAVE_SOCKADDR_IN_SIN_LEN
2463     v6.sin6_len = sizeof (struct sockaddr_in6);
2464 #endif
2465     v6.sin6_port = udp_v6->u6_port;
2466     v6.sin6_addr = udp_v6->ipv6_addr;
2467     network_type = plugin->env->get_address_type (plugin->env->cls,
2468                                                   (const struct sockaddr *) &v6,
2469                                                   sizeof (v6));
2470   }
2471   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2472   return udp_plugin_create_session (cls,
2473                                     address,
2474                                     network_type);
2475 }
2476
2477
2478 /**
2479  * We've received a UDP Message.  Process it (pass contents to main service).
2480  *
2481  * @param plugin plugin context
2482  * @param msg the message
2483  * @param udp_addr sender address
2484  * @param udp_addr_len number of bytes in @a udp_addr
2485  * @param network_type network type the address belongs to
2486  */
2487 static void
2488 process_udp_message (struct Plugin *plugin,
2489                      const struct UDPMessage *msg,
2490                      const union UdpAddress *udp_addr,
2491                      size_t udp_addr_len,
2492                      enum GNUNET_ATS_Network_Type network_type)
2493 {
2494   struct Session *s;
2495   struct GNUNET_HELLO_Address *address;
2496
2497   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2498   if (0 != ntohl (msg->reserved))
2499   {
2500     GNUNET_break_op(0);
2501     return;
2502   }
2503   if (ntohs (msg->header.size)
2504       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2505   {
2506     GNUNET_break_op(0);
2507     return;
2508   }
2509
2510   address = GNUNET_HELLO_address_allocate (&msg->sender,
2511                                            PLUGIN_NAME,
2512                                            udp_addr,
2513                                            udp_addr_len,
2514                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2515   if (NULL ==
2516       (s = udp_plugin_lookup_session (plugin,
2517                                       address)))
2518   {
2519     s = udp_plugin_create_session (plugin,
2520                                    address,
2521                                    network_type);
2522     plugin->env->session_start (plugin->env->cls,
2523                                 address,
2524                                 s,
2525                                 s->scope);
2526     notify_session_monitor (plugin,
2527                             s,
2528                             GNUNET_TRANSPORT_SS_UP);
2529   }
2530   GNUNET_free (address);
2531
2532   s->rc++;
2533   GNUNET_SERVER_mst_receive (plugin->mst,
2534                              s,
2535                              (const char *) &msg[1],
2536                              ntohs (msg->header.size) - sizeof(struct UDPMessage),
2537                              GNUNET_YES,
2538                              GNUNET_NO);
2539   s->rc--;
2540   if ( (0 == s->rc) &&
2541        (GNUNET_YES == s->in_destroy) )
2542     free_session (s);
2543 }
2544
2545
2546 /**
2547  * Process a defragmented message.
2548  *
2549  * @param cls the `struct DefragContext *`
2550  * @param msg the message
2551  */
2552 static void
2553 fragment_msg_proc (void *cls,
2554                    const struct GNUNET_MessageHeader *msg)
2555 {
2556   struct DefragContext *dc = cls;
2557   const struct UDPMessage *um;
2558
2559   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2560   {
2561     GNUNET_break_op (0);
2562     return;
2563   }
2564   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2565   {
2566     GNUNET_break_op (0);
2567     return;
2568   }
2569   um = (const struct UDPMessage *) msg;
2570   dc->sender = um->sender;
2571   dc->have_sender = GNUNET_YES;
2572   process_udp_message (dc->plugin,
2573                        um,
2574                        dc->udp_addr,
2575                        dc->udp_addr_len,
2576                        dc->network_type);
2577 }
2578
2579
2580 /**
2581  * We finished sending an acknowledgement.  Update
2582  * statistics.
2583  *
2584  * @param cls the `struct Plugin`
2585  * @param udpw message queue entry of the ACK
2586  * @param result #GNUNET_OK if the transmission worked,
2587  *               #GNUNET_SYSERR if we failed to send the ACK
2588  */
2589 static void
2590 ack_message_sent (void *cls,
2591                   struct UDP_MessageWrapper *udpw,
2592                   int result)
2593 {
2594   struct Plugin *plugin = cls;
2595
2596   if (GNUNET_OK == result)
2597   {
2598     GNUNET_STATISTICS_update (plugin->env->stats,
2599                               "# UDP, ACK messages sent",
2600                               1,
2601                               GNUNET_NO);
2602   }
2603   else
2604   {
2605     GNUNET_STATISTICS_update (plugin->env->stats,
2606                               "# UDP, ACK transmissions failed",
2607                               1,
2608                               GNUNET_NO);
2609   }
2610 }
2611
2612
2613 /**
2614  * Transmit an acknowledgement.
2615  *
2616  * @param cls the `struct DefragContext *`
2617  * @param id message ID (unused)
2618  * @param msg ack to transmit
2619  */
2620 static void
2621 ack_proc (void *cls,
2622           uint32_t id,
2623           const struct GNUNET_MessageHeader *msg)
2624 {
2625   struct DefragContext *rc = cls;
2626   struct Plugin *plugin = rc->plugin;
2627   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2628   struct UDP_ACK_Message *udp_ack;
2629   uint32_t delay;
2630   struct UDP_MessageWrapper *udpw;
2631   struct Session *s;
2632   struct GNUNET_HELLO_Address *address;
2633
2634   if (GNUNET_NO == rc->have_sender)
2635   {
2636     /* tried to defragment but never succeeded, hence will not ACK */
2637     /* This can happen if we just lost msgs */
2638     GNUNET_STATISTICS_update (plugin->env->stats,
2639                               "# UDP, fragments discarded without ACK",
2640                               1,
2641                               GNUNET_NO);
2642     return;
2643   }
2644   address = GNUNET_HELLO_address_allocate (&rc->sender,
2645                                            PLUGIN_NAME,
2646                                            rc->udp_addr,
2647                                            rc->udp_addr_len,
2648                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2649   s = udp_plugin_lookup_session (plugin,
2650                                  address);
2651   GNUNET_HELLO_address_free (address);
2652   if (NULL == s)
2653   {
2654     LOG (GNUNET_ERROR_TYPE_ERROR,
2655          "Trying to transmit ACK to peer `%s' but no session found!\n",
2656          udp_address_to_string (plugin,
2657                                 rc->udp_addr,
2658                                 rc->udp_addr_len));
2659     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2660     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2661     GNUNET_free (rc);
2662     GNUNET_STATISTICS_update (plugin->env->stats,
2663                               "# UDP, ACK transmissions failed",
2664                               1,
2665                               GNUNET_NO);
2666     return;
2667   }
2668   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2669     delay = s->flow_delay_for_other_peer.rel_value_us;
2670   else
2671     delay = UINT32_MAX;
2672   LOG (GNUNET_ERROR_TYPE_DEBUG,
2673        "Sending ACK to `%s' including delay of %s\n",
2674        udp_address_to_string (plugin,
2675                               rc->udp_addr,
2676                               rc->udp_addr_len),
2677        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2678                                                GNUNET_YES));
2679   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2680   udpw->msg_size = msize;
2681   udpw->payload_size = 0;
2682   udpw->session = s;
2683   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2684   udpw->msg_buf = (char *) &udpw[1];
2685   udpw->qc = &ack_message_sent;
2686   udpw->qc_cls = plugin;
2687   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2688   udp_ack->header.size = htons ((uint16_t) msize);
2689   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2690   udp_ack->delay = htonl (delay);
2691   udp_ack->sender = *plugin->env->my_identity;
2692   memcpy (&udp_ack[1],
2693           msg,
2694           ntohs (msg->size));
2695   enqueue (plugin,
2696            udpw);
2697   notify_session_monitor (plugin,
2698                           s,
2699                           GNUNET_TRANSPORT_SS_UPDATE);
2700   if (s->address->address_length == sizeof (struct IPv4UdpAddress))
2701     schedule_select_v4 (plugin);
2702   else
2703     schedule_select_v6 (plugin);
2704 }
2705
2706
2707 /**
2708  * We received a fragment, process it.
2709  *
2710  * @param plugin our plugin
2711  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2712  * @param udp_addr sender address
2713  * @param udp_addr_len number of bytes in @a udp_addr
2714  * @param network_type network type the address belongs to
2715  */
2716 static void
2717 read_process_fragment (struct Plugin *plugin,
2718                        const struct GNUNET_MessageHeader *msg,
2719                        const union UdpAddress *udp_addr,
2720                        size_t udp_addr_len,
2721                        enum GNUNET_ATS_Network_Type network_type)
2722 {
2723   struct DefragContext *d_ctx;
2724   struct GNUNET_TIME_Absolute now;
2725   struct FindReceiveContext frc;
2726
2727   frc.rc = NULL;
2728   frc.udp_addr = udp_addr;
2729   frc.udp_addr_len = udp_addr_len;
2730
2731   /* Lookup existing receive context for this address */
2732   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2733                                  &find_receive_context,
2734                                  &frc);
2735   now = GNUNET_TIME_absolute_get ();
2736   d_ctx = frc.rc;
2737
2738   if (NULL == d_ctx)
2739   {
2740     /* Create a new defragmentation context */
2741     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
2742     memcpy (&d_ctx[1],
2743             udp_addr,
2744             udp_addr_len);
2745     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2746     d_ctx->udp_addr_len = udp_addr_len;
2747     d_ctx->network_type = network_type;
2748     d_ctx->plugin = plugin;
2749     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2750                                                       UDP_MTU,
2751                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
2752                                                       d_ctx,
2753                                                       &fragment_msg_proc,
2754                                                       &ack_proc);
2755     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2756                                                  d_ctx,
2757                                                  (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2758     LOG (GNUNET_ERROR_TYPE_DEBUG,
2759          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2760          (unsigned int) ntohs (msg->size),
2761          udp_address_to_string (plugin,
2762                                 udp_addr,
2763                                 udp_addr_len));
2764   }
2765   else
2766   {
2767     LOG (GNUNET_ERROR_TYPE_DEBUG,
2768          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2769          (unsigned int) ntohs (msg->size),
2770          udp_address_to_string (plugin,
2771                                 udp_addr,
2772                                 udp_addr_len));
2773   }
2774
2775   if (GNUNET_OK ==
2776       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag,
2777                                           msg))
2778   {
2779     /* keep this 'rc' from expiring */
2780     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2781                                        d_ctx->hnode,
2782                                        (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2783   }
2784   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2785       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2786   {
2787     /* remove 'rc' that was inactive the longest */
2788     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2789     GNUNET_assert (NULL != d_ctx);
2790     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2791     GNUNET_free (d_ctx);
2792     GNUNET_STATISTICS_update (plugin->env->stats,
2793                               "# UDP, Defragmentations aborted",
2794                               1,
2795                               GNUNET_NO);
2796   }
2797 }
2798
2799
2800 /**
2801  * Read and process a message from the given socket.
2802  *
2803  * @param plugin the overall plugin
2804  * @param rsock socket to read from
2805  */
2806 static void
2807 udp_select_read (struct Plugin *plugin,
2808                  struct GNUNET_NETWORK_Handle *rsock)
2809 {
2810   socklen_t fromlen;
2811   struct sockaddr_storage addr;
2812   char buf[65536] GNUNET_ALIGN;
2813   ssize_t size;
2814   const struct GNUNET_MessageHeader *msg;
2815   struct IPv4UdpAddress v4;
2816   struct IPv6UdpAddress v6;
2817   const struct sockaddr *sa;
2818   const struct sockaddr_in *sa4;
2819   const struct sockaddr_in6 *sa6;
2820   const union UdpAddress *int_addr;
2821   size_t int_addr_len;
2822   enum GNUNET_ATS_Network_Type network_type;
2823
2824   fromlen = sizeof (addr);
2825   memset (&addr,
2826           0,
2827           sizeof(addr));
2828   size = GNUNET_NETWORK_socket_recvfrom (rsock,
2829                                          buf,
2830                                          sizeof(buf),
2831                                          (struct sockaddr *) &addr,
2832                                          &fromlen);
2833   sa = (const struct sockaddr *) &addr;
2834 #if MINGW
2835   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2836    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2837    * on this socket has failed.
2838    * Quote from MSDN:
2839    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2840    *   executing a hard or abortive close. The application should close
2841    *   the socket; it is no longer usable. On a UDP-datagram socket this
2842    *   error indicates a previous send operation resulted in an ICMP Port
2843    *   Unreachable message.
2844    */
2845   if ( (-1 == size) &&
2846        (ECONNRESET == errno) )
2847     return;
2848 #endif
2849   if (-1 == size)
2850   {
2851     LOG (GNUNET_ERROR_TYPE_DEBUG,
2852          "UDP failed to receive data: %s\n",
2853          STRERROR (errno));
2854     /* Connection failure or something. Not a protocol violation. */
2855     return;
2856   }
2857   if (size < sizeof(struct GNUNET_MessageHeader))
2858   {
2859     LOG (GNUNET_ERROR_TYPE_WARNING,
2860          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
2861          (unsigned int ) size,
2862          GNUNET_a2s (sa,
2863                      fromlen));
2864     /* _MAY_ be a connection failure (got partial message) */
2865     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2866     GNUNET_break_op (0);
2867     return;
2868   }
2869   msg = (const struct GNUNET_MessageHeader *) buf;
2870   LOG (GNUNET_ERROR_TYPE_DEBUG,
2871        "UDP received %u-byte message from `%s' type %u\n",
2872        (unsigned int) size,
2873        GNUNET_a2s (sa,
2874                    fromlen),
2875        ntohs (msg->type));
2876   if (size != ntohs (msg->size))
2877   {
2878     LOG (GNUNET_ERROR_TYPE_WARNING,
2879          "UDP malformed message header from %s\n",
2880          (unsigned int) size,
2881          GNUNET_a2s (sa,
2882                      fromlen));
2883     GNUNET_break_op (0);
2884     return;
2885   }
2886   GNUNET_STATISTICS_update (plugin->env->stats,
2887                             "# UDP, total bytes received",
2888                             size,
2889                             GNUNET_NO);
2890   network_type = plugin->env->get_address_type (plugin->env->cls,
2891                                                 sa,
2892                                                 fromlen);
2893   switch (sa->sa_family)
2894   {
2895   case AF_INET:
2896     sa4 = (const struct sockaddr_in *) &addr;
2897     v4.options = 0;
2898     v4.ipv4_addr = sa4->sin_addr.s_addr;
2899     v4.u4_port = sa4->sin_port;
2900     int_addr = (union UdpAddress *) &v4;
2901     int_addr_len = sizeof (v4);
2902     break;
2903   case AF_INET6:
2904     sa6 = (const struct sockaddr_in6 *) &addr;
2905     v6.options = 0;
2906     v6.ipv6_addr = sa6->sin6_addr;
2907     v6.u6_port = sa6->sin6_port;
2908     int_addr = (union UdpAddress *) &v6;
2909     int_addr_len = sizeof (v6);
2910     break;
2911   default:
2912     GNUNET_break (0);
2913     return;
2914   }
2915
2916   switch (ntohs (msg->type))
2917   {
2918   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2919     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
2920       udp_broadcast_receive (plugin,
2921                              buf,
2922                              size,
2923                              int_addr,
2924                              int_addr_len,
2925                              network_type);
2926     return;
2927   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2928     if (ntohs (msg->size) < sizeof(struct UDPMessage))
2929     {
2930       GNUNET_break_op(0);
2931       return;
2932     }
2933     process_udp_message (plugin,
2934                          (const struct UDPMessage *) msg,
2935                          int_addr,
2936                          int_addr_len,
2937                          network_type);
2938     return;
2939   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2940     read_process_ack (plugin,
2941                       msg,
2942                       int_addr,
2943                       int_addr_len);
2944     return;
2945   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2946     read_process_fragment (plugin,
2947                            msg,
2948                            int_addr,
2949                            int_addr_len,
2950                            network_type);
2951     return;
2952   default:
2953     GNUNET_break_op(0);
2954     return;
2955   }
2956 }
2957
2958
2959 /**
2960  * Removes messages from the transmission queue that have
2961  * timed out, and then selects a message that should be
2962  * transmitted next.
2963  *
2964  * @param plugin the UDP plugin
2965  * @param sock which socket should we process the queue for (v4 or v6)
2966  * @return message selected for transmission, or NULL for none
2967  */
2968 static struct UDP_MessageWrapper *
2969 remove_timeout_messages_and_select (struct Plugin *plugin,
2970                                     struct GNUNET_NETWORK_Handle *sock)
2971 {
2972   struct UDP_MessageWrapper *udpw;
2973   struct GNUNET_TIME_Relative remaining;
2974   struct Session *session;
2975   int removed;
2976
2977   removed = GNUNET_NO;
2978   udpw = (sock == plugin->sockv4)
2979     ? plugin->ipv4_queue_head
2980     : plugin->ipv6_queue_head;
2981   while (NULL != udpw)
2982   {
2983     session = udpw->session;
2984     /* Find messages with timeout */
2985     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2986     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2987     {
2988       /* Message timed out */
2989       removed = GNUNET_YES;
2990       dequeue (plugin,
2991                udpw);
2992       udpw->qc (udpw->qc_cls,
2993                 udpw,
2994                 GNUNET_SYSERR);
2995       GNUNET_free (udpw);
2996
2997       if (sock == plugin->sockv4)
2998       {
2999         udpw = plugin->ipv4_queue_head;
3000       }
3001       else if (sock == plugin->sockv6)
3002       {
3003         udpw = plugin->ipv6_queue_head;
3004       }
3005       else
3006       {
3007         GNUNET_break (0); /* should never happen */
3008         udpw = NULL;
3009       }
3010       GNUNET_STATISTICS_update (plugin->env->stats,
3011                                 "# messages discarded due to timeout",
3012                                 1,
3013                                 GNUNET_NO);
3014     }
3015     else
3016     {
3017       /* Message did not time out, check flow delay */
3018       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
3019       if (0 == remaining.rel_value_us)
3020       {
3021         /* this message is not delayed */
3022         LOG (GNUNET_ERROR_TYPE_DEBUG,
3023              "Message for peer `%s' (%u bytes) is not delayed \n",
3024              GNUNET_i2s (&udpw->session->target),
3025              udpw->payload_size);
3026         break; /* Found message to send, break */
3027       }
3028       else
3029       {
3030         /* Message is delayed, try next */
3031         LOG (GNUNET_ERROR_TYPE_DEBUG,
3032              "Message for peer `%s' (%u bytes) is delayed for %s\n",
3033              GNUNET_i2s (&udpw->session->target),
3034              udpw->payload_size,
3035              GNUNET_STRINGS_relative_time_to_string (remaining,
3036                                                      GNUNET_YES));
3037         udpw = udpw->next;
3038       }
3039     }
3040   }
3041   if (GNUNET_YES == removed)
3042     notify_session_monitor (session->plugin,
3043                             session,
3044                             GNUNET_TRANSPORT_SS_UPDATE);
3045   return udpw;
3046 }
3047
3048
3049 /**
3050  * We failed to transmit a message via UDP. Generate
3051  * a descriptive error message.
3052  *
3053  * @param plugin our plugin
3054  * @param sa target address we were trying to reach
3055  * @param slen number of bytes in @a sa
3056  * @param error the errno value returned from the sendto() call
3057  */
3058 static void
3059 analyze_send_error (struct Plugin *plugin,
3060                     const struct sockaddr *sa,
3061                     socklen_t slen,
3062                     int error)
3063 {
3064   enum GNUNET_ATS_Network_Type type;
3065
3066   type = plugin->env->get_address_type (plugin->env->cls,
3067                                         sa,
3068                                         slen);
3069   if ( ( (GNUNET_ATS_NET_LAN == type) ||
3070          (GNUNET_ATS_NET_WAN == type) ) &&
3071        ( (ENETUNREACH == errno) ||
3072          (ENETDOWN == errno) ) )
3073   {
3074     if (slen == sizeof (struct sockaddr_in))
3075     {
3076       /* IPv4: "Network unreachable" or "Network down"
3077        *
3078        * This indicates we do not have connectivity
3079        */
3080       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3081            _("UDP could not transmit message to `%s': "
3082              "Network seems down, please check your network configuration\n"),
3083            GNUNET_a2s (sa,
3084                        slen));
3085     }
3086     if (slen == sizeof (struct sockaddr_in6))
3087     {
3088       /* IPv6: "Network unreachable" or "Network down"
3089        *
3090        * This indicates that this system is IPv6 enabled, but does not
3091        * have a valid global IPv6 address assigned or we do not have
3092        * connectivity
3093        */
3094       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
3095            _("UDP could not transmit IPv6 message! "
3096              "Please check your network configuration and disable IPv6 if your "
3097              "connection does not have a global IPv6 address\n"));
3098     }
3099   }
3100   else
3101   {
3102     LOG (GNUNET_ERROR_TYPE_WARNING,
3103          "UDP could not transmit message to `%s': `%s'\n",
3104          GNUNET_a2s (sa,
3105                      slen),
3106          STRERROR (error));
3107   }
3108 }
3109
3110
3111 /**
3112  * It is time to try to transmit a UDP message.  Select one
3113  * and send.
3114  *
3115  * @param plugin the plugin
3116  * @param sock which socket (v4/v6) to send on
3117  */
3118 static void
3119 udp_select_send (struct Plugin *plugin,
3120                  struct GNUNET_NETWORK_Handle *sock)
3121 {
3122   ssize_t sent;
3123   socklen_t slen;
3124   const struct sockaddr *a;
3125   const struct IPv4UdpAddress *u4;
3126   struct sockaddr_in a4;
3127   const struct IPv6UdpAddress *u6;
3128   struct sockaddr_in6 a6;
3129   struct UDP_MessageWrapper *udpw;
3130
3131   /* Find message(s) to send */
3132   while (NULL != (udpw = remove_timeout_messages_and_select (plugin,
3133                                                              sock)))
3134   {
3135     if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3136     {
3137       u4 = udpw->session->address->address;
3138       memset (&a4,
3139               0,
3140               sizeof(a4));
3141       a4.sin_family = AF_INET;
3142 #if HAVE_SOCKADDR_IN_SIN_LEN
3143       a4.sin_len = sizeof (a4);
3144 #endif
3145       a4.sin_port = u4->u4_port;
3146       a4.sin_addr.s_addr = u4->ipv4_addr;
3147       a = (const struct sockaddr *) &a4;
3148       slen = sizeof (a4);
3149     }
3150     else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3151     {
3152       u6 = udpw->session->address->address;
3153       memset (&a6,
3154               0,
3155               sizeof(a6));
3156       a6.sin6_family = AF_INET6;
3157 #if HAVE_SOCKADDR_IN_SIN_LEN
3158       a6.sin6_len = sizeof (a6);
3159 #endif
3160       a6.sin6_port = u6->u6_port;
3161       a6.sin6_addr = u6->ipv6_addr;
3162       a = (const struct sockaddr *) &a6;
3163       slen = sizeof (a6);
3164     }
3165     else
3166     {
3167       GNUNET_break (0);
3168       dequeue (plugin,
3169                udpw);
3170       udpw->qc (udpw->qc_cls,
3171                 udpw,
3172                 GNUNET_SYSERR);
3173       notify_session_monitor (plugin,
3174                               udpw->session,
3175                               GNUNET_TRANSPORT_SS_UPDATE);
3176       GNUNET_free (udpw);
3177       continue;
3178     }
3179     sent = GNUNET_NETWORK_socket_sendto (sock,
3180                                          udpw->msg_buf,
3181                                          udpw->msg_size,
3182                                          a,
3183                                          slen);
3184     dequeue (plugin,
3185              udpw);
3186     if (GNUNET_SYSERR == sent)
3187     {
3188       /* Failure */
3189       analyze_send_error (plugin,
3190                           a,
3191                           slen,
3192                           errno);
3193       udpw->qc (udpw->qc_cls,
3194                 udpw,
3195                 GNUNET_SYSERR);
3196       GNUNET_STATISTICS_update (plugin->env->stats,
3197                                 "# UDP, total, bytes, sent, failure",
3198                                 sent,
3199                                 GNUNET_NO);
3200       GNUNET_STATISTICS_update (plugin->env->stats,
3201                                 "# UDP, total, messages, sent, failure",
3202                                 1,
3203                                 GNUNET_NO);
3204     }
3205     else
3206     {
3207       /* Success */
3208       LOG (GNUNET_ERROR_TYPE_DEBUG,
3209            "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3210            (unsigned int) (udpw->msg_size),
3211            GNUNET_i2s (&udpw->session->target),
3212            GNUNET_a2s (a,
3213                        slen),
3214            (int ) sent,
3215            (sent < 0) ? STRERROR (errno) : "ok");
3216       GNUNET_STATISTICS_update (plugin->env->stats,
3217                                 "# UDP, total, bytes, sent, success",
3218                                 sent,
3219                                 GNUNET_NO);
3220       GNUNET_STATISTICS_update (plugin->env->stats,
3221                                 "# UDP, total, messages, sent, success",
3222                                 1,
3223                                 GNUNET_NO);
3224       if (NULL != udpw->frag_ctx)
3225         udpw->frag_ctx->on_wire_size += udpw->msg_size;
3226       udpw->qc (udpw->qc_cls,
3227                 udpw,
3228                 GNUNET_OK);
3229     }
3230     notify_session_monitor (plugin,
3231                             udpw->session,
3232                             GNUNET_TRANSPORT_SS_UPDATE);
3233     GNUNET_free (udpw);
3234   }
3235 }
3236
3237
3238 /* ***************** Event loop (part 2) *************** */
3239
3240
3241 /**
3242  * We have been notified that our readset has something to read.  We don't
3243  * know which socket needs to be read, so we have to check each one
3244  * Then reschedule this function to be called again once more is available.
3245  *
3246  * @param cls the plugin handle
3247  * @param tc the scheduling context
3248  */
3249 static void
3250 udp_plugin_select_v4 (void *cls,
3251                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3252 {
3253   struct Plugin *plugin = cls;
3254
3255   plugin->select_task_v4 = NULL;
3256   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3257     return;
3258   if (NULL == plugin->sockv4)
3259     return;
3260   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3261       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3262                                    plugin->sockv4)))
3263     udp_select_read (plugin,
3264                      plugin->sockv4);
3265   udp_select_send (plugin,
3266                    plugin->sockv4);
3267   schedule_select_v4 (plugin);
3268 }
3269
3270
3271 /**
3272  * We have been notified that our readset has something to read.  We don't
3273  * know which socket needs to be read, so we have to check each one
3274  * Then reschedule this function to be called again once more is available.
3275  *
3276  * @param cls the plugin handle
3277  * @param tc the scheduling context
3278  */
3279 static void
3280 udp_plugin_select_v6 (void *cls,
3281                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3282 {
3283   struct Plugin *plugin = cls;
3284
3285   plugin->select_task_v6 = NULL;
3286   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3287     return;
3288   if (NULL == plugin->sockv6)
3289     return;
3290   if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
3291        (GNUNET_NETWORK_fdset_isset (tc->read_ready,
3292                                     plugin->sockv6)) )
3293     udp_select_read (plugin,
3294                      plugin->sockv6);
3295
3296   udp_select_send (plugin,
3297                    plugin->sockv6);
3298   schedule_select_v6 (plugin);
3299 }
3300
3301
3302 /* ******************* Initialization *************** */
3303
3304
3305 /**
3306  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3307  *
3308  * @param plugin the plugin to initialize
3309  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3310  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3311  * @return number of sockets that were successfully bound
3312  */
3313 static int
3314 setup_sockets (struct Plugin *plugin,
3315                const struct sockaddr_in6 *bind_v6,
3316                const struct sockaddr_in *bind_v4)
3317 {
3318   int tries;
3319   int sockets_created = 0;
3320   struct sockaddr_in6 server_addrv6;
3321   struct sockaddr_in server_addrv4;
3322   const struct sockaddr *server_addr;
3323   const struct sockaddr *addrs[2];
3324   socklen_t addrlens[2];
3325   socklen_t addrlen;
3326   int eno;
3327
3328   /* Create IPv6 socket */
3329   eno = EINVAL;
3330   if (GNUNET_YES == plugin->enable_ipv6)
3331   {
3332     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6,
3333                                                    SOCK_DGRAM,
3334                                                    0);
3335     if (NULL == plugin->sockv6)
3336     {
3337       LOG (GNUNET_ERROR_TYPE_INFO,
3338            _("Disabling IPv6 since it is not supported on this system!\n"));
3339       plugin->enable_ipv6 = GNUNET_NO;
3340     }
3341     else
3342     {
3343       memset (&server_addrv6,
3344               0,
3345               sizeof(struct sockaddr_in6));
3346 #if HAVE_SOCKADDR_IN_SIN_LEN
3347       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3348 #endif
3349       server_addrv6.sin6_family = AF_INET6;
3350       if (NULL != bind_v6)
3351         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3352       else
3353         server_addrv6.sin6_addr = in6addr_any;
3354
3355       if (0 == plugin->port) /* autodetect */
3356         server_addrv6.sin6_port
3357           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3358                                              33537)
3359                    + 32000);
3360       else
3361         server_addrv6.sin6_port = htons (plugin->port);
3362       addrlen = sizeof (struct sockaddr_in6);
3363       server_addr = (const struct sockaddr *) &server_addrv6;
3364
3365       tries = 0;
3366       while (tries < 10)
3367       {
3368         LOG(GNUNET_ERROR_TYPE_DEBUG,
3369             "Binding to IPv6 `%s'\n",
3370             GNUNET_a2s (server_addr,
3371                         addrlen));
3372         /* binding */
3373         if (GNUNET_OK ==
3374             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3375                                         server_addr,
3376                                         addrlen))
3377           break;
3378         eno = errno;
3379         if (0 != plugin->port)
3380         {
3381           tries = 10; /* fail immediately */
3382           break; /* bind failed on specific port */
3383         }
3384         /* autodetect */
3385         server_addrv6.sin6_port
3386           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3387                                              33537)
3388                    + 32000);
3389         tries++;
3390       }
3391       if (tries >= 10)
3392       {
3393         GNUNET_NETWORK_socket_close (plugin->sockv6);
3394         plugin->enable_ipv6 = GNUNET_NO;
3395         plugin->sockv6 = NULL;
3396       }
3397       else
3398       {
3399         plugin->port = ntohs (server_addrv6.sin6_port);
3400       }
3401       if (NULL != plugin->sockv6)
3402       {
3403         LOG (GNUNET_ERROR_TYPE_DEBUG,
3404              "IPv6 UDP socket created listinging at %s\n",
3405              GNUNET_a2s (server_addr,
3406                          addrlen));
3407         addrs[sockets_created] = server_addr;
3408         addrlens[sockets_created] = addrlen;
3409         sockets_created++;
3410       }
3411       else
3412       {
3413         LOG (GNUNET_ERROR_TYPE_WARNING,
3414              _("Failed to bind UDP socket to %s: %s\n"),
3415              GNUNET_a2s (server_addr,
3416                          addrlen),
3417              STRERROR (eno));
3418       }
3419     }
3420   }
3421
3422   /* Create IPv4 socket */
3423   eno = EINVAL;
3424   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET,
3425                                                  SOCK_DGRAM,
3426                                                  0);
3427   if (NULL == plugin->sockv4)
3428   {
3429     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3430                          "socket");
3431     LOG (GNUNET_ERROR_TYPE_INFO,
3432          _("Disabling IPv4 since it is not supported on this system!\n"));
3433     plugin->enable_ipv4 = GNUNET_NO;
3434   }
3435   else
3436   {
3437     memset (&server_addrv4,
3438             0,
3439             sizeof(struct sockaddr_in));
3440 #if HAVE_SOCKADDR_IN_SIN_LEN
3441     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3442 #endif
3443     server_addrv4.sin_family = AF_INET;
3444     if (NULL != bind_v4)
3445       server_addrv4.sin_addr = bind_v4->sin_addr;
3446     else
3447       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3448
3449     if (0 == plugin->port)
3450       /* autodetect */
3451       server_addrv4.sin_port
3452         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3453                                            33537)
3454                  + 32000);
3455     else
3456       server_addrv4.sin_port = htons (plugin->port);
3457
3458     addrlen = sizeof (struct sockaddr_in);
3459     server_addr = (const struct sockaddr *) &server_addrv4;
3460
3461     tries = 0;
3462     while (tries < 10)
3463     {
3464       LOG (GNUNET_ERROR_TYPE_DEBUG,
3465            "Binding to IPv4 `%s'\n",
3466            GNUNET_a2s (server_addr,
3467                        addrlen));
3468
3469       /* binding */
3470       if (GNUNET_OK ==
3471           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3472                                       server_addr,
3473                                       addrlen))
3474         break;
3475       eno = errno;
3476       if (0 != plugin->port)
3477       {
3478         tries = 10; /* fail */
3479         break; /* bind failed on specific port */
3480       }
3481
3482       /* autodetect */
3483       server_addrv4.sin_port
3484         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3485                                            33537)
3486                  + 32000);
3487       tries++;
3488     }
3489     if (tries >= 10)
3490     {
3491       GNUNET_NETWORK_socket_close (plugin->sockv4);
3492       plugin->enable_ipv4 = GNUNET_NO;
3493       plugin->sockv4 = NULL;
3494     }
3495     else
3496     {
3497       plugin->port = ntohs (server_addrv4.sin_port);
3498     }
3499
3500     if (NULL != plugin->sockv4)
3501     {
3502       LOG (GNUNET_ERROR_TYPE_DEBUG,
3503            "IPv4 socket created on port %s\n",
3504            GNUNET_a2s (server_addr,
3505                        addrlen));
3506       addrs[sockets_created] = server_addr;
3507       addrlens[sockets_created] = addrlen;
3508       sockets_created++;
3509     }
3510     else
3511     {
3512       LOG (GNUNET_ERROR_TYPE_ERROR,
3513            _("Failed to bind UDP socket to %s: %s\n"),
3514            GNUNET_a2s (server_addr,
3515                        addrlen),
3516            STRERROR (eno));
3517     }
3518   }
3519
3520   if (0 == sockets_created)
3521   {
3522     LOG (GNUNET_ERROR_TYPE_WARNING,
3523          _("Failed to open UDP sockets\n"));
3524     return 0; /* No sockets created, return */
3525   }
3526   schedule_select_v4 (plugin);
3527   schedule_select_v6 (plugin);
3528   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3529                                      GNUNET_NO,
3530                                      plugin->port,
3531                                      sockets_created,
3532                                      addrs,
3533                                      addrlens,
3534                                      &udp_nat_port_map_callback,
3535                                      NULL,
3536                                      plugin);
3537   return sockets_created;
3538 }
3539
3540
3541 /**
3542  * The exported method. Makes the core api available via a global and
3543  * returns the udp transport API.
3544  *
3545  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3546  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3547  */
3548 void *
3549 libgnunet_plugin_transport_udp_init (void *cls)
3550 {
3551   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3552   struct GNUNET_TRANSPORT_PluginFunctions *api;
3553   struct Plugin *p;
3554   unsigned long long port;
3555   unsigned long long aport;
3556   unsigned long long udp_max_bps;
3557   unsigned long long enable_v6;
3558   unsigned long long enable_broadcasting;
3559   unsigned long long enable_broadcasting_recv;
3560   char *bind4_address;
3561   char *bind6_address;
3562   struct GNUNET_TIME_Relative interval;
3563   struct sockaddr_in server_addrv4;
3564   struct sockaddr_in6 server_addrv6;
3565   int res;
3566   int have_bind4;
3567   int have_bind6;
3568
3569   if (NULL == env->receive)
3570   {
3571     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3572      initialze the plugin or the API */
3573     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3574     api->cls = NULL;
3575     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3576     api->address_to_string = &udp_address_to_string;
3577     api->string_to_address = &udp_string_to_address;
3578     return api;
3579   }
3580
3581   /* Get port number: port == 0 : autodetect a port,
3582    * > 0 : use this port, not given : 2086 default */
3583   if (GNUNET_OK !=
3584       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3585                                              "transport-udp",
3586                                              "PORT",
3587                                              &port))
3588     port = 2086;
3589   if (port > 65535)
3590   {
3591     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3592                                "transport-udp",
3593                                "PORT",
3594                                _("must be in [0,65535]"));
3595     return NULL;
3596   }
3597   if (GNUNET_OK !=
3598       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3599                                              "transport-udp",
3600                                              "ADVERTISED_PORT",
3601                                              &aport))
3602     aport = port;
3603   if (aport > 65535)
3604   {
3605     GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3606                                "transport-udp",
3607                                "ADVERTISED_PORT",
3608                                _("must be in [0,65535]"));
3609     return NULL;
3610   }
3611
3612   if (GNUNET_YES ==
3613       GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3614                                             "nat",
3615                                             "DISABLEV6"))
3616     enable_v6 = GNUNET_NO;
3617   else
3618     enable_v6 = GNUNET_YES;
3619
3620   have_bind4 = GNUNET_NO;
3621   memset (&server_addrv4,
3622           0,
3623           sizeof (server_addrv4));
3624   if (GNUNET_YES ==
3625       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3626                                              "transport-udp",
3627                                              "BINDTO",
3628                                              &bind4_address))
3629   {
3630     LOG (GNUNET_ERROR_TYPE_DEBUG,
3631          "Binding UDP plugin to specific address: `%s'\n",
3632          bind4_address);
3633     if (1 != inet_pton (AF_INET,
3634                         bind4_address,
3635                         &server_addrv4.sin_addr))
3636     {
3637       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3638                                  "transport-udp",
3639                                  "BINDTO",
3640                                  _("must be valid IPv4 address"));
3641       GNUNET_free (bind4_address);
3642       return NULL;
3643     }
3644     have_bind4 = GNUNET_YES;
3645   }
3646   GNUNET_free_non_null (bind4_address);
3647   have_bind6 = GNUNET_NO;
3648   memset (&server_addrv6,
3649           0,
3650           sizeof (server_addrv6));
3651   if (GNUNET_YES ==
3652       GNUNET_CONFIGURATION_get_value_string (env->cfg,
3653                                              "transport-udp",
3654                                              "BINDTO6",
3655                                              &bind6_address))
3656   {
3657     LOG (GNUNET_ERROR_TYPE_DEBUG,
3658          "Binding udp plugin to specific address: `%s'\n",
3659          bind6_address);
3660     if (1 != inet_pton (AF_INET6,
3661                         bind6_address,
3662                         &server_addrv6.sin6_addr))
3663     {
3664       GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
3665                                  "transport-udp",
3666                                  "BINDTO6",
3667                                  _("must be valid IPv6 address"));
3668       GNUNET_free (bind6_address);
3669       return NULL;
3670     }
3671     have_bind6 = GNUNET_YES;
3672   }
3673   GNUNET_free_non_null (bind6_address);
3674
3675   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3676                                                               "transport-udp",
3677                                                               "BROADCAST");
3678   if (enable_broadcasting == GNUNET_SYSERR)
3679     enable_broadcasting = GNUNET_NO;
3680
3681   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3682                                                                    "transport-udp",
3683                                                                    "BROADCAST_RECEIVE");
3684   if (enable_broadcasting_recv == GNUNET_SYSERR)
3685     enable_broadcasting_recv = GNUNET_YES;
3686
3687   if (GNUNET_SYSERR ==
3688       GNUNET_CONFIGURATION_get_value_time (env->cfg,
3689                                            "transport-udp",
3690                                            "BROADCAST_INTERVAL",
3691                                            &interval))
3692   {
3693     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
3694                                               10);
3695   }
3696   if (GNUNET_OK !=
3697       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3698                                              "transport-udp",
3699                                              "MAX_BPS",
3700                                              &udp_max_bps))
3701   {
3702     /* 50 MB/s == infinity for practical purposes */
3703     udp_max_bps = 1024 * 1024 * 50;
3704   }
3705
3706   p = GNUNET_new (struct Plugin);
3707   p->port = port;
3708   p->aport = aport;
3709   p->broadcast_interval = interval;
3710   p->enable_ipv6 = enable_v6;
3711   p->enable_ipv4 = GNUNET_YES; /* default */
3712   p->enable_broadcasting = enable_broadcasting;
3713   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3714   p->env = env;
3715   p->sessions = GNUNET_CONTAINER_multipeermap_create (16,
3716                                                       GNUNET_NO);
3717   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
3718   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3719                                      p);
3720   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3721                                  NULL,
3722                                  NULL,
3723                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3724                                  30);
3725   res = setup_sockets (p,
3726                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3727                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3728   if ( (0 == res) ||
3729        ( (NULL == p->sockv4) &&
3730          (NULL == p->sockv6) ) )
3731   {
3732     LOG (GNUNET_ERROR_TYPE_ERROR,
3733         _("Failed to create UDP network sockets\n"));
3734     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3735     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3736     GNUNET_SERVER_mst_destroy (p->mst);
3737     GNUNET_free (p);
3738     return NULL;
3739   }
3740
3741   /* Setup broadcasting and receiving beacons */
3742   setup_broadcast (p,
3743                    &server_addrv6,
3744                    &server_addrv4);
3745
3746   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3747   api->cls = p;
3748   api->disconnect_session = &udp_disconnect_session;
3749   api->query_keepalive_factor = &udp_query_keepalive_factor;
3750   api->disconnect_peer = &udp_disconnect;
3751   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3752   api->address_to_string = &udp_address_to_string;
3753   api->string_to_address = &udp_string_to_address;
3754   api->check_address = &udp_plugin_check_address;
3755   api->get_session = &udp_plugin_get_session;
3756   api->send = &udp_plugin_send;
3757   api->get_network = &udp_get_network;
3758   api->update_session_timeout = &udp_plugin_update_session_timeout;
3759   api->setup_monitor = &udp_plugin_setup_monitor;
3760   return api;
3761 }
3762
3763
3764 /**
3765  * Function called on each entry in the defragmentation heap to
3766  * clean it up.
3767  *
3768  * @param cls NULL
3769  * @param node node in the heap (to be removed)
3770  * @param element a `struct DefragContext` to be cleaned up
3771  * @param cost unused
3772  * @return #GNUNET_YES
3773  */
3774 static int
3775 heap_cleanup_iterator (void *cls,
3776                        struct GNUNET_CONTAINER_HeapNode *node,
3777                        void *element,
3778                        GNUNET_CONTAINER_HeapCostType cost)
3779 {
3780   struct DefragContext *d_ctx = element;
3781
3782   GNUNET_CONTAINER_heap_remove_node (node);
3783   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3784   GNUNET_free (d_ctx);
3785   return GNUNET_YES;
3786 }
3787
3788
3789 /**
3790  * The exported method. Makes the core api available via a global and
3791  * returns the udp transport API.
3792  *
3793  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3794  * @return NULL
3795  */
3796 void *
3797 libgnunet_plugin_transport_udp_done (void *cls)
3798 {
3799   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3800   struct Plugin *plugin = api->cls;
3801   struct PrettyPrinterContext *cur;
3802   struct UDP_MessageWrapper *udpw;
3803
3804   if (NULL == plugin)
3805   {
3806     GNUNET_free (api);
3807     return NULL;
3808   }
3809   stop_broadcast (plugin);
3810   if (NULL != plugin->select_task_v4)
3811   {
3812     GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
3813     plugin->select_task_v4 = NULL;
3814   }
3815   if (NULL != plugin->select_task_v6)
3816   {
3817     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3818     plugin->select_task_v6 = NULL;
3819   }
3820   if (NULL != plugin->sockv4)
3821   {
3822     GNUNET_break (GNUNET_OK ==
3823                   GNUNET_NETWORK_socket_close (plugin->sockv4));
3824     plugin->sockv4 = NULL;
3825   }
3826   if (NULL != plugin->sockv6)
3827   {
3828     GNUNET_break (GNUNET_OK ==
3829                   GNUNET_NETWORK_socket_close (plugin->sockv6));
3830     plugin->sockv6 = NULL;
3831   }
3832   if (NULL != plugin->nat)
3833   {
3834     GNUNET_NAT_unregister (plugin->nat);
3835     plugin->nat = NULL;
3836   }
3837   if (NULL != plugin->defrag_ctxs)
3838   {
3839     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3840                                    &heap_cleanup_iterator,
3841                                    NULL);
3842     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3843     plugin->defrag_ctxs = NULL;
3844   }
3845   if (NULL != plugin->mst)
3846   {
3847     GNUNET_SERVER_mst_destroy (plugin->mst);
3848     plugin->mst = NULL;
3849   }
3850   while (NULL != (udpw = plugin->ipv4_queue_head))
3851   {
3852     dequeue (plugin,
3853              udpw);
3854     udpw->qc (udpw->qc_cls,
3855               udpw,
3856               GNUNET_SYSERR);
3857     GNUNET_free (udpw);
3858   }
3859   while (NULL != (udpw = plugin->ipv6_queue_head))
3860   {
3861     dequeue (plugin,
3862              udpw);
3863     udpw->qc (udpw->qc_cls,
3864               udpw,
3865               GNUNET_SYSERR);
3866     GNUNET_free (udpw);
3867   }
3868   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3869                                          &disconnect_and_free_it,
3870                                          plugin);
3871   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3872
3873   while (NULL != (cur = plugin->ppc_dll_head))
3874   {
3875     GNUNET_break (0);
3876     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3877                                  plugin->ppc_dll_tail,
3878                                  cur);
3879     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3880     GNUNET_free (cur);
3881   }
3882   GNUNET_free (plugin);
3883   GNUNET_free (api);
3884   return NULL;
3885 }
3886
3887 /* end of plugin_transport_udp.c */