-fix indentation, logging, i18n, doxygen
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
46
47 /**
48  * Number of messages we can defragment in parallel.  We only really
49  * defragment 1 message at a time, but if messages get re-ordered, we
50  * may want to keep knowledge about the previous message to avoid
51  * discarding the current message in favor of a single fragment of a
52  * previous message.  3 should be good since we don't expect massive
53  * message reorderings with UDP.
54  */
55 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
56
57 /**
58  * We keep a defragmentation queue per sender address.  How many
59  * sender addresses do we support at the same time? Memory consumption
60  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
61  * value. (So 128 corresponds to 12 MB and should suffice for
62  * connecting to roughly 128 peers via UDP).
63  */
64 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
65
66
67 /**
68  * Closure for #append_port().
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * DLL
74    */
75   struct PrettyPrinterContext *next;
76
77   /**
78    * DLL
79    */
80   struct PrettyPrinterContext *prev;
81
82   /**
83    * Our plugin.
84    */
85   struct Plugin *plugin;
86
87   /**
88    * Resolver handle
89    */
90   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
91
92   /**
93    * Function to call with the result.
94    */
95   GNUNET_TRANSPORT_AddressStringCallback asc;
96
97   /**
98    * Clsoure for @e asc.
99    */
100   void *asc_cls;
101
102   /**
103    * Timeout task
104    */
105   struct GNUNET_SCHEDULER_Task *timeout_task;
106
107   /**
108    * Is this an IPv6 address?
109    */
110   int ipv6;
111
112   /**
113    * Options
114    */
115   uint32_t options;
116
117   /**
118    * Port to add after the IP address.
119    */
120   uint16_t port;
121
122 };
123
124
125 /**
126  * Session with another peer.
127  */
128 struct Session
129 {
130   /**
131    * Which peer is this session for?
132    */
133   struct GNUNET_PeerIdentity target;
134
135   /**
136    * Plugin this session belongs to.
137    */
138   struct Plugin *plugin;
139
140   /**
141    * Context for dealing with fragments.
142    */
143   struct UDP_FragmentationContext *frag_ctx;
144
145   /**
146    * Desired delay for next sending we send to other peer
147    */
148   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
149
150   /**
151    * Desired delay for next sending we received from other peer
152    */
153   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
154
155   /**
156    * Session timeout task
157    */
158   struct GNUNET_SCHEDULER_Task *timeout_task;
159
160   /**
161    * When does this session time out?
162    */
163   struct GNUNET_TIME_Absolute timeout;
164
165   /**
166    * expected delay for ACKs
167    */
168   struct GNUNET_TIME_Relative last_expected_ack_delay;
169
170   /**
171    * desired delay between UDP messages
172    */
173   struct GNUNET_TIME_Relative last_expected_msg_delay;
174
175   /**
176    * Our own address.
177    */
178   struct GNUNET_HELLO_Address *address;
179
180   /**
181    * Number of bytes waiting for transmission to this peer.
182    */
183   unsigned long long bytes_in_queue;
184
185   /**
186    * Number of messages waiting for transmission to this peer.
187    */
188   unsigned int msgs_in_queue;
189
190   /**
191    * Reference counter to indicate that this session is
192    * currently being used and must not be destroyed;
193    * setting @e in_destroy will destroy it as soon as
194    * possible.
195    */
196   unsigned int rc;
197
198   /**
199    * Network type of the address.
200    */
201   enum GNUNET_ATS_Network_Type scope;
202
203   /**
204    * Is this session about to be destroyed (sometimes we cannot
205    * destroy a session immediately as below us on the stack
206    * there might be code that still uses it; in this case,
207    * @e rc is non-zero).
208    */
209   int in_destroy;
210 };
211
212
213 /**
214  * Closure for #process_inbound_tokenized_messages().
215  */
216 struct SourceInformation
217 {
218   /**
219    * Sender identity.
220    */
221   struct GNUNET_PeerIdentity sender;
222
223   /**
224    * Associated session.
225    */
226   struct Session *session;
227
228 };
229
230 /**
231  * Closure for #find_receive_context().
232  */
233 struct FindReceiveContext
234 {
235   /**
236    * Where to store the result.
237    */
238   struct DefragContext *rc;
239
240   /**
241    * Session associated with this context.
242    */
243   struct Session *session;
244
245   /**
246    * Address to find.
247    */
248   const union UdpAddress *udp_addr;
249
250   /**
251    * Number of bytes in @e udp_addr.
252    */
253   size_t udp_addr_len;
254
255 };
256
257 /**
258  * Data structure to track defragmentation contexts based
259  * on the source of the UDP traffic.
260  */
261 struct DefragContext
262 {
263
264   /**
265    * Defragmentation context.
266    */
267   struct GNUNET_DEFRAGMENT_Context *defrag;
268
269   /**
270    * Reference to master plugin struct.
271    */
272   struct Plugin *plugin;
273
274   /**
275    * Node in the defrag heap.
276    */
277   struct GNUNET_CONTAINER_HeapNode *hnode;
278
279   /**
280    * Who's message(s) are we defragmenting here?
281    * Only initialized once we succeeded and
282    * @e have_sender is set.
283    */
284   struct GNUNET_PeerIdentity sender;
285
286   /**
287    * Source address this receive context is for (allocated at the
288    * end of the struct).
289    */
290   const union UdpAddress *udp_addr;
291
292   /**
293    * Length of @e udp_addr.
294    */
295   size_t udp_addr_len;
296
297   /**
298    * Network type the address belongs to.
299    */
300   enum GNUNET_ATS_Network_Type network_type;
301
302   /**
303    * Has the @e sender field been initialized yet?
304    */
305   int have_sender;
306 };
307
308
309 /**
310  * Context to send fragmented messages
311  */
312 struct UDP_FragmentationContext
313 {
314   /**
315    * Next in linked list
316    */
317   struct UDP_FragmentationContext *next;
318
319   /**
320    * Previous in linked list
321    */
322   struct UDP_FragmentationContext *prev;
323
324   /**
325    * The plugin
326    */
327   struct Plugin *plugin;
328
329   /**
330    * Handle for GNUNET_FRAGMENT context
331    */
332   struct GNUNET_FRAGMENT_Context *frag;
333
334   /**
335    * The session this fragmentation context belongs to
336    */
337   struct Session *session;
338
339   /**
340    * Function to call upon completion of the transmission.
341    */
342   GNUNET_TRANSPORT_TransmitContinuation cont;
343
344   /**
345    * Closure for @e cont.
346    */
347   void *cont_cls;
348
349   /**
350    * Message timeout
351    */
352   struct GNUNET_TIME_Absolute timeout;
353
354   /**
355    * Payload size of original unfragmented message
356    */
357   size_t payload_size;
358
359   /**
360    * Bytes used to send all fragments on wire including UDP overhead
361    */
362   size_t on_wire_size;
363
364   /**
365    * FIXME.
366    */
367   unsigned int fragments_used;
368
369 };
370
371
372 /**
373  * Message types included in a `struct UDP_MessageWrapper`
374  */
375 enum UDP_MessageType
376 {
377   /**
378    * Uninitialized (error)
379    */
380   UMT_UNDEFINED = 0,
381
382   /**
383    * Fragment of a message.
384    */
385   UMT_MSG_FRAGMENTED = 1,
386
387   /**
388    *
389    */
390   UMT_MSG_FRAGMENTED_COMPLETE = 2,
391
392   /**
393    * Unfragmented message.
394    */
395   UMT_MSG_UNFRAGMENTED = 3,
396
397   /**
398    * Receipt confirmation.
399    */
400   UMT_MSG_ACK = 4
401
402 };
403
404
405 /**
406  * Information we track for each message in the queue.
407  */
408 struct UDP_MessageWrapper
409 {
410   /**
411    * Session this message belongs to
412    */
413   struct Session *session;
414
415   /**
416    * DLL of messages
417    * previous element
418    */
419   struct UDP_MessageWrapper *prev;
420
421   /**
422    * DLL of messages
423    * previous element
424    */
425   struct UDP_MessageWrapper *next;
426
427   /**
428    * Message with size msg_size including UDP specific overhead
429    */
430   char *msg_buf;
431
432   /**
433    * Function to call upon completion of the transmission.
434    */
435   GNUNET_TRANSPORT_TransmitContinuation cont;
436
437   /**
438    * Closure for @e cont.
439    */
440   void *cont_cls;
441
442   /**
443    * Fragmentation context
444    * frag_ctx == NULL if transport <= MTU
445    * frag_ctx != NULL if transport > MTU
446    */
447   struct UDP_FragmentationContext *frag_ctx;
448
449   /**
450    * Message timeout
451    */
452   struct GNUNET_TIME_Absolute timeout;
453
454   /**
455    * Size of UDP message to send including UDP specific overhead
456    */
457   size_t msg_size;
458
459   /**
460    * Payload size of original message
461    */
462   size_t payload_size;
463
464   /**
465    * Message type
466    */
467   enum UDP_MessageType msg_type;
468
469 };
470
471
472 /**
473  * UDP ACK Message-Packet header.
474  */
475 struct UDP_ACK_Message
476 {
477   /**
478    * Message header.
479    */
480   struct GNUNET_MessageHeader header;
481
482   /**
483    * Desired delay for flow control
484    */
485   uint32_t delay;
486
487   /**
488    * What is the identity of the sender
489    */
490   struct GNUNET_PeerIdentity sender;
491
492 };
493
494
495 /**
496  * If a session monitor is attached, notify it about the new
497  * session state.
498  *
499  * @param plugin our plugin
500  * @param session session that changed state
501  * @param state new state of the session
502  */
503 static void
504 notify_session_monitor (struct Plugin *plugin,
505                         struct Session *session,
506                         enum GNUNET_TRANSPORT_SessionState state)
507 {
508   struct GNUNET_TRANSPORT_SessionInfo info;
509
510   if (NULL == plugin->sic)
511     return;
512   if (GNUNET_YES == session->in_destroy)
513     return; /* already destroyed, just RC>0 left-over actions */
514   memset (&info, 0, sizeof (info));
515   info.state = state;
516   info.is_inbound = GNUNET_SYSERR; /* hard to say */
517   info.num_msg_pending = session->msgs_in_queue;
518   info.num_bytes_pending = session->bytes_in_queue;
519   /* info.receive_delay remains zero as this is not supported by UDP
520      (cannot selectively not receive from 'some' peer while continuing
521      to receive from others) */
522   info.session_timeout = session->timeout;
523   info.address = session->address;
524   plugin->sic (plugin->sic_cls,
525                session,
526                &info);
527 }
528
529
530 /**
531  * We have been notified that our readset has something to read.  We don't
532  * know which socket needs to be read, so we have to check each one
533  * Then reschedule this function to be called again once more is available.
534  *
535  * @param cls the plugin handle
536  * @param tc the scheduling context (for rescheduling this function again)
537  */
538 static void
539 udp_plugin_select (void *cls,
540                    const struct GNUNET_SCHEDULER_TaskContext *tc);
541
542
543 /**
544  * We have been notified that our readset has something to read.  We don't
545  * know which socket needs to be read, so we have to check each one
546  * Then reschedule this function to be called again once more is available.
547  *
548  * @param cls the plugin handle
549  * @param tc the scheduling context (for rescheduling this function again)
550  */
551 static void
552 udp_plugin_select_v6 (void *cls,
553                       const struct GNUNET_SCHEDULER_TaskContext *tc);
554
555
556 /**
557  * (re)schedule select tasks for this plugin.
558  *
559  * @param plugin plugin to reschedule
560  */
561 static void
562 schedule_select (struct Plugin *plugin)
563 {
564   struct GNUNET_TIME_Relative min_delay;
565   struct UDP_MessageWrapper *udpw;
566
567   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
568   {
569     /* Find a message ready to send:
570      * Flow delay from other peer is expired or not set (0) */
571     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
572     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
573       min_delay = GNUNET_TIME_relative_min (min_delay,
574           GNUNET_TIME_absolute_get_remaining (
575               udpw->session->flow_delay_from_other_peer));
576
577     if (plugin->select_task != NULL )
578       GNUNET_SCHEDULER_cancel (plugin->select_task);
579
580     /* Schedule with:
581      * - write active set if message is ready
582      * - timeout minimum delay */
583     plugin->select_task = GNUNET_SCHEDULER_add_select (
584         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
585         (0 == min_delay.rel_value_us) ?
586             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4,
587         (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL,
588         &udp_plugin_select, plugin);
589   }
590   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
591   {
592     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
593     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
594       min_delay = GNUNET_TIME_relative_min (min_delay,
595           GNUNET_TIME_absolute_get_remaining (
596               udpw->session->flow_delay_from_other_peer));
597
598     if (NULL != plugin->select_task_v6)
599       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
600     plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (
601         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
602         (0 == min_delay.rel_value_us) ?
603             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6,
604         (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL,
605         &udp_plugin_select_v6, plugin);
606   }
607 }
608
609
610 /**
611  * Function called for a quick conversion of the binary address to
612  * a numeric address.  Note that the caller must not free the
613  * address and that the next call to this function is allowed
614  * to override the address again.
615  *
616  * @param cls closure
617  * @param addr binary address (a `union UdpAddress`)
618  * @param addrlen length of the @a addr
619  * @return string representing the same address
620  */
621 const char *
622 udp_address_to_string (void *cls,
623                        const void *addr,
624                        size_t addrlen)
625 {
626   static char rbuf[INET6_ADDRSTRLEN + 10];
627   char buf[INET6_ADDRSTRLEN];
628   const void *sb;
629   struct in_addr a4;
630   struct in6_addr a6;
631   const struct IPv4UdpAddress *t4;
632   const struct IPv6UdpAddress *t6;
633   int af;
634   uint16_t port;
635   uint32_t options;
636
637   if ((NULL != addr) && (addrlen == sizeof(struct IPv6UdpAddress)))
638   {
639     t6 = addr;
640     af = AF_INET6;
641     options = ntohl (t6->options);
642     port = ntohs (t6->u6_port);
643     memcpy (&a6, &t6->ipv6_addr, sizeof(a6));
644     sb = &a6;
645   }
646   else if ((NULL != addr) && (addrlen == sizeof(struct IPv4UdpAddress)))
647   {
648     t4 = addr;
649     af = AF_INET;
650     options = ntohl (t4->options);
651     port = ntohs (t4->u4_port);
652     memcpy (&a4, &t4->ipv4_addr, sizeof(a4));
653     sb = &a4;
654   }
655   else
656   {
657     return NULL;
658   }
659   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
660
661   GNUNET_snprintf (rbuf, sizeof(rbuf),
662                    (af == AF_INET6)
663                    ? "%s.%u.[%s]:%u"
664                    : "%s.%u.%s:%u",
665                    PLUGIN_NAME,
666                    options,
667                    buf,
668                    port);
669   return rbuf;
670 }
671
672
673 /**
674  * Function called to convert a string address to
675  * a binary address.
676  *
677  * @param cls closure (`struct Plugin *`)
678  * @param addr string address
679  * @param addrlen length of the address
680  * @param buf location to store the buffer
681  * @param added location to store the number of bytes in the buffer.
682  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
683  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
684  */
685 static int
686 udp_string_to_address (void *cls,
687                        const char *addr,
688                        uint16_t addrlen,
689                        void **buf,
690                        size_t *added)
691 {
692   struct sockaddr_storage socket_address;
693   char *address;
694   char *plugin;
695   char *optionstr;
696   uint32_t options;
697
698   /* Format tcp.options.address:port */
699   address = NULL;
700   plugin = NULL;
701   optionstr = NULL;
702
703   if ((NULL == addr) || (addrlen == 0))
704   {
705     GNUNET_break(0);
706     return GNUNET_SYSERR;
707   }
708   if ('\0' != addr[addrlen - 1])
709   {
710     GNUNET_break(0);
711     return GNUNET_SYSERR;
712   }
713   if (strlen (addr) != addrlen - 1)
714   {
715     GNUNET_break(0);
716     return GNUNET_SYSERR;
717   }
718   plugin = GNUNET_strdup (addr);
719   optionstr = strchr (plugin, '.');
720   if (NULL == optionstr)
721   {
722     GNUNET_break(0);
723     GNUNET_free(plugin);
724     return GNUNET_SYSERR;
725   }
726   optionstr[0] = '\0';
727   optionstr++;
728   options = atol (optionstr);
729   address = strchr (optionstr, '.');
730   if (NULL == address)
731   {
732     GNUNET_break(0);
733     GNUNET_free(plugin);
734     return GNUNET_SYSERR;
735   }
736   address[0] = '\0';
737   address++;
738
739   if (GNUNET_OK !=
740       GNUNET_STRINGS_to_address_ip (address, strlen (address),
741                                     &socket_address))
742   {
743     GNUNET_break(0);
744     GNUNET_free(plugin);
745     return GNUNET_SYSERR;
746   }
747
748   GNUNET_free(plugin);
749
750   switch (socket_address.ss_family)
751   {
752   case AF_INET:
753   {
754     struct IPv4UdpAddress *u4;
755     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
756     u4 = GNUNET_new (struct IPv4UdpAddress);
757     u4->options = htonl (options);
758     u4->ipv4_addr = in4->sin_addr.s_addr;
759     u4->u4_port = in4->sin_port;
760     *buf = u4;
761     *added = sizeof(struct IPv4UdpAddress);
762     return GNUNET_OK;
763   }
764   case AF_INET6:
765   {
766     struct IPv6UdpAddress *u6;
767     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
768     u6 = GNUNET_new (struct IPv6UdpAddress);
769     u6->options = htonl (options);
770     u6->ipv6_addr = in6->sin6_addr;
771     u6->u6_port = in6->sin6_port;
772     *buf = u6;
773     *added = sizeof(struct IPv6UdpAddress);
774     return GNUNET_OK;
775   }
776   default:
777     GNUNET_break(0);
778     return GNUNET_SYSERR;
779   }
780 }
781
782
783 /**
784  * Append our port and forward the result.
785  *
786  * @param cls a `struct PrettyPrinterContext *`
787  * @param hostname result from DNS resolver
788  */
789 static void
790 append_port (void *cls,
791              const char *hostname)
792 {
793   struct PrettyPrinterContext *ppc = cls;
794   struct Plugin *plugin = ppc->plugin;
795   char *ret;
796
797   if (NULL == hostname)
798   {
799     /* Final call, done */
800     ppc->asc (ppc->asc_cls,
801               NULL,
802               GNUNET_OK);
803     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
804                                  plugin->ppc_dll_tail,
805                                  ppc);
806     ppc->resolver_handle = NULL;
807     GNUNET_free (ppc);
808     return;
809   }
810   if (GNUNET_YES == ppc->ipv6)
811     GNUNET_asprintf (&ret,
812                      "%s.%u.[%s]:%d",
813                      PLUGIN_NAME,
814                      ppc->options,
815                      hostname,
816                      ppc->port);
817   else
818     GNUNET_asprintf (&ret,
819                      "%s.%u.%s:%d",
820                      PLUGIN_NAME,
821                      ppc->options,
822                      hostname,
823                      ppc->port);
824   ppc->asc (ppc->asc_cls,
825             ret,
826             GNUNET_OK);
827   GNUNET_free (ret);
828 }
829
830
831 /**
832  * Convert the transports address to a nice, human-readable
833  * format.
834  *
835  * @param cls closure with the `struct Plugin *`
836  * @param type name of the transport that generated the address
837  * @param addr one of the addresses of the host, NULL for the last address
838  *        the specific address format depends on the transport;
839  *        a `union UdpAddress`
840  * @param addrlen length of the address
841  * @param numeric should (IP) addresses be displayed in numeric form?
842  * @param timeout after how long should we give up?
843  * @param asc function to call on each string
844  * @param asc_cls closure for @a asc
845  */
846 static void
847 udp_plugin_address_pretty_printer (void *cls,
848                                    const char *type,
849                                    const void *addr,
850                                    size_t addrlen,
851                                    int numeric,
852                                    struct GNUNET_TIME_Relative timeout,
853                                    GNUNET_TRANSPORT_AddressStringCallback asc,
854                                    void *asc_cls)
855 {
856   struct Plugin *plugin = cls;
857   struct PrettyPrinterContext *ppc;
858   const void *sb;
859   size_t sbs;
860   struct sockaddr_in a4;
861   struct sockaddr_in6 a6;
862   const struct IPv4UdpAddress *u4;
863   const struct IPv6UdpAddress *u6;
864   uint16_t port;
865   uint32_t options;
866
867   if (addrlen == sizeof(struct IPv6UdpAddress))
868   {
869     u6 = addr;
870     memset (&a6, 0, sizeof(a6));
871     a6.sin6_family = AF_INET6;
872 #if HAVE_SOCKADDR_IN_SIN_LEN
873     a6.sin6_len = sizeof (a6);
874 #endif
875     a6.sin6_port = u6->u6_port;
876     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
877     port = ntohs (u6->u6_port);
878     options = ntohl (u6->options);
879     sb = &a6;
880     sbs = sizeof(a6);
881   }
882   else if (addrlen == sizeof(struct IPv4UdpAddress))
883   {
884     u4 = addr;
885     memset (&a4, 0, sizeof(a4));
886     a4.sin_family = AF_INET;
887 #if HAVE_SOCKADDR_IN_SIN_LEN
888     a4.sin_len = sizeof (a4);
889 #endif
890     a4.sin_port = u4->u4_port;
891     a4.sin_addr.s_addr = u4->ipv4_addr;
892     port = ntohs (u4->u4_port);
893     options = ntohl (u4->options);
894     sb = &a4;
895     sbs = sizeof(a4);
896   }
897   else
898   {
899     /* invalid address */
900     GNUNET_break_op (0);
901     asc (asc_cls, NULL , GNUNET_SYSERR);
902     asc (asc_cls, NULL, GNUNET_OK);
903     return;
904   }
905   ppc = GNUNET_new (struct PrettyPrinterContext);
906   ppc->plugin = plugin;
907   ppc->asc = asc;
908   ppc->asc_cls = asc_cls;
909   ppc->port = port;
910   ppc->options = options;
911   if (addrlen == sizeof(struct IPv6UdpAddress))
912     ppc->ipv6 = GNUNET_YES;
913   else
914     ppc->ipv6 = GNUNET_NO;
915   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
916                                plugin->ppc_dll_tail,
917                                ppc);
918   ppc->resolver_handle
919     = GNUNET_RESOLVER_hostname_get (sb,
920                                     sbs,
921                                     ! numeric,
922                                     timeout,
923                                     &append_port, ppc);
924 }
925
926
927 /**
928  * FIXME.
929  */
930 static void
931 call_continuation (struct UDP_MessageWrapper *udpw,
932                    int result)
933 {
934   struct Session *session = udpw->session;
935   struct Plugin *plugin = session->plugin;
936   size_t overhead;
937
938   LOG (GNUNET_ERROR_TYPE_DEBUG,
939        "Calling continuation for %u byte message to `%s' with result %s\n",
940        udpw->payload_size,
941        GNUNET_i2s (&udpw->session->target),
942        (GNUNET_OK == result) ? "OK" : "SYSERR");
943
944   if (udpw->msg_size >= udpw->payload_size)
945     overhead = udpw->msg_size - udpw->payload_size;
946   else
947     overhead = udpw->msg_size;
948
949   switch (result)
950   {
951   case GNUNET_OK:
952     switch (udpw->msg_type)
953     {
954     case UMT_MSG_UNFRAGMENTED:
955       if (NULL != udpw->cont)
956       {
957         /* Transport continuation */
958         udpw->cont (udpw->cont_cls,
959                     &udpw->session->target,
960                     result,
961                     udpw->payload_size,
962                     udpw->msg_size);
963       }
964       GNUNET_STATISTICS_update (plugin->env->stats,
965                                 "# UDP, unfragmented msgs, messages, sent, success",
966                                 1,
967                                 GNUNET_NO);
968       GNUNET_STATISTICS_update (plugin->env->stats,
969                                 "# UDP, unfragmented msgs, bytes payload, sent, success",
970                                 udpw->payload_size,
971                                 GNUNET_NO);
972       GNUNET_STATISTICS_update (plugin->env->stats,
973                                 "# UDP, unfragmented msgs, bytes overhead, sent, success",
974                                 overhead,
975                                 GNUNET_NO);
976       GNUNET_STATISTICS_update (plugin->env->stats,
977                                 "# UDP, total, bytes overhead, sent",
978                                 overhead,
979                                 GNUNET_NO);
980       GNUNET_STATISTICS_update (plugin->env->stats,
981                                 "# UDP, total, bytes payload, sent",
982                                 udpw->payload_size,
983                                 GNUNET_NO);
984       break;
985     case UMT_MSG_FRAGMENTED_COMPLETE:
986       GNUNET_assert(NULL != udpw->frag_ctx);
987       if (udpw->frag_ctx->cont != NULL )
988         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls,
989                               &udpw->session->target,
990                               GNUNET_OK,
991                               udpw->frag_ctx->payload_size,
992                               udpw->frag_ctx->on_wire_size);
993       GNUNET_STATISTICS_update (plugin->env->stats,
994                                 "# UDP, fragmented msgs, messages, sent, success",
995                                 1,
996                                 GNUNET_NO);
997       GNUNET_STATISTICS_update (plugin->env->stats,
998                                 "# UDP, fragmented msgs, bytes payload, sent, success",
999                                 udpw->payload_size,
1000                                 GNUNET_NO);
1001       GNUNET_STATISTICS_update (plugin->env->stats,
1002                                 "# UDP, fragmented msgs, bytes overhead, sent, success",
1003                                 overhead,
1004                                 GNUNET_NO);
1005       GNUNET_STATISTICS_update (plugin->env->stats,
1006                                 "# UDP, total, bytes overhead, sent",
1007                                 overhead,
1008                                 GNUNET_NO);
1009       GNUNET_STATISTICS_update (plugin->env->stats,
1010                                 "# UDP, total, bytes payload, sent",
1011                                 udpw->payload_size,
1012                                 GNUNET_NO);
1013       GNUNET_STATISTICS_update (plugin->env->stats,
1014                                 "# UDP, fragmented msgs, messages, pending",
1015                                 -1,
1016                                 GNUNET_NO);
1017       break;
1018     case UMT_MSG_FRAGMENTED:
1019       /* Fragmented message: enqueue next fragment */
1020       if (NULL != udpw->cont)
1021         udpw->cont (udpw->cont_cls,
1022                     &udpw->session->target,
1023                     result,
1024                     udpw->payload_size,
1025                     udpw->msg_size);
1026       GNUNET_STATISTICS_update (plugin->env->stats,
1027                                 "# UDP, fragmented msgs, fragments, sent, success",
1028                                 1,
1029                                 GNUNET_NO);
1030       GNUNET_STATISTICS_update (plugin->env->stats,
1031                                 "# UDP, fragmented msgs, fragments bytes, sent, success",
1032                                 udpw->msg_size,
1033                                 GNUNET_NO);
1034       break;
1035     case UMT_MSG_ACK:
1036       /* No continuation */
1037       GNUNET_STATISTICS_update (plugin->env->stats,
1038                                 "# UDP, ACK msgs, messages, sent, success",
1039                                 1,
1040                                 GNUNET_NO);
1041       GNUNET_STATISTICS_update (plugin->env->stats,
1042                                 "# UDP, ACK msgs, bytes overhead, sent, success",
1043                                 overhead,
1044                                 GNUNET_NO);
1045       GNUNET_STATISTICS_update (plugin->env->stats,
1046                                 "# UDP, total, bytes overhead, sent",
1047                                 overhead,
1048                                 GNUNET_NO);
1049       break;
1050     default:
1051       GNUNET_break(0);
1052       break;
1053     }
1054     break;
1055   case GNUNET_SYSERR:
1056     switch (udpw->msg_type)
1057     {
1058     case UMT_MSG_UNFRAGMENTED:
1059       /* Unfragmented message: failed to send */
1060       if (NULL != udpw->cont)
1061         udpw->cont (udpw->cont_cls,
1062                     &udpw->session->target,
1063                     result,
1064                     udpw->payload_size,
1065                     overhead);
1066       GNUNET_STATISTICS_update (plugin->env->stats,
1067                                 "# UDP, unfragmented msgs, messages, sent, failure",
1068                                 1,
1069                                 GNUNET_NO);
1070       GNUNET_STATISTICS_update (plugin->env->stats,
1071                                 "# UDP, unfragmented msgs, bytes payload, sent, failure",
1072                                 udpw->payload_size,
1073                                 GNUNET_NO);
1074       GNUNET_STATISTICS_update (plugin->env->stats,
1075                                 "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1076                                 overhead,
1077                                 GNUNET_NO);
1078       break;
1079     case UMT_MSG_FRAGMENTED_COMPLETE:
1080       GNUNET_assert (NULL != udpw->frag_ctx);
1081       if (udpw->frag_ctx->cont != NULL)
1082         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls,
1083                               &udpw->session->target,
1084                               GNUNET_SYSERR,
1085                               udpw->frag_ctx->payload_size,
1086                               udpw->frag_ctx->on_wire_size);
1087       GNUNET_STATISTICS_update (plugin->env->stats,
1088                                 "# UDP, fragmented msgs, messages, sent, failure",
1089                                 1,
1090                                 GNUNET_NO);
1091       GNUNET_STATISTICS_update (plugin->env->stats,
1092                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1093                                 udpw->payload_size,
1094                                 GNUNET_NO);
1095       GNUNET_STATISTICS_update (plugin->env->stats,
1096                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1097                                 overhead,
1098                                 GNUNET_NO);
1099       GNUNET_STATISTICS_update (plugin->env->stats,
1100                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1101                                 overhead,
1102                                 GNUNET_NO);
1103       GNUNET_STATISTICS_update (plugin->env->stats,
1104                                 "# UDP, fragmented msgs, messages, pending",
1105                                 -1,
1106                                 GNUNET_NO);
1107       break;
1108     case UMT_MSG_FRAGMENTED:
1109       GNUNET_assert (NULL != udpw->frag_ctx);
1110       /* Fragmented message: failed to send */
1111       GNUNET_STATISTICS_update (plugin->env->stats,
1112                                 "# UDP, fragmented msgs, fragments, sent, failure",
1113                                 1,
1114                                 GNUNET_NO);
1115       GNUNET_STATISTICS_update (plugin->env->stats,
1116                                 "# UDP, fragmented msgs, fragments bytes, sent, failure",
1117                                 udpw->msg_size,
1118                                 GNUNET_NO);
1119       break;
1120     case UMT_MSG_ACK:
1121       /* ACK message: failed to send */
1122       GNUNET_STATISTICS_update (plugin->env->stats,
1123                                 "# UDP, ACK msgs, messages, sent, failure",
1124                                 1,
1125                                 GNUNET_NO);
1126       break;
1127     default:
1128       GNUNET_break(0);
1129       break;
1130     }
1131     break;
1132   default:
1133     GNUNET_break(0);
1134     break;
1135   }
1136 }
1137
1138
1139 /**
1140  * Check if the given port is plausible (must be either our listen
1141  * port or our advertised port).  If it is neither, we return
1142  * #GNUNET_SYSERR.
1143  *
1144  * @param plugin global variables
1145  * @param in_port port number to check
1146  * @return #GNUNET_OK if port is either open_port or adv_port
1147  */
1148 static int
1149 check_port (struct Plugin *plugin,
1150             uint16_t in_port)
1151 {
1152   if ((in_port == plugin->port) || (in_port == plugin->aport))
1153     return GNUNET_OK;
1154   return GNUNET_SYSERR;
1155 }
1156
1157
1158 /**
1159  * Function that will be called to check if a binary address for this
1160  * plugin is well-formed and corresponds to an address for THIS peer
1161  * (as per our configuration).  Naturally, if absolutely necessary,
1162  * plugins can be a bit conservative in their answer, but in general
1163  * plugins should make sure that the address does not redirect
1164  * traffic to a 3rd party that might try to man-in-the-middle our
1165  * traffic.
1166  *
1167  * @param cls closure, should be our handle to the Plugin
1168  * @param addr pointer to a `union UdpAddress`
1169  * @param addrlen length of @a addr
1170  * @return #GNUNET_OK if this is a plausible address for this peer
1171  *         and transport, #GNUNET_SYSERR if not
1172  */
1173 static int
1174 udp_plugin_check_address (void *cls,
1175                           const void *addr,
1176                           size_t addrlen)
1177 {
1178   struct Plugin *plugin = cls;
1179   struct IPv4UdpAddress *v4;
1180   struct IPv6UdpAddress *v6;
1181
1182   if ( (addrlen != sizeof(struct IPv4UdpAddress)) &&
1183        (addrlen != sizeof(struct IPv6UdpAddress)) )
1184   {
1185     GNUNET_break_op(0);
1186     return GNUNET_SYSERR;
1187   }
1188   if (addrlen == sizeof(struct IPv4UdpAddress))
1189   {
1190     v4 = (struct IPv4UdpAddress *) addr;
1191     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1192       return GNUNET_SYSERR;
1193     if (GNUNET_OK !=
1194         GNUNET_NAT_test_address (plugin->nat,
1195                                  &v4->ipv4_addr,
1196                                  sizeof (struct in_addr)))
1197       return GNUNET_SYSERR;
1198   }
1199   else
1200   {
1201     v6 = (struct IPv6UdpAddress *) addr;
1202     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1203     {
1204       GNUNET_break_op(0);
1205       return GNUNET_SYSERR;
1206     }
1207     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
1208       return GNUNET_SYSERR;
1209     if (GNUNET_OK !=
1210         GNUNET_NAT_test_address (plugin->nat,
1211                                  &v6->ipv6_addr,
1212                                  sizeof(struct in6_addr)))
1213       return GNUNET_SYSERR;
1214   }
1215   return GNUNET_OK;
1216 }
1217
1218
1219 /**
1220  * Function to free last resources associated with a session.
1221  *
1222  * @param s session to free
1223  */
1224 static void
1225 free_session (struct Session *s)
1226 {
1227   if (NULL != s->frag_ctx)
1228   {
1229     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL );
1230     GNUNET_free(s->frag_ctx);
1231     s->frag_ctx = NULL;
1232   }
1233   GNUNET_free(s);
1234 }
1235
1236
1237 /**
1238  * Remove a message from the transmission queue.
1239  *
1240  * @param plugin the UDP plugin
1241  * @param udpw message wrapper to queue
1242  */
1243 static void
1244 dequeue (struct Plugin *plugin,
1245          struct UDP_MessageWrapper *udpw)
1246 {
1247   struct Session *session = udpw->session;
1248
1249   if (plugin->bytes_in_buffer < udpw->msg_size)
1250   {
1251     GNUNET_break (0);
1252   }
1253   else
1254   {
1255     GNUNET_STATISTICS_update (plugin->env->stats,
1256                               "# UDP, total, bytes in buffers",
1257                               -(long long) udpw->msg_size,
1258                               GNUNET_NO);
1259     plugin->bytes_in_buffer -= udpw->msg_size;
1260   }
1261   GNUNET_STATISTICS_update (plugin->env->stats,
1262                             "# UDP, total, msgs in buffers",
1263                             -1, GNUNET_NO);
1264   if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress))
1265     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1266                                  plugin->ipv4_queue_tail,
1267                                  udpw);
1268   else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress))
1269     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1270                                  plugin->ipv6_queue_tail,
1271                                  udpw);
1272   else
1273   {
1274     GNUNET_break (0);
1275     return;
1276   }
1277   GNUNET_assert (session->msgs_in_queue > 0);
1278   session->msgs_in_queue--;
1279   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1280   session->bytes_in_queue -= udpw->msg_size;
1281 }
1282
1283
1284 /**
1285  * We have completed our (attempt) to transmit a message
1286  * that had to be fragmented -- either because we got an
1287  * ACK saying that all fragments were received, or because
1288  * of timeout / disconnect.  Clean up our state.
1289  *
1290  * @param fc fragmentation context to clean up
1291  * @param result #GNUNET_OK if we succeeded (got ACK),
1292  *               #GNUNET_SYSERR if the transmission failed
1293  */
1294 static void
1295 fragmented_message_done (struct UDP_FragmentationContext *fc,
1296                          int result)
1297 {
1298   struct Plugin *plugin = fc->plugin;
1299   struct Session *s = fc->session;
1300   struct UDP_MessageWrapper *udpw;
1301   struct UDP_MessageWrapper *tmp;
1302   struct UDP_MessageWrapper dummy;
1303
1304   LOG (GNUNET_ERROR_TYPE_DEBUG,
1305        "%p : Fragmented message removed with result %s\n",
1306        fc,
1307        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1308
1309   /* Call continuation for fragmented message */
1310   memset (&dummy, 0, sizeof(dummy));
1311   dummy.msg_type = UMT_MSG_FRAGMENTED_COMPLETE;
1312   dummy.msg_size = s->frag_ctx->on_wire_size;
1313   dummy.payload_size = s->frag_ctx->payload_size;
1314   dummy.frag_ctx = s->frag_ctx;
1315   dummy.cont = NULL;
1316   dummy.cont_cls = NULL;
1317   dummy.session = s;
1318   call_continuation (&dummy, result);
1319   /* Remove leftover fragments from queue */
1320   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1321   {
1322     udpw = plugin->ipv6_queue_head;
1323     while (NULL != udpw)
1324     {
1325       tmp = udpw->next;
1326       if ( (udpw->frag_ctx != NULL) &&
1327            (udpw->frag_ctx == s->frag_ctx) )
1328       {
1329         dequeue (plugin, udpw);
1330         call_continuation (udpw, GNUNET_SYSERR);
1331         GNUNET_free (udpw);
1332       }
1333       udpw = tmp;
1334     }
1335   }
1336   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1337   {
1338     udpw = plugin->ipv4_queue_head;
1339     while (udpw != NULL )
1340     {
1341       tmp = udpw->next;
1342       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1343       {
1344         dequeue (plugin, udpw);
1345         call_continuation (udpw, GNUNET_SYSERR);
1346         GNUNET_free(udpw);
1347       }
1348       udpw = tmp;
1349     }
1350   }
1351   notify_session_monitor (s->plugin,
1352                           s,
1353                           GNUNET_TRANSPORT_SS_UPDATE);
1354   /* Destroy fragmentation context */
1355   GNUNET_FRAGMENT_context_destroy (fc->frag,
1356                                    &s->last_expected_msg_delay,
1357                                    &s->last_expected_ack_delay);
1358   s->frag_ctx = NULL;
1359   GNUNET_free (fc);
1360 }
1361
1362
1363 /**
1364  * Scan the heap for a receive context with the given address.
1365  *
1366  * @param cls the `struct FindReceiveContext`
1367  * @param node internal node of the heap
1368  * @param element value stored at the node (a `struct ReceiveContext`)
1369  * @param cost cost associated with the node
1370  * @return #GNUNET_YES if we should continue to iterate,
1371  *         #GNUNET_NO if not.
1372  */
1373 static int
1374 find_receive_context (void *cls,
1375                       struct GNUNET_CONTAINER_HeapNode *node,
1376                       void *element,
1377                       GNUNET_CONTAINER_HeapCostType cost)
1378 {
1379   struct FindReceiveContext *frc = cls;
1380   struct DefragContext *e = element;
1381
1382   if ( (frc->udp_addr_len == e->udp_addr_len) &&
1383        (0 == memcmp (frc->udp_addr,
1384                      e->udp_addr,
1385                      frc->udp_addr_len)) )
1386   {
1387     frc->rc = e;
1388     return GNUNET_NO;
1389   }
1390   return GNUNET_YES;
1391 }
1392
1393
1394 /**
1395  * Functions with this signature are called whenever we need
1396  * to close a session due to a disconnect or failure to
1397  * establish a connection.
1398  *
1399  * @param cls closure with the `struct Plugin`
1400  * @param s session to close down
1401  * @return #GNUNET_OK on success
1402  */
1403 static int
1404 udp_disconnect_session (void *cls,
1405                         struct Session *s)
1406 {
1407   struct Plugin *plugin = cls;
1408   struct UDP_MessageWrapper *udpw;
1409   struct UDP_MessageWrapper *next;
1410   struct FindReceiveContext frc;
1411
1412   GNUNET_assert (GNUNET_YES != s->in_destroy);
1413   LOG (GNUNET_ERROR_TYPE_DEBUG,
1414        "Session %p to peer `%s' address ended\n",
1415        s,
1416        GNUNET_i2s (&s->target),
1417        udp_address_to_string (plugin,
1418                               s->address->address,
1419                               s->address->address_length));
1420   /* stop timeout task */
1421   if (NULL != s->timeout_task)
1422   {
1423     GNUNET_SCHEDULER_cancel (s->timeout_task);
1424     s->timeout_task = NULL;
1425   }
1426   if (NULL != s->frag_ctx)
1427   {
1428     /* Remove fragmented message due to disconnect */
1429     fragmented_message_done (s->frag_ctx,
1430                              GNUNET_SYSERR);
1431   }
1432
1433   frc.rc = NULL;
1434   frc.udp_addr = s->address->address;
1435   frc.udp_addr_len = s->address->address_length;
1436   /* Lookup existing receive context for this address */
1437   if (NULL != plugin->defrag_ctxs)
1438   {
1439     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1440                                    &find_receive_context,
1441                                    &frc);
1442     if (NULL != frc.rc)
1443     {
1444       struct DefragContext *d_ctx = frc.rc;
1445
1446       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
1447       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1448       GNUNET_free (d_ctx);
1449     }
1450   }
1451   next = plugin->ipv4_queue_head;
1452   while (NULL != (udpw = next))
1453   {
1454     next = udpw->next;
1455     if (udpw->session == s)
1456     {
1457       dequeue (plugin, udpw);
1458       call_continuation (udpw, GNUNET_SYSERR);
1459       GNUNET_free(udpw);
1460     }
1461   }
1462   next = plugin->ipv6_queue_head;
1463   while (NULL != (udpw = next))
1464   {
1465     next = udpw->next;
1466     if (udpw->session == s)
1467     {
1468       dequeue (plugin, udpw);
1469       call_continuation (udpw, GNUNET_SYSERR);
1470       GNUNET_free(udpw);
1471     }
1472   }
1473   notify_session_monitor (s->plugin,
1474                           s,
1475                           GNUNET_TRANSPORT_SS_DONE);
1476   plugin->env->session_end (plugin->env->cls,
1477                             s->address,
1478                             s);
1479
1480   if (NULL != s->frag_ctx)
1481   {
1482     if (NULL != s->frag_ctx->cont)
1483     {
1484       s->frag_ctx->cont (s->frag_ctx->cont_cls,
1485                          &s->target,
1486                          GNUNET_SYSERR,
1487                          s->frag_ctx->payload_size,
1488                          s->frag_ctx->on_wire_size);
1489       LOG (GNUNET_ERROR_TYPE_DEBUG,
1490            "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1491            GNUNET_i2s (&s->target));
1492     }
1493   }
1494
1495   GNUNET_assert (GNUNET_YES ==
1496                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
1497                                                        &s->target,
1498                                                        s));
1499   GNUNET_STATISTICS_set (plugin->env->stats,
1500                          "# UDP sessions active",
1501                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
1502                          GNUNET_NO);
1503   if (s->rc > 0)
1504   {
1505     s->in_destroy = GNUNET_YES;
1506   }
1507   else
1508   {
1509     GNUNET_HELLO_address_free (s->address);
1510     free_session (s);
1511   }
1512   return GNUNET_OK;
1513 }
1514
1515
1516 /**
1517  * Function that is called to get the keepalive factor.
1518  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
1519  * calculate the interval between keepalive packets.
1520  *
1521  * @param cls closure with the `struct Plugin`
1522  * @return keepalive factor
1523  */
1524 static unsigned int
1525 udp_query_keepalive_factor (void *cls)
1526 {
1527   return 15;
1528 }
1529
1530
1531 /**
1532  * Destroy a session, plugin is being unloaded.
1533  *
1534  * @param cls the `struct Plugin`
1535  * @param key hash of public key of target peer
1536  * @param value a `struct PeerSession *` to clean up
1537  * @return #GNUNET_OK (continue to iterate)
1538  */
1539 static int
1540 disconnect_and_free_it (void *cls,
1541                         const struct GNUNET_PeerIdentity *key,
1542                         void *value)
1543 {
1544   struct Plugin *plugin = cls;
1545
1546   udp_disconnect_session (plugin, value);
1547   return GNUNET_OK;
1548 }
1549
1550
1551 /**
1552  * Disconnect from a remote node.  Clean up session if we have one for
1553  * this peer.
1554  *
1555  * @param cls closure for this call (should be handle to Plugin)
1556  * @param target the peeridentity of the peer to disconnect
1557  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
1558  */
1559 static void
1560 udp_disconnect (void *cls,
1561                 const struct GNUNET_PeerIdentity *target)
1562 {
1563   struct Plugin *plugin = cls;
1564
1565   LOG (GNUNET_ERROR_TYPE_DEBUG,
1566        "Disconnecting from peer `%s'\n",
1567        GNUNET_i2s (target));
1568   /* Clean up sessions */
1569   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1570                                               target,
1571                                               &disconnect_and_free_it,
1572                                               plugin);
1573 }
1574
1575
1576 /**
1577  * Session was idle, so disconnect it
1578  *
1579  * @param cls the `struct Session` to time out
1580  * @param tc scheduler context
1581  */
1582 static void
1583 session_timeout (void *cls,
1584                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1585 {
1586   struct Session *s = cls;
1587   struct Plugin *plugin = s->plugin;
1588   struct GNUNET_TIME_Relative left;
1589
1590   s->timeout_task = NULL;
1591   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
1592   if (left.rel_value_us > 0)
1593   {
1594     /* not actually our turn yet, but let's at least update
1595        the monitor, it may think we're about to die ... */
1596     notify_session_monitor (s->plugin,
1597                             s,
1598                             GNUNET_TRANSPORT_SS_UPDATE);
1599     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
1600                                                     &session_timeout,
1601                                                     s);
1602     return;
1603   }
1604   LOG (GNUNET_ERROR_TYPE_DEBUG,
1605        "Session %p was idle for %s, disconnecting\n",
1606        s,
1607        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
1608                                                GNUNET_YES));
1609   /* call session destroy function */
1610   udp_disconnect_session (plugin, s);
1611 }
1612
1613
1614 /**
1615  * Increment session timeout due to activity
1616  *
1617  * @param s session to reschedule timeout activity for
1618  */
1619 static void
1620 reschedule_session_timeout (struct Session *s)
1621 {
1622   if (GNUNET_YES == s->in_destroy)
1623     return;
1624   GNUNET_assert(NULL != s->timeout_task);
1625   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1626 }
1627
1628
1629 /**
1630  * Function obtain the network type for a session
1631  *
1632  * @param cls closure (`struct Plugin *`)
1633  * @param session the session
1634  * @return the network type
1635  */
1636 static enum GNUNET_ATS_Network_Type
1637 udp_get_network (void *cls,
1638                  struct Session *session)
1639 {
1640   return session->scope;
1641 }
1642
1643
1644 /**
1645  * Closure for #session_cmp_it().
1646  */
1647 struct SessionCompareContext
1648 {
1649   /**
1650    * Set to session matching the address.
1651    */
1652   struct Session *res;
1653
1654   /**
1655    * Address we are looking for.
1656    */
1657   const struct GNUNET_HELLO_Address *address;
1658 };
1659
1660
1661 /**
1662  * Find a session with a matching address.
1663  *
1664  * @param cls the `struct SessionCompareContext *`
1665  * @param key peer identity (unused)
1666  * @param value the `struct Session *`
1667  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1668  */
1669 static int
1670 session_cmp_it (void *cls,
1671                 const struct GNUNET_PeerIdentity *key,
1672                 void *value)
1673 {
1674   struct SessionCompareContext *cctx = cls;
1675   const struct GNUNET_HELLO_Address *address = cctx->address;
1676   struct Session *s = value;
1677
1678   if (0 == GNUNET_HELLO_address_cmp (s->address,
1679                                      cctx->address))
1680   {
1681     cctx->res = s;
1682     return GNUNET_NO;
1683   }
1684   return GNUNET_YES;
1685 }
1686
1687
1688 /**
1689  * Locate an existing session the transport service is using to
1690  * send data to another peer.  Performs some basic sanity checks
1691  * on the address and then tries to locate a matching session.
1692  *
1693  * @param cls the plugin
1694  * @param address the address we should locate the session by
1695  * @return the session if it exists, or NULL if it is not found
1696  */
1697 static struct Session *
1698 udp_plugin_lookup_session (void *cls,
1699                            const struct GNUNET_HELLO_Address *address)
1700 {
1701   struct Plugin *plugin = cls;
1702   const struct IPv6UdpAddress *udp_a6;
1703   const struct IPv4UdpAddress *udp_a4;
1704   struct SessionCompareContext cctx;
1705
1706   if ( (NULL == address->address) ||
1707        ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1708         (address->address_length != sizeof (struct IPv6UdpAddress))))
1709   {
1710     LOG (GNUNET_ERROR_TYPE_WARNING,
1711          "Trying to locate session for address of unexpected length %u (should be %u or %u)\n",
1712          address->address_length,
1713          sizeof (struct IPv4UdpAddress),
1714          sizeof (struct IPv6UdpAddress));
1715     return NULL;
1716   }
1717
1718   if (address->address_length == sizeof(struct IPv4UdpAddress))
1719   {
1720     if (NULL == plugin->sockv4)
1721       return NULL;
1722     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1723     if (0 == udp_a4->u4_port)
1724       return NULL;
1725   }
1726
1727   if (address->address_length == sizeof(struct IPv6UdpAddress))
1728   {
1729     if (NULL == plugin->sockv6)
1730       return NULL;
1731     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1732     if (0 == udp_a6->u6_port)
1733       return NULL;
1734   }
1735
1736   /* check if session already exists */
1737   cctx.address = address;
1738   cctx.res = NULL;
1739   LOG (GNUNET_ERROR_TYPE_DEBUG,
1740        "Looking for existing session for peer `%s' `%s' \n",
1741        GNUNET_i2s (&address->peer),
1742        udp_address_to_string (plugin,
1743                               address->address,
1744                               address->address_length));
1745   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1746                                               &address->peer,
1747                                               &session_cmp_it,
1748                                               &cctx);
1749   if (NULL != cctx.res)
1750   {
1751     LOG (GNUNET_ERROR_TYPE_DEBUG,
1752          "Found existing session %p\n",
1753          cctx.res);
1754     return cctx.res;
1755   }
1756   return NULL;
1757 }
1758
1759
1760 /**
1761  * Allocate a new session for the given endpoint address.
1762  * Note that this function does not inform the service
1763  * of the new session, this is the responsibility of the
1764  * caller (if needed).
1765  *
1766  * @param cls the `struct Plugin`
1767  * @param address address of the other peer to use
1768  * @param network_type network type the address belongs to
1769  * @return NULL on error, otherwise session handle
1770  */
1771 static struct Session *
1772 udp_plugin_create_session (void *cls,
1773                            const struct GNUNET_HELLO_Address *address,
1774                            enum GNUNET_ATS_Network_Type network_type)
1775 {
1776   struct Plugin *plugin = cls;
1777   struct Session *s;
1778
1779   s = GNUNET_new (struct Session);
1780   s->plugin = plugin;
1781   s->address = GNUNET_HELLO_address_copy (address);
1782   s->target = address->peer;
1783   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1784                                                               250);
1785   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1786   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1787   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1788   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1789   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
1790                                                   &session_timeout, s);
1791   s->scope = network_type;
1792
1793   LOG (GNUNET_ERROR_TYPE_DEBUG,
1794        "Creating new session %p for peer `%s' address `%s'\n",
1795        s,
1796        GNUNET_i2s (&address->peer),
1797        udp_address_to_string (plugin,
1798                               address->address,
1799                               address->address_length));
1800   GNUNET_assert(GNUNET_OK ==
1801                 GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
1802                                                    &s->target,
1803                                                    s,
1804                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1805   GNUNET_STATISTICS_set (plugin->env->stats,
1806                          "# UDP sessions active",
1807                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
1808                          GNUNET_NO);
1809   return s;
1810 }
1811
1812
1813 /**
1814  * Function that will be called whenever the transport service wants to
1815  * notify the plugin that a session is still active and in use and
1816  * therefore the session timeout for this session has to be updated
1817  *
1818  * @param cls closure with the `struct Plugin`
1819  * @param peer which peer was the session for
1820  * @param session which session is being updated
1821  */
1822 static void
1823 udp_plugin_update_session_timeout (void *cls,
1824                                    const struct GNUNET_PeerIdentity *peer,
1825                                    struct Session *session)
1826 {
1827   struct Plugin *plugin = cls;
1828
1829   if (GNUNET_YES !=
1830       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1831                                                     peer,
1832                                                     session))
1833   {
1834     GNUNET_break(0);
1835     return;
1836   }
1837   /* Reschedule session timeout */
1838   reschedule_session_timeout (session);
1839 }
1840
1841
1842 /**
1843  * Creates a new outbound session the transport service will use to
1844  * send data to the peer.
1845  *
1846  * @param cls the `struct Plugin *`
1847  * @param address the address
1848  * @return the session or NULL of max connections exceeded
1849  */
1850 static struct Session *
1851 udp_plugin_get_session (void *cls,
1852                         const struct GNUNET_HELLO_Address *address)
1853 {
1854   struct Plugin *plugin = cls;
1855   struct Session *s;
1856   enum GNUNET_ATS_Network_Type network_type;
1857   struct IPv4UdpAddress *udp_v4;
1858   struct IPv6UdpAddress *udp_v6;
1859
1860   if (NULL == address)
1861   {
1862     GNUNET_break (0);
1863     return NULL;
1864   }
1865   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
1866        (address->address_length != sizeof(struct IPv6UdpAddress)) )
1867   {
1868     GNUNET_break_op (0);
1869     return NULL;
1870   }
1871   if (NULL != (s = udp_plugin_lookup_session (cls,
1872                                               address)))
1873     return s;
1874
1875   /* need to create new session */
1876   if (sizeof (struct IPv4UdpAddress) == address->address_length)
1877   {
1878     struct sockaddr_in v4;
1879
1880     udp_v4 = (struct IPv4UdpAddress *) address->address;
1881     memset (&v4, '\0', sizeof (v4));
1882     v4.sin_family = AF_INET;
1883 #if HAVE_SOCKADDR_IN_SIN_LEN
1884     v4.sin_len = sizeof (struct sockaddr_in);
1885 #endif
1886     v4.sin_port = udp_v4->u4_port;
1887     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
1888     network_type = plugin->env->get_address_type (plugin->env->cls,
1889                                                   (const struct sockaddr *) &v4,
1890                                                   sizeof (v4));
1891   }
1892   if (sizeof (struct IPv6UdpAddress) == address->address_length)
1893   {
1894     struct sockaddr_in6 v6;
1895
1896     udp_v6 = (struct IPv6UdpAddress *) address->address;
1897     memset (&v6, '\0', sizeof (v6));
1898     v6.sin6_family = AF_INET6;
1899 #if HAVE_SOCKADDR_IN_SIN_LEN
1900     v6.sin6_len = sizeof (struct sockaddr_in6);
1901 #endif
1902     v6.sin6_port = udp_v6->u6_port;
1903     v6.sin6_addr = udp_v6->ipv6_addr;
1904     network_type = plugin->env->get_address_type (plugin->env->cls,
1905                                                   (const struct sockaddr *) &v6,
1906                                                   sizeof (v6));
1907   }
1908   return udp_plugin_create_session (cls,
1909                                     address,
1910                                     network_type);
1911 }
1912
1913
1914 /**
1915  * Enqueue a message for transmission.
1916  *
1917  * @param plugin the UDP plugin
1918  * @param udpw message wrapper to queue
1919  */
1920 static void
1921 enqueue (struct Plugin *plugin,
1922          struct UDP_MessageWrapper *udpw)
1923 {
1924   struct Session *session = udpw->session;
1925
1926   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1927   {
1928     GNUNET_break (0);
1929   }
1930   else
1931   {
1932     GNUNET_STATISTICS_update (plugin->env->stats,
1933         "# UDP, total, bytes in buffers", udpw->msg_size, GNUNET_NO);
1934     plugin->bytes_in_buffer += udpw->msg_size;
1935   }
1936   GNUNET_STATISTICS_update (plugin->env->stats,
1937                             "# UDP, total, msgs in buffers",
1938                             1, GNUNET_NO);
1939   if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
1940     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1941                                 plugin->ipv4_queue_tail,
1942                                 udpw);
1943   else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress))
1944     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1945                                  plugin->ipv6_queue_tail,
1946                                  udpw);
1947   else
1948   {
1949     GNUNET_break (0);
1950     return;
1951   }
1952   session->msgs_in_queue++;
1953   session->bytes_in_queue += udpw->msg_size;
1954 }
1955
1956
1957 /**
1958  * Fragment message was transmitted via UDP, let fragmentation know
1959  * to send the next fragment now.
1960  *
1961  * @param cls the `struct UDPMessageWrapper *` of the fragment
1962  * @param target destination peer (ignored)
1963  * @param result #GNUNET_OK on success (ignored)
1964  * @param payload bytes payload sent
1965  * @param physical bytes physical sent
1966  */
1967 static void
1968 send_next_fragment (void *cls,
1969                     const struct GNUNET_PeerIdentity *target,
1970                     int result,
1971                     size_t payload,
1972                     size_t physical)
1973 {
1974   struct UDP_MessageWrapper *udpw = cls;
1975
1976   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1977 }
1978
1979
1980 /**
1981  * Function that is called with messages created by the fragmentation
1982  * module.  In the case of the 'proc' callback of the
1983  * #GNUNET_FRAGMENT_context_create() function, this function must
1984  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1985  *
1986  * @param cls closure, the 'struct FragmentationContext'
1987  * @param msg the message that was created
1988  */
1989 static void
1990 enqueue_fragment (void *cls,
1991                   const struct GNUNET_MessageHeader *msg)
1992 {
1993   struct UDP_FragmentationContext *frag_ctx = cls;
1994   struct Plugin *plugin = frag_ctx->plugin;
1995   struct UDP_MessageWrapper * udpw;
1996   size_t msg_len = ntohs (msg->size);
1997
1998   LOG (GNUNET_ERROR_TYPE_DEBUG,
1999        "Enqueuing fragment with %u bytes\n",
2000        msg_len);
2001   frag_ctx->fragments_used++;
2002   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
2003   udpw->session = frag_ctx->session;
2004   udpw->msg_buf = (char *) &udpw[1];
2005   udpw->msg_size = msg_len;
2006   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
2007   udpw->cont = &send_next_fragment;
2008   udpw->cont_cls = udpw;
2009   udpw->timeout = frag_ctx->timeout;
2010   udpw->frag_ctx = frag_ctx;
2011   udpw->msg_type = UMT_MSG_FRAGMENTED;
2012   memcpy (udpw->msg_buf, msg, msg_len);
2013   enqueue (plugin, udpw);
2014   schedule_select (plugin);
2015 }
2016
2017
2018 /**
2019  * Function that can be used by the transport service to transmit
2020  * a message using the plugin.   Note that in the case of a
2021  * peer disconnecting, the continuation MUST be called
2022  * prior to the disconnect notification itself.  This function
2023  * will be called with this peer's HELLO message to initiate
2024  * a fresh connection to another peer.
2025  *
2026  * @param cls closure
2027  * @param s which session must be used
2028  * @param msgbuf the message to transmit
2029  * @param msgbuf_size number of bytes in 'msgbuf'
2030  * @param priority how important is the message (most plugins will
2031  *                 ignore message priority and just FIFO)
2032  * @param to how long to wait at most for the transmission (does not
2033  *                require plugins to discard the message after the timeout,
2034  *                just advisory for the desired delay; most plugins will ignore
2035  *                this as well)
2036  * @param cont continuation to call once the message has
2037  *        been transmitted (or if the transport is ready
2038  *        for the next transmission call; or if the
2039  *        peer disconnected...); can be NULL
2040  * @param cont_cls closure for cont
2041  * @return number of bytes used (on the physical network, with overheads);
2042  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
2043  *         and does NOT mean that the message was not transmitted (DV)
2044  */
2045 static ssize_t
2046 udp_plugin_send (void *cls,
2047                  struct Session *s,
2048                  const char *msgbuf,
2049                  size_t msgbuf_size,
2050                  unsigned int priority,
2051                  struct GNUNET_TIME_Relative to,
2052                  GNUNET_TRANSPORT_TransmitContinuation cont,
2053                  void *cont_cls)
2054 {
2055   struct Plugin *plugin = cls;
2056   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
2057   struct UDP_FragmentationContext * frag_ctx;
2058   struct UDP_MessageWrapper * udpw;
2059   struct UDPMessage *udp;
2060   char mbuf[udpmlen];
2061
2062   if ( (s->address->address_length == sizeof(struct IPv6UdpAddress)) &&
2063        (plugin->sockv6 == NULL) )
2064     return GNUNET_SYSERR;
2065   if ( (s->address->address_length == sizeof(struct IPv4UdpAddress)) &&
2066        (plugin->sockv4 == NULL) )
2067     return GNUNET_SYSERR;
2068   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2069   {
2070     GNUNET_break(0);
2071     return GNUNET_SYSERR;
2072   }
2073   if (GNUNET_YES !=
2074       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2075                                                     &s->target,
2076                                                     s))
2077   {
2078     GNUNET_break(0);
2079     return GNUNET_SYSERR;
2080   }
2081   LOG (GNUNET_ERROR_TYPE_DEBUG,
2082        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2083        udpmlen,
2084        GNUNET_i2s (&s->target),
2085        udp_address_to_string (plugin,
2086                               s->address->address,
2087                               s->address->address_length));
2088
2089   /* Message */
2090   udp = (struct UDPMessage *) mbuf;
2091   udp->header.size = htons (udpmlen);
2092   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2093   udp->reserved = htonl (0);
2094   udp->sender = *plugin->env->my_identity;
2095
2096   /* We do not update the session time out here!
2097    * Otherwise this session will not timeout since we send keep alive before
2098    * session can timeout
2099    *
2100    * For UDP we update session timeout only on receive, this will cover keep
2101    * alives, since remote peer will reply with keep alive response!
2102    */
2103   if (udpmlen <= UDP_MTU)
2104   {
2105     /* unfragmented message */
2106     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2107     udpw->session = s;
2108     udpw->msg_buf = (char *) &udpw[1];
2109     udpw->msg_size = udpmlen; /* message size with UDP overhead */
2110     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2111     udpw->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), to);
2112     udpw->cont = cont;
2113     udpw->cont_cls = cont_cls;
2114     udpw->frag_ctx = NULL;
2115     udpw->msg_type = UMT_MSG_UNFRAGMENTED;
2116     memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
2117     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], msgbuf, msgbuf_size);
2118     enqueue (plugin, udpw);
2119
2120     GNUNET_STATISTICS_update (plugin->env->stats,
2121         "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
2122     GNUNET_STATISTICS_update (plugin->env->stats,
2123                               "# UDP, unfragmented msgs, bytes payload, attempt",
2124                               udpw->payload_size,
2125                               GNUNET_NO);
2126   }
2127   else
2128   {
2129     /* fragmented message */
2130     if (s->frag_ctx != NULL)
2131       return GNUNET_SYSERR;
2132     memcpy (&udp[1], msgbuf, msgbuf_size);
2133     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2134     frag_ctx->plugin = plugin;
2135     frag_ctx->session = s;
2136     frag_ctx->cont = cont;
2137     frag_ctx->cont_cls = cont_cls;
2138     frag_ctx->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
2139         to);
2140     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2141     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2142     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2143                                                      UDP_MTU,
2144                                                      &plugin->tracker,
2145                                                      s->last_expected_msg_delay,
2146                                                      s->last_expected_ack_delay,
2147                                                      &udp->header,
2148                                                      &enqueue_fragment,
2149                                                      frag_ctx);
2150     s->frag_ctx = frag_ctx;
2151     GNUNET_STATISTICS_update (plugin->env->stats,
2152                               "# UDP, fragmented msgs, messages, pending",
2153                               1,
2154                               GNUNET_NO);
2155     GNUNET_STATISTICS_update (plugin->env->stats,
2156                               "# UDP, fragmented msgs, messages, attempt",
2157                               1,
2158                               GNUNET_NO);
2159     GNUNET_STATISTICS_update (plugin->env->stats,
2160                               "# UDP, fragmented msgs, bytes payload, attempt",
2161                               frag_ctx->payload_size,
2162                               GNUNET_NO);
2163   }
2164   notify_session_monitor (s->plugin,
2165                           s,
2166                           GNUNET_TRANSPORT_SS_UPDATE);
2167   schedule_select (plugin);
2168   return udpmlen;
2169 }
2170
2171
2172 /**
2173  * Our external IP address/port mapping has changed.
2174  *
2175  * @param cls closure, the `struct LocalAddrList`
2176  * @param add_remove #GNUNET_YES to mean the new public IP address, #GNUNET_NO to mean
2177  *     the previous (now invalid) one
2178  * @param addr either the previous or the new public IP address
2179  * @param addrlen actual lenght of the address
2180  */
2181 static void
2182 udp_nat_port_map_callback (void *cls,
2183                            int add_remove,
2184                            const struct sockaddr *addr,
2185                            socklen_t addrlen)
2186 {
2187   struct Plugin *plugin = cls;
2188   struct GNUNET_HELLO_Address *address;
2189   struct IPv4UdpAddress u4;
2190   struct IPv6UdpAddress u6;
2191   void *arg;
2192   size_t args;
2193
2194   LOG (GNUNET_ERROR_TYPE_INFO,
2195        "NAT notification to %s address `%s'\n",
2196        (GNUNET_YES == add_remove) ? "add" : "remove",
2197        GNUNET_a2s (addr, addrlen));
2198
2199   /* convert 'address' to our internal format */
2200   switch (addr->sa_family)
2201   {
2202   case AF_INET:
2203     GNUNET_assert(addrlen == sizeof(struct sockaddr_in));
2204     memset (&u4, 0, sizeof(u4));
2205     u4.options = htonl (plugin->myoptions);
2206     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
2207     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
2208     if (0 == ((struct sockaddr_in *) addr)->sin_port)
2209       return;
2210     arg = &u4;
2211     args = sizeof(struct IPv4UdpAddress);
2212     break;
2213   case AF_INET6:
2214     GNUNET_assert(addrlen == sizeof(struct sockaddr_in6));
2215     memset (&u6, 0, sizeof(u6));
2216     u6.options = htonl (plugin->myoptions);
2217     if (0 == ((struct sockaddr_in6 *) addr)->sin6_port)
2218       return;
2219     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
2220         sizeof(struct in6_addr));
2221     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
2222     arg = &u6;
2223     args = sizeof(struct IPv6UdpAddress);
2224     break;
2225   default:
2226     GNUNET_break(0);
2227     return;
2228   }
2229   /* modify our published address list */
2230   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
2231                                            PLUGIN_NAME,
2232                                            arg, args,
2233                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2234   plugin->env->notify_address (plugin->env->cls, add_remove, address);
2235   GNUNET_HELLO_address_free (address);
2236 }
2237
2238
2239 /**
2240  * Message tokenizer has broken up an incomming message. Pass it on
2241  * to the service.
2242  *
2243  * @param cls the `struct Plugin *`
2244  * @param client the `struct SourceInformation *`
2245  * @param hdr the actual message
2246  * @return #GNUNET_OK (always)
2247  */
2248 static int
2249 process_inbound_tokenized_messages (void *cls,
2250                                     void *client,
2251                                     const struct GNUNET_MessageHeader *hdr)
2252 {
2253   struct Plugin *plugin = cls;
2254   struct SourceInformation *si = client;
2255   struct GNUNET_TIME_Relative delay;
2256
2257   GNUNET_assert (NULL != si->session);
2258   if (GNUNET_YES == si->session->in_destroy)
2259     return GNUNET_OK;
2260   /* setup ATS */
2261   reschedule_session_timeout (si->session);
2262   delay = plugin->env->receive (plugin->env->cls,
2263                                 si->session->address,
2264                                 si->session,
2265                                 hdr);
2266   si->session->flow_delay_for_other_peer = delay;
2267   return GNUNET_OK;
2268 }
2269
2270
2271 /**
2272  * We've received a UDP Message.  Process it (pass contents to main service).
2273  *
2274  * @param plugin plugin context
2275  * @param msg the message
2276  * @param udp_addr sender address
2277  * @param udp_addr_len number of bytes in @a udp_addr
2278  * @param network_type network type the address belongs to
2279  */
2280 static void
2281 process_udp_message (struct Plugin *plugin,
2282                      const struct UDPMessage *msg,
2283                      const union UdpAddress *udp_addr,
2284                      size_t udp_addr_len,
2285                      enum GNUNET_ATS_Network_Type network_type)
2286 {
2287   struct SourceInformation si;
2288   struct Session *s;
2289   struct GNUNET_HELLO_Address *address;
2290
2291   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2292   if (0 != ntohl (msg->reserved))
2293   {
2294     GNUNET_break_op(0);
2295     return;
2296   }
2297   if (ntohs (msg->header.size)
2298       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2299   {
2300     GNUNET_break_op(0);
2301     return;
2302   }
2303
2304   address = GNUNET_HELLO_address_allocate (&msg->sender,
2305                                            PLUGIN_NAME,
2306                                            udp_addr,
2307                                            udp_addr_len,
2308                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2309   if (NULL ==
2310       (s = udp_plugin_lookup_session (plugin, address)))
2311   {
2312     s = udp_plugin_create_session (plugin,
2313                                    address,
2314                                    network_type);
2315     plugin->env->session_start (plugin->env->cls,
2316                                 address,
2317                                 s,
2318                                 s->scope);
2319     notify_session_monitor (s->plugin,
2320                             s,
2321                             GNUNET_TRANSPORT_SS_INIT);
2322     notify_session_monitor (s->plugin,
2323                             s,
2324                             GNUNET_TRANSPORT_SS_UP);
2325   }
2326   GNUNET_free (address);
2327
2328   /* iterate over all embedded messages */
2329   si.session = s;
2330   si.sender = msg->sender;
2331   s->rc++;
2332   GNUNET_SERVER_mst_receive (plugin->mst,
2333                              &si,
2334                              (const char *) &msg[1],
2335                              ntohs (msg->header.size) - sizeof(struct UDPMessage),
2336                              GNUNET_YES,
2337                              GNUNET_NO);
2338   s->rc--;
2339   if ((0 == s->rc) && (GNUNET_YES == s->in_destroy))
2340     free_session (s);
2341 }
2342
2343
2344 /**
2345  * Process a defragmented message.
2346  *
2347  * @param cls the `struct DefragContext *`
2348  * @param msg the message
2349  */
2350 static void
2351 fragment_msg_proc (void *cls,
2352                    const struct GNUNET_MessageHeader *msg)
2353 {
2354   struct DefragContext *rc = cls;
2355   const struct UDPMessage *um;
2356
2357   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2358   {
2359     GNUNET_break(0);
2360     return;
2361   }
2362   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2363   {
2364     GNUNET_break(0);
2365     return;
2366   }
2367   um = (const struct UDPMessage *) msg;
2368   rc->sender = um->sender;
2369   rc->have_sender = GNUNET_YES;
2370   process_udp_message (rc->plugin,
2371                        um,
2372                        rc->udp_addr,
2373                        rc->udp_addr_len,
2374                        rc->network_type);
2375 }
2376
2377
2378 /**
2379  * Transmit an acknowledgement.
2380  *
2381  * @param cls the `struct DefragContext *`
2382  * @param id message ID (unused)
2383  * @param msg ack to transmit
2384  */
2385 static void
2386 ack_proc (void *cls,
2387           uint32_t id,
2388           const struct GNUNET_MessageHeader *msg)
2389 {
2390   struct DefragContext *rc = cls;
2391   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2392   struct UDP_ACK_Message *udp_ack;
2393   uint32_t delay = 0;
2394   struct UDP_MessageWrapper *udpw;
2395   struct Session *s;
2396   struct GNUNET_HELLO_Address *address;
2397
2398   if (GNUNET_NO == rc->have_sender)
2399   {
2400     /* tried to defragment but never succeeded, hence will not ACK */
2401     GNUNET_break_op (0);
2402     return;
2403   }
2404   address = GNUNET_HELLO_address_allocate (&rc->sender,
2405                                            PLUGIN_NAME,
2406                                            rc->udp_addr,
2407                                            rc->udp_addr_len,
2408                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2409   s = udp_plugin_lookup_session (rc->plugin,
2410                                  address);
2411   GNUNET_HELLO_address_free (address);
2412   if (NULL == s)
2413   {
2414     LOG (GNUNET_ERROR_TYPE_ERROR,
2415          "Trying to transmit ACK to peer `%s' but no session found!\n",
2416          udp_address_to_string (rc->plugin,
2417                                 rc->udp_addr,
2418                                 rc->udp_addr_len));
2419     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2420     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2421     GNUNET_free (rc);
2422     return;
2423   }
2424   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2425     delay = s->flow_delay_for_other_peer.rel_value_us;
2426
2427   LOG (GNUNET_ERROR_TYPE_DEBUG,
2428        "Sending ACK to `%s' including delay of %s\n",
2429        udp_address_to_string (rc->plugin,
2430                               rc->udp_addr,
2431                               rc->udp_addr_len),
2432        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2433                                                GNUNET_YES));
2434   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2435   udpw->msg_size = msize;
2436   udpw->payload_size = 0;
2437   udpw->session = s;
2438   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2439   udpw->msg_buf = (char *) &udpw[1];
2440   udpw->msg_type = UMT_MSG_ACK;
2441   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2442   udp_ack->header.size = htons ((uint16_t) msize);
2443   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2444   udp_ack->delay = htonl (delay);
2445   udp_ack->sender = *rc->plugin->env->my_identity;
2446   memcpy (&udp_ack[1], msg, ntohs (msg->size));
2447   enqueue (rc->plugin, udpw);
2448   notify_session_monitor (s->plugin,
2449                           s,
2450                           GNUNET_TRANSPORT_SS_UPDATE);
2451   schedule_select (rc->plugin);
2452 }
2453
2454
2455 /**
2456  * Handle an ACK message.
2457  *
2458  * @param plugin the UDP plugin
2459  * @param msg the (presumed) UDP ACK message
2460  * @param udp_addr sender address
2461  * @param udp_addr_len number of bytes in @a udp_addr
2462  */
2463 static void
2464 read_process_ack (struct Plugin *plugin,
2465                   const struct GNUNET_MessageHeader *msg,
2466                   const union UdpAddress *udp_addr,
2467                   socklen_t udp_addr_len)
2468 {
2469   const struct GNUNET_MessageHeader *ack;
2470   const struct UDP_ACK_Message *udp_ack;
2471   struct GNUNET_HELLO_Address *address;
2472   struct Session *s;
2473   struct GNUNET_TIME_Relative flow_delay;
2474
2475   if (ntohs (msg->size)
2476       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2477   {
2478     GNUNET_break_op(0);
2479     return;
2480   }
2481   udp_ack = (const struct UDP_ACK_Message *) msg;
2482   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2483                                            PLUGIN_NAME,
2484                                            udp_addr,
2485                                            udp_addr_len,
2486                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2487   s = udp_plugin_lookup_session (plugin,
2488                                  address);
2489   if (NULL == s)
2490   {
2491     LOG (GNUNET_ERROR_TYPE_WARNING,
2492          "UDP session of address %s for ACK not found\n",
2493          udp_address_to_string (plugin,
2494                                 address->address,
2495                                 address->address_length));
2496     GNUNET_HELLO_address_free (address);
2497     return;
2498   }
2499   if (NULL == s->frag_ctx)
2500   {
2501     LOG (GNUNET_ERROR_TYPE_WARNING,
2502          "Fragmentation context of address %s for ACK not found\n",
2503          udp_address_to_string (plugin,
2504                                 address->address,
2505                                 address->address_length));
2506     GNUNET_HELLO_address_free (address);
2507     return;
2508   }
2509   GNUNET_HELLO_address_free (address);
2510
2511   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2512   LOG (GNUNET_ERROR_TYPE_DEBUG,
2513        "We received a sending delay of %s\n",
2514        GNUNET_STRINGS_relative_time_to_string (flow_delay,
2515                                                GNUNET_YES));
2516   s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2517
2518   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2519   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2520   {
2521     GNUNET_break_op(0);
2522     return;
2523   }
2524
2525   if (GNUNET_OK !=
2526       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2527                                    ack))
2528   {
2529     LOG(GNUNET_ERROR_TYPE_DEBUG,
2530         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2531         (unsigned int ) ntohs (msg->size),
2532         GNUNET_i2s (&udp_ack->sender),
2533         udp_address_to_string (plugin,
2534                                udp_addr,
2535                                udp_addr_len));
2536     /* Expect more ACKs to arrive */
2537     return;
2538   }
2539
2540   LOG (GNUNET_ERROR_TYPE_DEBUG,
2541        "Message full ACK'ed\n",
2542        (unsigned int ) ntohs (msg->size),
2543        GNUNET_i2s (&udp_ack->sender),
2544        udp_address_to_string (plugin,
2545                               udp_addr,
2546                               udp_addr_len));
2547
2548   /* Remove fragmented message after successful sending */
2549   fragmented_message_done (s->frag_ctx,
2550                            GNUNET_OK);
2551 }
2552
2553
2554 /**
2555  * We received a fragment, process it.
2556  *
2557  * @param plugin our plugin
2558  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2559  * @param udp_addr sender address
2560  * @param udp_addr_len number of bytes in @a udp_addr
2561  * @param network_type network type the address belongs to
2562  */
2563 static void
2564 read_process_fragment (struct Plugin *plugin,
2565                        const struct GNUNET_MessageHeader *msg,
2566                        const union UdpAddress *udp_addr,
2567                        size_t udp_addr_len,
2568                        enum GNUNET_ATS_Network_Type network_type)
2569 {
2570   struct DefragContext *d_ctx;
2571   struct GNUNET_TIME_Absolute now;
2572   struct FindReceiveContext frc;
2573
2574   frc.rc = NULL;
2575   frc.udp_addr = udp_addr;
2576   frc.udp_addr_len = udp_addr_len;
2577
2578   /* Lookup existing receive context for this address */
2579   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2580                                  &find_receive_context,
2581                                  &frc);
2582   now = GNUNET_TIME_absolute_get ();
2583   d_ctx = frc.rc;
2584
2585   if (NULL == d_ctx)
2586   {
2587     /* Create a new defragmentation context */
2588     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
2589     memcpy (&d_ctx[1],
2590             udp_addr,
2591             udp_addr_len);
2592     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2593     d_ctx->udp_addr_len = udp_addr_len;
2594     d_ctx->network_type = network_type;
2595     d_ctx->plugin = plugin;
2596     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2597                                                       UDP_MTU,
2598                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
2599                                                       d_ctx,
2600                                                       &fragment_msg_proc,
2601                                                       &ack_proc);
2602     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2603                                                  d_ctx,
2604         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2605     LOG (GNUNET_ERROR_TYPE_DEBUG,
2606          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2607          (unsigned int ) ntohs (msg->size),
2608          udp_address_to_string (plugin,
2609                                 udp_addr,
2610                                 udp_addr_len));
2611   }
2612   else
2613   {
2614     LOG (GNUNET_ERROR_TYPE_DEBUG,
2615          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2616          (unsigned int ) ntohs (msg->size),
2617          udp_address_to_string (plugin,
2618                                 udp_addr,
2619                                 udp_addr_len));
2620   }
2621
2622   if (GNUNET_OK ==
2623       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2624   {
2625     /* keep this 'rc' from expiring */
2626     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2627                                        d_ctx->hnode,
2628         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2629   }
2630   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2631       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2632   {
2633     /* remove 'rc' that was inactive the longest */
2634     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2635     GNUNET_assert (NULL != d_ctx);
2636     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2637     GNUNET_free (d_ctx);
2638   }
2639 }
2640
2641
2642 /**
2643  * Read and process a message from the given socket.
2644  *
2645  * @param plugin the overall plugin
2646  * @param rsock socket to read from
2647  */
2648 static void
2649 udp_select_read (struct Plugin *plugin,
2650                  struct GNUNET_NETWORK_Handle *rsock)
2651 {
2652   socklen_t fromlen;
2653   struct sockaddr_storage addr;
2654   char buf[65536] GNUNET_ALIGN;
2655   ssize_t size;
2656   const struct GNUNET_MessageHeader *msg;
2657   struct IPv4UdpAddress v4;
2658   struct IPv6UdpAddress v6;
2659   const struct sockaddr *sa;
2660   const struct sockaddr_in *sa4;
2661   const struct sockaddr_in6 *sa6;
2662   const union UdpAddress *int_addr;
2663   size_t int_addr_len;
2664   enum GNUNET_ATS_Network_Type network_type;
2665
2666   fromlen = sizeof(addr);
2667   memset (&addr, 0, sizeof(addr));
2668   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof(buf),
2669                                          (struct sockaddr *) &addr, &fromlen);
2670   sa = (const struct sockaddr *) &addr;
2671 #if MINGW
2672   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2673    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2674    * on this socket has failed.
2675    * Quote from MSDN:
2676    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2677    *   executing a hard or abortive close. The application should close
2678    *   the socket; it is no longer usable. On a UDP-datagram socket this
2679    *   error indicates a previous send operation resulted in an ICMP Port
2680    *   Unreachable message.
2681    */
2682   if ( (-1 == size) && (ECONNRESET == errno) )
2683   return;
2684 #endif
2685   if (-1 == size)
2686   {
2687     LOG (GNUNET_ERROR_TYPE_DEBUG,
2688          "UDP failed to receive data: %s\n",
2689          STRERROR (errno));
2690     /* Connection failure or something. Not a protocol violation. */
2691     return;
2692   }
2693   if (size < sizeof(struct GNUNET_MessageHeader))
2694   {
2695     LOG (GNUNET_ERROR_TYPE_WARNING,
2696          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
2697          (unsigned int ) size,
2698          GNUNET_a2s (sa, fromlen));
2699     /* _MAY_ be a connection failure (got partial message) */
2700     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2701     GNUNET_break_op(0);
2702     return;
2703   }
2704   msg = (const struct GNUNET_MessageHeader *) buf;
2705   LOG (GNUNET_ERROR_TYPE_DEBUG,
2706        "UDP received %u-byte message from `%s' type %u\n",
2707        (unsigned int) size,
2708        GNUNET_a2s (sa,
2709                    fromlen),
2710        ntohs (msg->type));
2711   if (size != ntohs (msg->size))
2712   {
2713     LOG (GNUNET_ERROR_TYPE_WARNING,
2714          "UDP malformed message header from %s\n",
2715          (unsigned int) size,
2716          GNUNET_a2s (sa,
2717                      fromlen));
2718     GNUNET_break_op (0);
2719     return;
2720   }
2721   GNUNET_STATISTICS_update (plugin->env->stats,
2722                             "# UDP, total, bytes, received",
2723                             size,
2724                             GNUNET_NO);
2725   network_type = plugin->env->get_address_type (plugin->env->cls,
2726                                                 sa,
2727                                                 fromlen);
2728   switch (sa->sa_family)
2729   {
2730   case AF_INET:
2731     sa4 = (const struct sockaddr_in *) &addr;
2732     v4.options = 0;
2733     v4.ipv4_addr = sa4->sin_addr.s_addr;
2734     v4.u4_port = sa4->sin_port;
2735     int_addr = (union UdpAddress *) &v4;
2736     int_addr_len = sizeof (v4);
2737     break;
2738   case AF_INET6:
2739     sa6 = (const struct sockaddr_in6 *) &addr;
2740     v6.options = 0;
2741     v6.ipv6_addr = sa6->sin6_addr;
2742     v6.u6_port = sa6->sin6_port;
2743     int_addr = (union UdpAddress *) &v6;
2744     int_addr_len = sizeof (v6);
2745     break;
2746   default:
2747     GNUNET_break (0);
2748     return;
2749   }
2750
2751   switch (ntohs (msg->type))
2752   {
2753   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2754     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
2755       udp_broadcast_receive (plugin,
2756                              buf,
2757                              size,
2758                              int_addr,
2759                              int_addr_len,
2760                              network_type);
2761     return;
2762   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2763     if (ntohs (msg->size) < sizeof(struct UDPMessage))
2764     {
2765       GNUNET_break_op(0);
2766       return;
2767     }
2768     process_udp_message (plugin,
2769                          (const struct UDPMessage *) msg,
2770                          int_addr,
2771                          int_addr_len,
2772                          network_type);
2773     return;
2774   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2775     read_process_ack (plugin,
2776                       msg,
2777                       int_addr,
2778                       int_addr_len);
2779     return;
2780   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2781     read_process_fragment (plugin,
2782                            msg,
2783                            int_addr,
2784                            int_addr_len,
2785                            network_type);
2786     return;
2787   default:
2788     GNUNET_break_op(0);
2789     return;
2790   }
2791 }
2792
2793
2794 /**
2795  * Removes messages from the transmission queue that have
2796  * timed out, and then selects a message that should be
2797  * transmitted next.
2798  *
2799  * @param plugin the UDP plugin
2800  * @param sock which socket should we process the queue for (v4 or v6)
2801  * @return message selected for transmission, or NULL for none
2802  */
2803 static struct UDP_MessageWrapper *
2804 remove_timeout_messages_and_select (struct Plugin *plugin,
2805                                     struct GNUNET_NETWORK_Handle *sock)
2806 {
2807   struct UDP_MessageWrapper *udpw = NULL;
2808   struct GNUNET_TIME_Relative remaining;
2809   struct Session *session;
2810   int removed;
2811
2812   removed = GNUNET_NO;
2813   udpw = (sock == plugin->sockv4)
2814     ? plugin->ipv4_queue_head
2815     : plugin->ipv6_queue_head;
2816   while (NULL != udpw)
2817   {
2818     session = udpw->session;
2819     /* Find messages with timeout */
2820     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2821     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2822     {
2823       /* Message timed out */
2824       switch (udpw->msg_type)
2825       {
2826       case UMT_MSG_UNFRAGMENTED:
2827         GNUNET_STATISTICS_update (plugin->env->stats,
2828                                   "# UDP, total, bytes, sent, timeout",
2829                                   udpw->msg_size,
2830                                   GNUNET_NO);
2831         GNUNET_STATISTICS_update (plugin->env->stats,
2832                                   "# UDP, total, messages, sent, timeout",
2833                                   1,
2834                                   GNUNET_NO);
2835         GNUNET_STATISTICS_update (plugin->env->stats,
2836                                   "# UDP, unfragmented msgs, messages, sent, timeout",
2837                                   1,
2838                                   GNUNET_NO);
2839         GNUNET_STATISTICS_update (plugin->env->stats,
2840                                   "# UDP, unfragmented msgs, bytes, sent, timeout",
2841                                   udpw->payload_size,
2842                                   GNUNET_NO);
2843         /* Not fragmented message */
2844         LOG (GNUNET_ERROR_TYPE_DEBUG,
2845              "Message for peer `%s' with size %u timed out\n",
2846              GNUNET_i2s (&udpw->session->target),
2847              udpw->payload_size);
2848         call_continuation (udpw, GNUNET_SYSERR);
2849         /* Remove message */
2850         removed = GNUNET_YES;
2851         dequeue (plugin, udpw);
2852         GNUNET_free(udpw);
2853         break;
2854       case UMT_MSG_FRAGMENTED:
2855         /* Fragmented message */
2856         GNUNET_STATISTICS_update (plugin->env->stats,
2857                                   "# UDP, total, bytes, sent, timeout",
2858                                   udpw->frag_ctx->on_wire_size,
2859                                   GNUNET_NO);
2860         GNUNET_STATISTICS_update (plugin->env->stats,
2861                                   "# UDP, total, messages, sent, timeout",
2862                                   1,
2863                                   GNUNET_NO);
2864         call_continuation (udpw,
2865                            GNUNET_SYSERR);
2866         LOG (GNUNET_ERROR_TYPE_DEBUG,
2867              "Fragment for message for peer `%s' with size %u timed out\n",
2868              GNUNET_i2s (&udpw->session->target),
2869             udpw->frag_ctx->payload_size);
2870
2871         GNUNET_STATISTICS_update (plugin->env->stats,
2872                                   "# UDP, fragmented msgs, messages, sent, timeout",
2873                                   1,
2874                                   GNUNET_NO);
2875         GNUNET_STATISTICS_update (plugin->env->stats,
2876                                   "# UDP, fragmented msgs, bytes, sent, timeout",
2877                                   udpw->frag_ctx->payload_size,
2878                                   GNUNET_NO);
2879         /* Remove fragmented message due to timeout */
2880         fragmented_message_done (udpw->frag_ctx,
2881                                  GNUNET_SYSERR);
2882         break;
2883       case UMT_MSG_ACK:
2884         GNUNET_STATISTICS_update (plugin->env->stats,
2885                                   "# UDP, total, bytes, sent, timeout",
2886                                   udpw->msg_size,
2887                                   GNUNET_NO);
2888         GNUNET_STATISTICS_update (plugin->env->stats,
2889                                   "# UDP, total, messages, sent, timeout",
2890                                   1,
2891                                   GNUNET_NO);
2892         LOG (GNUNET_ERROR_TYPE_DEBUG,
2893              "ACK Message for peer `%s' with size %u timed out\n",
2894              GNUNET_i2s (&udpw->session->target),
2895              udpw->payload_size);
2896         call_continuation (udpw,
2897                            GNUNET_SYSERR);
2898         removed = GNUNET_YES;
2899         dequeue (plugin,
2900                  udpw);
2901         GNUNET_free (udpw);
2902         break;
2903       default:
2904         break;
2905       }
2906       if (sock == plugin->sockv4)
2907         udpw = plugin->ipv4_queue_head;
2908       else if (sock == plugin->sockv6)
2909         udpw = plugin->ipv6_queue_head;
2910       else
2911       {
2912         GNUNET_break(0); /* should never happen */
2913         udpw = NULL;
2914       }
2915       GNUNET_STATISTICS_update (plugin->env->stats,
2916                                 "# messages discarded due to timeout",
2917                                 1,
2918                                 GNUNET_NO);
2919     }
2920     else
2921     {
2922       /* Message did not time out, check flow delay */
2923       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2924       if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2925       {
2926         /* this message is not delayed */
2927         LOG (GNUNET_ERROR_TYPE_DEBUG,
2928              "Message for peer `%s' (%u bytes) is not delayed \n",
2929              GNUNET_i2s (&udpw->session->target),
2930              udpw->payload_size);
2931         break; /* Found message to send, break */
2932       }
2933       else
2934       {
2935         /* Message is delayed, try next */
2936         LOG (GNUNET_ERROR_TYPE_DEBUG,
2937              "Message for peer `%s' (%u bytes) is delayed for %s\n",
2938              GNUNET_i2s (&udpw->session->target),
2939              udpw->payload_size,
2940              GNUNET_STRINGS_relative_time_to_string (remaining,
2941                                                      GNUNET_YES));
2942         udpw = udpw->next;
2943       }
2944     }
2945   }
2946   if (GNUNET_YES == removed)
2947     notify_session_monitor (session->plugin,
2948                             session,
2949                             GNUNET_TRANSPORT_SS_UPDATE);
2950   return udpw;
2951 }
2952
2953
2954 /**
2955  * FIXME.
2956  */
2957 static void
2958 analyze_send_error (struct Plugin *plugin,
2959                     const struct sockaddr *sa,
2960                     socklen_t slen, int error)
2961 {
2962   enum GNUNET_ATS_Network_Type type;
2963
2964   type = plugin->env->get_address_type (plugin->env->cls, sa, slen);
2965   if (((GNUNET_ATS_NET_LAN == type)
2966        || (GNUNET_ATS_NET_WAN == type))
2967       && ((ENETUNREACH == errno)|| (ENETDOWN == errno)))
2968   {
2969     if (slen == sizeof (struct sockaddr_in))
2970     {
2971       /* IPv4: "Network unreachable" or "Network down"
2972        *
2973        * This indicates we do not have connectivity
2974        */
2975       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2976            _("UDP could not transmit message to `%s': "
2977              "Network seems down, please check your network configuration\n"),
2978            GNUNET_a2s (sa, slen));
2979     }
2980     if (slen == sizeof (struct sockaddr_in6))
2981     {
2982       /* IPv6: "Network unreachable" or "Network down"
2983        *
2984        * This indicates that this system is IPv6 enabled, but does not
2985        * have a valid global IPv6 address assigned or we do not have
2986        * connectivity
2987        */
2988       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2989            _("UDP could not transmit IPv6 message! "
2990              "Please check your network configuration and disable IPv6 if your "
2991              "connection does not have a global IPv6 address\n"));
2992     }
2993   }
2994   else
2995   {
2996     LOG (GNUNET_ERROR_TYPE_WARNING,
2997          "UDP could not transmit message to `%s': `%s'\n",
2998          GNUNET_a2s (sa, slen), STRERROR (error));
2999   }
3000 }
3001
3002
3003 /**
3004  * It is time to try to transmit a UDP message.  Select one
3005  * and send.
3006  *
3007  * @param plugin the plugin
3008  * @param sock which socket (v4/v6) to send on
3009  * @return number of bytes transmitted, #GNUNET_SYSERR on failure
3010  */
3011 static size_t
3012 udp_select_send (struct Plugin *plugin,
3013                  struct GNUNET_NETWORK_Handle *sock)
3014 {
3015   ssize_t sent;
3016   socklen_t slen;
3017   struct sockaddr *a;
3018   const struct IPv4UdpAddress *u4;
3019   struct sockaddr_in a4;
3020   const struct IPv6UdpAddress *u6;
3021   struct sockaddr_in6 a6;
3022   struct UDP_MessageWrapper *udpw;
3023
3024   /* Find message to send */
3025   udpw = remove_timeout_messages_and_select (plugin,
3026                                              sock);
3027   if (NULL == udpw)
3028     return 0; /* No message to send */
3029
3030   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3031   {
3032     u4 = udpw->session->address->address;
3033     memset (&a4, 0, sizeof(a4));
3034     a4.sin_family = AF_INET;
3035 #if HAVE_SOCKADDR_IN_SIN_LEN
3036     a4.sin_len = sizeof (a4);
3037 #endif
3038     a4.sin_port = u4->u4_port;
3039     memcpy (&a4.sin_addr,
3040             &u4->ipv4_addr,
3041             sizeof(struct in_addr));
3042     a = (struct sockaddr *) &a4;
3043     slen = sizeof (a4);
3044   }
3045   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3046   {
3047     u6 = udpw->session->address->address;
3048     memset (&a6, 0, sizeof(a6));
3049     a6.sin6_family = AF_INET6;
3050 #if HAVE_SOCKADDR_IN_SIN_LEN
3051     a6.sin6_len = sizeof (a6);
3052 #endif
3053     a6.sin6_port = u6->u6_port;
3054     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
3055     a = (struct sockaddr *) &a6;
3056     slen = sizeof (a6);
3057   }
3058   else
3059   {
3060     call_continuation (udpw,
3061                        GNUNET_OK);
3062     dequeue (plugin,
3063              udpw);
3064     notify_session_monitor (plugin,
3065                             udpw->session,
3066                             GNUNET_TRANSPORT_SS_UPDATE);
3067     GNUNET_free (udpw);
3068     return GNUNET_SYSERR;
3069   }
3070   sent = GNUNET_NETWORK_socket_sendto (sock,
3071                                        udpw->msg_buf,
3072                                        udpw->msg_size,
3073                                        a,
3074                                        slen);
3075   if (GNUNET_SYSERR == sent)
3076   {
3077     /* Failure */
3078     analyze_send_error (plugin,
3079                         a,
3080                         slen,
3081                         errno);
3082     call_continuation (udpw,
3083                        GNUNET_SYSERR);
3084     GNUNET_STATISTICS_update (plugin->env->stats,
3085                               "# UDP, total, bytes, sent, failure",
3086                               sent,
3087                               GNUNET_NO);
3088     GNUNET_STATISTICS_update (plugin->env->stats,
3089                               "# UDP, total, messages, sent, failure",
3090                               1,
3091                               GNUNET_NO);
3092   }
3093   else
3094   {
3095     /* Success */
3096     LOG (GNUNET_ERROR_TYPE_DEBUG,
3097          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3098          (unsigned int) (udpw->msg_size),
3099          GNUNET_i2s (&udpw->session->target),
3100          GNUNET_a2s (a, slen),
3101          (int ) sent,
3102          (sent < 0) ? STRERROR (errno) : "ok");
3103     GNUNET_STATISTICS_update (plugin->env->stats,
3104                               "# UDP, total, bytes, sent, success",
3105                               sent,
3106                               GNUNET_NO);
3107     GNUNET_STATISTICS_update (plugin->env->stats,
3108                               "# UDP, total, messages, sent, success",
3109                               1,
3110                               GNUNET_NO);
3111     if (NULL != udpw->frag_ctx)
3112       udpw->frag_ctx->on_wire_size += udpw->msg_size;
3113     call_continuation (udpw, GNUNET_OK);
3114   }
3115   dequeue (plugin, udpw);
3116   notify_session_monitor (plugin,
3117                           udpw->session,
3118                           GNUNET_TRANSPORT_SS_UPDATE);
3119   GNUNET_free (udpw);
3120   return sent;
3121 }
3122
3123
3124 /**
3125  * We have been notified that our readset has something to read.  We don't
3126  * know which socket needs to be read, so we have to check each one
3127  * Then reschedule this function to be called again once more is available.
3128  *
3129  * @param cls the plugin handle
3130  * @param tc the scheduling context (for rescheduling this function again)
3131  */
3132 static void
3133 udp_plugin_select (void *cls,
3134                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3135 {
3136   struct Plugin *plugin = cls;
3137
3138   plugin->select_task = NULL;
3139   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3140     return;
3141   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
3142       && (NULL != plugin->sockv4)
3143       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
3144     udp_select_read (plugin, plugin->sockv4);
3145   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
3146       && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head)
3147       && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
3148     udp_select_send (plugin, plugin->sockv4);
3149   schedule_select (plugin);
3150 }
3151
3152
3153 /**
3154  * We have been notified that our readset has something to read.  We don't
3155  * know which socket needs to be read, so we have to check each one
3156  * Then reschedule this function to be called again once more is available.
3157  *
3158  * @param cls the plugin handle
3159  * @param tc the scheduling context (for rescheduling this function again)
3160  */
3161 static void
3162 udp_plugin_select_v6 (void *cls,
3163                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3164 {
3165   struct Plugin *plugin = cls;
3166
3167   plugin->select_task_v6 = NULL;
3168   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3169     return;
3170   if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
3171       && (NULL != plugin->sockv6)
3172       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
3173     udp_select_read (plugin, plugin->sockv6);
3174   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
3175       && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&&
3176       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )udp_select_send (plugin, plugin->sockv6);
3177   schedule_select (plugin);
3178 }
3179
3180
3181 /**
3182  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3183  *
3184  * @param plugin the plugin to initialize
3185  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3186  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3187  * @return number of sockets that were successfully bound
3188  */
3189 static int
3190 setup_sockets (struct Plugin *plugin,
3191                const struct sockaddr_in6 *bind_v6,
3192                const struct sockaddr_in *bind_v4)
3193 {
3194   int tries;
3195   int sockets_created = 0;
3196   struct sockaddr_in6 server_addrv6;
3197   struct sockaddr_in server_addrv4;
3198   struct sockaddr *server_addr;
3199   struct sockaddr *addrs[2];
3200   socklen_t addrlens[2];
3201   socklen_t addrlen;
3202   int eno;
3203
3204   /* Create IPv6 socket */
3205   eno = EINVAL;
3206   if (GNUNET_YES == plugin->enable_ipv6)
3207   {
3208     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
3209     if (NULL == plugin->sockv6)
3210     {
3211       LOG(GNUNET_ERROR_TYPE_WARNING,
3212           "Disabling IPv6 since it is not supported on this system!\n");
3213       plugin->enable_ipv6 = GNUNET_NO;
3214     }
3215     else
3216     {
3217       memset (&server_addrv6, '\0', sizeof(struct sockaddr_in6));
3218 #if HAVE_SOCKADDR_IN_SIN_LEN
3219       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3220 #endif
3221       server_addrv6.sin6_family = AF_INET6;
3222       if (NULL != bind_v6)
3223         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3224       else
3225         server_addrv6.sin6_addr = in6addr_any;
3226
3227       if (0 == plugin->port) /* autodetect */
3228         server_addrv6.sin6_port
3229           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3230                    + 32000);
3231       else
3232         server_addrv6.sin6_port = htons (plugin->port);
3233       addrlen = sizeof(struct sockaddr_in6);
3234       server_addr = (struct sockaddr *) &server_addrv6;
3235
3236       tries = 0;
3237       while (tries < 10)
3238       {
3239         LOG(GNUNET_ERROR_TYPE_DEBUG,
3240             "Binding to IPv6 `%s'\n",
3241             GNUNET_a2s (server_addr, addrlen));
3242         /* binding */
3243         if (GNUNET_OK ==
3244             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3245                                         server_addr,
3246                                         addrlen))
3247           break;
3248         eno = errno;
3249         if (0 != plugin->port)
3250         {
3251           tries = 10; /* fail immediately */
3252           break; /* bind failed on specific port */
3253         }
3254         /* autodetect */
3255         server_addrv6.sin6_port
3256           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3257                    + 32000);
3258         tries++;
3259       }
3260       if (tries >= 10)
3261       {
3262         GNUNET_NETWORK_socket_close (plugin->sockv6);
3263         plugin->enable_ipv6 = GNUNET_NO;
3264         plugin->sockv6 = NULL;
3265       }
3266       else
3267       {
3268         plugin->port = ntohs (server_addrv6.sin6_port);
3269       }
3270       if (NULL != plugin->sockv6)
3271       {
3272         LOG (GNUNET_ERROR_TYPE_DEBUG,
3273              "IPv6 socket created on port %s\n",
3274              GNUNET_a2s (server_addr, addrlen));
3275         addrs[sockets_created] = (struct sockaddr *) &server_addrv6;
3276         addrlens[sockets_created] = sizeof(struct sockaddr_in6);
3277         sockets_created++;
3278       }
3279       else
3280       {
3281         LOG (GNUNET_ERROR_TYPE_ERROR,
3282              "Failed to bind UDP socket to %s: %s\n",
3283              GNUNET_a2s (server_addr, addrlen),
3284              STRERROR (eno));
3285       }
3286     }
3287   }
3288
3289   /* Create IPv4 socket */
3290   eno = EINVAL;
3291   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
3292   if (NULL == plugin->sockv4)
3293   {
3294     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3295                          "socket");
3296     LOG (GNUNET_ERROR_TYPE_WARNING,
3297          "Disabling IPv4 since it is not supported on this system!\n");
3298     plugin->enable_ipv4 = GNUNET_NO;
3299   }
3300   else
3301   {
3302     memset (&server_addrv4, '\0', sizeof(struct sockaddr_in));
3303 #if HAVE_SOCKADDR_IN_SIN_LEN
3304     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3305 #endif
3306     server_addrv4.sin_family = AF_INET;
3307     if (NULL != bind_v4)
3308       server_addrv4.sin_addr = bind_v4->sin_addr;
3309     else
3310       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3311
3312     if (0 == plugin->port)
3313       /* autodetect */
3314       server_addrv4.sin_port
3315         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3316                                            33537)
3317                  + 32000);
3318     else
3319       server_addrv4.sin_port = htons (plugin->port);
3320
3321     addrlen = sizeof(struct sockaddr_in);
3322     server_addr = (struct sockaddr *) &server_addrv4;
3323
3324     tries = 0;
3325     while (tries < 10)
3326     {
3327       LOG (GNUNET_ERROR_TYPE_DEBUG,
3328            "Binding to IPv4 `%s'\n",
3329            GNUNET_a2s (server_addr, addrlen));
3330
3331       /* binding */
3332       if (GNUNET_OK ==
3333           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3334                                       server_addr,
3335                                       addrlen))
3336         break;
3337       eno = errno;
3338       if (0 != plugin->port)
3339       {
3340         tries = 10; /* fail */
3341         break; /* bind failed on specific port */
3342       }
3343
3344       /* autodetect */
3345       server_addrv4.sin_port
3346         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3347                  + 32000);
3348       tries++;
3349     }
3350     if (tries >= 10)
3351     {
3352       GNUNET_NETWORK_socket_close (plugin->sockv4);
3353       plugin->enable_ipv4 = GNUNET_NO;
3354       plugin->sockv4 = NULL;
3355     }
3356     else
3357     {
3358       plugin->port = ntohs (server_addrv4.sin_port);
3359     }
3360
3361     if (NULL != plugin->sockv4)
3362     {
3363       LOG (GNUNET_ERROR_TYPE_DEBUG,
3364            "IPv4 socket created on port %s\n",
3365            GNUNET_a2s (server_addr, addrlen));
3366       addrs[sockets_created] = (struct sockaddr *) &server_addrv4;
3367       addrlens[sockets_created] = sizeof(struct sockaddr_in);
3368       sockets_created++;
3369     }
3370     else
3371     {
3372       LOG (GNUNET_ERROR_TYPE_ERROR,
3373            _("Failed to bind UDP socket to %s: %s\n"),
3374            GNUNET_a2s (server_addr, addrlen),
3375            STRERROR (eno));
3376     }
3377   }
3378
3379   if (0 == sockets_created)
3380   {
3381     LOG (GNUNET_ERROR_TYPE_WARNING,
3382          _("Failed to open UDP sockets\n"));
3383     return 0; /* No sockets created, return */
3384   }
3385
3386   /* Create file descriptors */
3387   if (plugin->enable_ipv4 == GNUNET_YES)
3388   {
3389     plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
3390     plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
3391     GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
3392     GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
3393     if (NULL != plugin->sockv4)
3394     {
3395       GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
3396       GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
3397     }
3398   }
3399
3400   if (plugin->enable_ipv6 == GNUNET_YES)
3401   {
3402     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
3403     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
3404     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
3405     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
3406     if (NULL != plugin->sockv6)
3407     {
3408       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
3409       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
3410     }
3411   }
3412
3413   schedule_select (plugin);
3414   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3415                                      GNUNET_NO,
3416                                      plugin->port,
3417                                      sockets_created,
3418                                      (const struct sockaddr **) addrs,
3419                                      addrlens,
3420                                      &udp_nat_port_map_callback,
3421                                      NULL,
3422                                      plugin);
3423
3424   return sockets_created;
3425 }
3426
3427
3428 /**
3429  * Return information about the given session to the
3430  * monitor callback.
3431  *
3432  * @param cls the `struct Plugin` with the monitor callback (`sic`)
3433  * @param peer peer we send information about
3434  * @param value our `struct Session` to send information about
3435  * @return #GNUNET_OK (continue to iterate)
3436  */
3437 static int
3438 send_session_info_iter (void *cls,
3439                         const struct GNUNET_PeerIdentity *peer,
3440                         void *value)
3441 {
3442   struct Plugin *plugin = cls;
3443   struct Session *session = value;
3444
3445   notify_session_monitor (plugin,
3446                           session,
3447                           GNUNET_TRANSPORT_SS_INIT);
3448   notify_session_monitor (plugin,
3449                           session,
3450                           GNUNET_TRANSPORT_SS_UP);
3451   return GNUNET_OK;
3452 }
3453
3454
3455 /**
3456  * Begin monitoring sessions of a plugin.  There can only
3457  * be one active monitor per plugin (i.e. if there are
3458  * multiple monitors, the transport service needs to
3459  * multiplex the generated events over all of them).
3460  *
3461  * @param cls closure of the plugin
3462  * @param sic callback to invoke, NULL to disable monitor;
3463  *            plugin will being by iterating over all active
3464  *            sessions immediately and then enter monitor mode
3465  * @param sic_cls closure for @a sic
3466  */
3467 static void
3468 udp_plugin_setup_monitor (void *cls,
3469                           GNUNET_TRANSPORT_SessionInfoCallback sic,
3470                           void *sic_cls)
3471 {
3472   struct Plugin *plugin = cls;
3473
3474   plugin->sic = sic;
3475   plugin->sic_cls = sic_cls;
3476   if (NULL != sic)
3477   {
3478     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3479                                            &send_session_info_iter,
3480                                            plugin);
3481     /* signal end of first iteration */
3482     sic (sic_cls, NULL, NULL);
3483   }
3484 }
3485
3486
3487 /**
3488  * The exported method. Makes the core api available via a global and
3489  * returns the udp transport API.
3490  *
3491  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3492  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3493  */
3494 void *
3495 libgnunet_plugin_transport_udp_init (void *cls)
3496 {
3497   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3498   struct GNUNET_TRANSPORT_PluginFunctions *api;
3499   struct Plugin *p;
3500   unsigned long long port;
3501   unsigned long long aport;
3502   unsigned long long udp_max_bps;
3503   unsigned long long enable_v6;
3504   unsigned long long enable_broadcasting;
3505   unsigned long long enable_broadcasting_recv;
3506   char *bind4_address;
3507   char *bind6_address;
3508   char *fancy_interval;
3509   struct GNUNET_TIME_Relative interval;
3510   struct sockaddr_in server_addrv4;
3511   struct sockaddr_in6 server_addrv6;
3512   int res;
3513   int have_bind4;
3514   int have_bind6;
3515
3516   if (NULL == env->receive)
3517   {
3518     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3519      initialze the plugin or the API */
3520     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3521     api->cls = NULL;
3522     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3523     api->address_to_string = &udp_address_to_string;
3524     api->string_to_address = &udp_string_to_address;
3525     return api;
3526   }
3527
3528   /* Get port number: port == 0 : autodetect a port,
3529    * > 0 : use this port, not given : 2086 default */
3530   if (GNUNET_OK !=
3531       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3532                                              "transport-udp",
3533                                              "PORT", &port))
3534     port = 2086;
3535   if (GNUNET_OK !=
3536       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3537                                              "transport-udp",
3538                                              "ADVERTISED_PORT", &aport))
3539     aport = port;
3540   if (port > 65535)
3541   {
3542     LOG (GNUNET_ERROR_TYPE_WARNING,
3543          _("Given `%s' option is out of range: %llu > %u\n"),
3544          "PORT", port,
3545          65535);
3546     return NULL;
3547   }
3548
3549   /* Protocols */
3550   if (GNUNET_YES ==
3551       GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6"))
3552     enable_v6 = GNUNET_NO;
3553   else
3554     enable_v6 = GNUNET_YES;
3555
3556   /* Addresses */
3557   have_bind4 = GNUNET_NO;
3558   memset (&server_addrv4, 0, sizeof(server_addrv4));
3559   if (GNUNET_YES ==
3560       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3561                                              "BINDTO", &bind4_address))
3562   {
3563     LOG (GNUNET_ERROR_TYPE_DEBUG,
3564          "Binding udp plugin to specific address: `%s'\n",
3565          bind4_address);
3566     if (1 != inet_pton (AF_INET,
3567                         bind4_address,
3568                         &server_addrv4.sin_addr))
3569     {
3570       GNUNET_free (bind4_address);
3571       return NULL;
3572     }
3573     have_bind4 = GNUNET_YES;
3574   }
3575   GNUNET_free_non_null(bind4_address);
3576   have_bind6 = GNUNET_NO;
3577   memset (&server_addrv6, 0, sizeof(server_addrv6));
3578   if (GNUNET_YES ==
3579       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3580                                              "BINDTO6", &bind6_address))
3581   {
3582     LOG (GNUNET_ERROR_TYPE_DEBUG,
3583          "Binding udp plugin to specific address: `%s'\n",
3584          bind6_address);
3585     if (1 != inet_pton (AF_INET6,
3586                         bind6_address,
3587                         &server_addrv6.sin6_addr))
3588     {
3589       LOG (GNUNET_ERROR_TYPE_ERROR,
3590            _("Invalid IPv6 address: `%s'\n"),
3591            bind6_address);
3592       GNUNET_free (bind6_address);
3593       return NULL;
3594     }
3595     have_bind6 = GNUNET_YES;
3596   }
3597   GNUNET_free_non_null (bind6_address);
3598
3599   /* Enable neighbour discovery */
3600   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3601       "transport-udp", "BROADCAST");
3602   if (enable_broadcasting == GNUNET_SYSERR)
3603     enable_broadcasting = GNUNET_NO;
3604
3605   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3606       "transport-udp", "BROADCAST_RECEIVE");
3607   if (enable_broadcasting_recv == GNUNET_SYSERR)
3608     enable_broadcasting_recv = GNUNET_YES;
3609
3610   if (GNUNET_SYSERR ==
3611       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3612                                              "BROADCAST_INTERVAL",
3613                                              &fancy_interval))
3614   {
3615     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
3616   }
3617   else
3618   {
3619     if (GNUNET_SYSERR ==
3620         GNUNET_STRINGS_fancy_time_to_relative (fancy_interval, &interval))
3621     {
3622       interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
3623     }
3624     GNUNET_free(fancy_interval);
3625   }
3626
3627   /* Maximum datarate */
3628   if (GNUNET_OK !=
3629       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
3630                                              "MAX_BPS", &udp_max_bps))
3631   {
3632     udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */
3633   }
3634
3635   p = GNUNET_new (struct Plugin);
3636   p->port = port;
3637   p->aport = aport;
3638   p->broadcast_interval = interval;
3639   p->enable_ipv6 = enable_v6;
3640   p->enable_ipv4 = GNUNET_YES; /* default */
3641   p->enable_broadcasting = enable_broadcasting;
3642   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3643   p->env = env;
3644   p->sessions = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
3645   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (
3646       GNUNET_CONTAINER_HEAP_ORDER_MIN);
3647   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3648                                      p);
3649   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3650                                  NULL,
3651                                  NULL,
3652                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3653                                  30);
3654   LOG(GNUNET_ERROR_TYPE_DEBUG,
3655       "Setting up sockets\n");
3656   res = setup_sockets (p,
3657                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3658                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3659   if ((res == 0) || ((p->sockv4 == NULL )&& (p->sockv6 == NULL)))
3660   {
3661     LOG (GNUNET_ERROR_TYPE_ERROR,
3662         _("Failed to create network sockets, plugin failed\n"));
3663     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3664     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3665     GNUNET_SERVER_mst_destroy (p->mst);
3666     GNUNET_free (p);
3667     return NULL;
3668   }
3669
3670   /* Setup broadcasting and receiving beacons */
3671   setup_broadcast (p, &server_addrv6, &server_addrv4);
3672
3673   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3674   api->cls = p;
3675   api->send = NULL;
3676   api->disconnect_session = &udp_disconnect_session;
3677   api->query_keepalive_factor = &udp_query_keepalive_factor;
3678   api->disconnect_peer = &udp_disconnect;
3679   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3680   api->address_to_string = &udp_address_to_string;
3681   api->string_to_address = &udp_string_to_address;
3682   api->check_address = &udp_plugin_check_address;
3683   api->get_session = &udp_plugin_get_session;
3684   api->send = &udp_plugin_send;
3685   api->get_network = &udp_get_network;
3686   api->update_session_timeout = &udp_plugin_update_session_timeout;
3687   api->setup_monitor = &udp_plugin_setup_monitor;
3688   return api;
3689 }
3690
3691
3692 /**
3693  * Function called on each entry in the defragmentation heap to
3694  * clean it up.
3695  *
3696  * @param cls NULL
3697  * @param node node in the heap (to be removed)
3698  * @param element a `struct DefragContext` to be cleaned up
3699  * @param cost unused
3700  * @return #GNUNET_YES
3701  */
3702 static int
3703 heap_cleanup_iterator (void *cls,
3704                        struct GNUNET_CONTAINER_HeapNode *node,
3705                        void *element,
3706                        GNUNET_CONTAINER_HeapCostType cost)
3707 {
3708   struct DefragContext *d_ctx = element;
3709
3710   GNUNET_CONTAINER_heap_remove_node (node);
3711   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3712   GNUNET_free (d_ctx);
3713   return GNUNET_YES;
3714 }
3715
3716
3717 /**
3718  * The exported method. Makes the core api available via a global and
3719  * returns the udp transport API.
3720  *
3721  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3722  * @return NULL
3723  */
3724 void *
3725 libgnunet_plugin_transport_udp_done (void *cls)
3726 {
3727   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3728   struct Plugin *plugin = api->cls;
3729   struct PrettyPrinterContext *cur;
3730   struct PrettyPrinterContext *next;
3731   struct UDP_MessageWrapper *udpw;
3732
3733   if (NULL == plugin)
3734   {
3735     GNUNET_free(api);
3736     return NULL;
3737   }
3738   stop_broadcast (plugin);
3739   if (plugin->select_task != NULL)
3740   {
3741     GNUNET_SCHEDULER_cancel (plugin->select_task);
3742     plugin->select_task = NULL;
3743   }
3744   if (plugin->select_task_v6 != NULL)
3745   {
3746     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3747     plugin->select_task_v6 = NULL;
3748   }
3749
3750   /* Closing sockets */
3751   if (GNUNET_YES == plugin->enable_ipv4)
3752   {
3753     if (NULL != plugin->sockv4)
3754     {
3755       GNUNET_break (GNUNET_OK ==
3756                     GNUNET_NETWORK_socket_close (plugin->sockv4));
3757       plugin->sockv4 = NULL;
3758     }
3759     GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
3760     GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
3761   }
3762   if (GNUNET_YES == plugin->enable_ipv6)
3763   {
3764     if (NULL != plugin->sockv6)
3765     {
3766       GNUNET_break (GNUNET_OK ==
3767                     GNUNET_NETWORK_socket_close (plugin->sockv6));
3768       plugin->sockv6 = NULL;
3769
3770       GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
3771       GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
3772     }
3773   }
3774   if (NULL != plugin->nat)
3775   {
3776     GNUNET_NAT_unregister (plugin->nat);
3777     plugin->nat = NULL;
3778   }
3779   if (NULL != plugin->defrag_ctxs)
3780   {
3781     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3782                                    &heap_cleanup_iterator, NULL);
3783     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3784     plugin->defrag_ctxs = NULL;
3785   }
3786   if (NULL != plugin->mst)
3787   {
3788     GNUNET_SERVER_mst_destroy (plugin->mst);
3789     plugin->mst = NULL;
3790   }
3791
3792   /* Clean up leftover messages */
3793   udpw = plugin->ipv4_queue_head;
3794   while (NULL != udpw)
3795   {
3796     struct UDP_MessageWrapper *tmp = udpw->next;
3797     dequeue (plugin, udpw);
3798     call_continuation (udpw, GNUNET_SYSERR);
3799     GNUNET_free(udpw);
3800     udpw = tmp;
3801   }
3802   udpw = plugin->ipv6_queue_head;
3803   while (NULL != udpw)
3804   {
3805     struct UDP_MessageWrapper *tmp = udpw->next;
3806     dequeue (plugin, udpw);
3807     call_continuation (udpw, GNUNET_SYSERR);
3808     GNUNET_free(udpw);
3809     udpw = tmp;
3810   }
3811
3812   /* Clean up sessions */
3813   LOG (GNUNET_ERROR_TYPE_DEBUG,
3814        "Cleaning up sessions\n");
3815   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3816                                          &disconnect_and_free_it, plugin);
3817   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3818
3819   next = plugin->ppc_dll_head;
3820   for (cur = next; NULL != cur; cur = next)
3821   {
3822     GNUNET_break(0);
3823     next = cur->next;
3824     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3825                                  plugin->ppc_dll_tail,
3826                                  cur);
3827     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3828     GNUNET_free (cur);
3829   }
3830   GNUNET_free (plugin);
3831   GNUNET_free (api);
3832   return NULL;
3833 }
3834
3835 /* end of plugin_transport_udp.c */