-no longer needed
[oweals/gnunet.git] / src / transport / plugin_transport_udp.c
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2010-2015 Christian Grothoff (and other contributing authors)
4
5  GNUnet is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  General Public License for more details.
14
15  You should have received a copy of the GNU General Public License
16  along with GNUnet; see the file COPYING.  If not, write to the
17  Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  Boston, MA 02111-1307, USA.
19  */
20
21 /**
22  * @file transport/plugin_transport_udp.c
23  * @brief Implementation of the UDP transport protocol
24  * @author Christian Grothoff
25  * @author Nathan Evans
26  * @author Matthias Wachs
27  */
28 #include "platform.h"
29 #include "plugin_transport_udp.h"
30 #include "gnunet_hello_lib.h"
31 #include "gnunet_util_lib.h"
32 #include "gnunet_fragmentation_lib.h"
33 #include "gnunet_nat_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_resolver_service.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_constants.h"
38 #include "gnunet_statistics_service.h"
39 #include "gnunet_transport_service.h"
40 #include "gnunet_transport_plugin.h"
41 #include "transport.h"
42
43 #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__)
44
45 #define UDP_SESSION_TIME_OUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
46
47 /**
48  * Number of messages we can defragment in parallel.  We only really
49  * defragment 1 message at a time, but if messages get re-ordered, we
50  * may want to keep knowledge about the previous message to avoid
51  * discarding the current message in favor of a single fragment of a
52  * previous message.  3 should be good since we don't expect massive
53  * message reorderings with UDP.
54  */
55 #define UDP_MAX_MESSAGES_IN_DEFRAG 3
56
57 /**
58  * We keep a defragmentation queue per sender address.  How many
59  * sender addresses do we support at the same time? Memory consumption
60  * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
61  * value. (So 128 corresponds to 12 MB and should suffice for
62  * connecting to roughly 128 peers via UDP).
63  */
64 #define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
65
66
67 /**
68  * Closure for #append_port().
69  */
70 struct PrettyPrinterContext
71 {
72   /**
73    * DLL
74    */
75   struct PrettyPrinterContext *next;
76
77   /**
78    * DLL
79    */
80   struct PrettyPrinterContext *prev;
81
82   /**
83    * Our plugin.
84    */
85   struct Plugin *plugin;
86
87   /**
88    * Resolver handle
89    */
90   struct GNUNET_RESOLVER_RequestHandle *resolver_handle;
91
92   /**
93    * Function to call with the result.
94    */
95   GNUNET_TRANSPORT_AddressStringCallback asc;
96
97   /**
98    * Clsoure for @e asc.
99    */
100   void *asc_cls;
101
102   /**
103    * Timeout task
104    */
105   struct GNUNET_SCHEDULER_Task *timeout_task;
106
107   /**
108    * Is this an IPv6 address?
109    */
110   int ipv6;
111
112   /**
113    * Options
114    */
115   uint32_t options;
116
117   /**
118    * Port to add after the IP address.
119    */
120   uint16_t port;
121
122 };
123
124
125 /**
126  * Session with another peer.
127  */
128 struct Session
129 {
130   /**
131    * Which peer is this session for?
132    */
133   struct GNUNET_PeerIdentity target;
134
135   /**
136    * Plugin this session belongs to.
137    */
138   struct Plugin *plugin;
139
140   /**
141    * Context for dealing with fragments.
142    */
143   struct UDP_FragmentationContext *frag_ctx;
144
145   /**
146    * Desired delay for next sending we send to other peer
147    */
148   struct GNUNET_TIME_Relative flow_delay_for_other_peer;
149
150   /**
151    * Desired delay for next sending we received from other peer
152    */
153   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
154
155   /**
156    * Session timeout task
157    */
158   struct GNUNET_SCHEDULER_Task *timeout_task;
159
160   /**
161    * When does this session time out?
162    */
163   struct GNUNET_TIME_Absolute timeout;
164
165   /**
166    * expected delay for ACKs
167    */
168   struct GNUNET_TIME_Relative last_expected_ack_delay;
169
170   /**
171    * desired delay between UDP messages
172    */
173   struct GNUNET_TIME_Relative last_expected_msg_delay;
174
175   /**
176    * Our own address.
177    */
178   struct GNUNET_HELLO_Address *address;
179
180   /**
181    * Number of bytes waiting for transmission to this peer.
182    */
183   unsigned long long bytes_in_queue;
184
185   /**
186    * Number of messages waiting for transmission to this peer.
187    */
188   unsigned int msgs_in_queue;
189
190   /**
191    * Reference counter to indicate that this session is
192    * currently being used and must not be destroyed;
193    * setting @e in_destroy will destroy it as soon as
194    * possible.
195    */
196   unsigned int rc;
197
198   /**
199    * Network type of the address.
200    */
201   enum GNUNET_ATS_Network_Type scope;
202
203   /**
204    * Is this session about to be destroyed (sometimes we cannot
205    * destroy a session immediately as below us on the stack
206    * there might be code that still uses it; in this case,
207    * @e rc is non-zero).
208    */
209   int in_destroy;
210 };
211
212
213 /**
214  * Closure for #process_inbound_tokenized_messages().
215  */
216 struct SourceInformation
217 {
218   /**
219    * Sender identity.
220    */
221   struct GNUNET_PeerIdentity sender;
222
223   /**
224    * Associated session.
225    */
226   struct Session *session;
227
228 };
229
230 /**
231  * Closure for #find_receive_context().
232  */
233 struct FindReceiveContext
234 {
235   /**
236    * Where to store the result.
237    */
238   struct DefragContext *rc;
239
240   /**
241    * Session associated with this context.
242    */
243   struct Session *session;
244
245   /**
246    * Address to find.
247    */
248   const union UdpAddress *udp_addr;
249
250   /**
251    * Number of bytes in @e udp_addr.
252    */
253   size_t udp_addr_len;
254
255 };
256
257 /**
258  * Data structure to track defragmentation contexts based
259  * on the source of the UDP traffic.
260  */
261 struct DefragContext
262 {
263
264   /**
265    * Defragmentation context.
266    */
267   struct GNUNET_DEFRAGMENT_Context *defrag;
268
269   /**
270    * Reference to master plugin struct.
271    */
272   struct Plugin *plugin;
273
274   /**
275    * Node in the defrag heap.
276    */
277   struct GNUNET_CONTAINER_HeapNode *hnode;
278
279   /**
280    * Who's message(s) are we defragmenting here?
281    * Only initialized once we succeeded and
282    * @e have_sender is set.
283    */
284   struct GNUNET_PeerIdentity sender;
285
286   /**
287    * Source address this receive context is for (allocated at the
288    * end of the struct).
289    */
290   const union UdpAddress *udp_addr;
291
292   /**
293    * Length of @e udp_addr.
294    */
295   size_t udp_addr_len;
296
297   /**
298    * Network type the address belongs to.
299    */
300   enum GNUNET_ATS_Network_Type network_type;
301
302   /**
303    * Has the @e sender field been initialized yet?
304    */
305   int have_sender;
306 };
307
308
309 /**
310  * Context to send fragmented messages
311  */
312 struct UDP_FragmentationContext
313 {
314   /**
315    * Next in linked list
316    */
317   struct UDP_FragmentationContext *next;
318
319   /**
320    * Previous in linked list
321    */
322   struct UDP_FragmentationContext *prev;
323
324   /**
325    * The plugin
326    */
327   struct Plugin *plugin;
328
329   /**
330    * Handle for GNUNET_FRAGMENT context
331    */
332   struct GNUNET_FRAGMENT_Context *frag;
333
334   /**
335    * The session this fragmentation context belongs to
336    */
337   struct Session *session;
338
339   /**
340    * Function to call upon completion of the transmission.
341    */
342   GNUNET_TRANSPORT_TransmitContinuation cont;
343
344   /**
345    * Closure for @e cont.
346    */
347   void *cont_cls;
348
349   /**
350    * Message timeout
351    */
352   struct GNUNET_TIME_Absolute timeout;
353
354   /**
355    * Payload size of original unfragmented message
356    */
357   size_t payload_size;
358
359   /**
360    * Bytes used to send all fragments on wire including UDP overhead
361    */
362   size_t on_wire_size;
363
364   /**
365    * FIXME.
366    */
367   unsigned int fragments_used;
368
369 };
370
371
372 /**
373  * Message types included in a `struct UDP_MessageWrapper`
374  */
375 enum UDP_MessageType
376 {
377   /**
378    * Uninitialized (error)
379    */
380   UMT_UNDEFINED = 0,
381
382   /**
383    * Fragment of a message.
384    */
385   UMT_MSG_FRAGMENTED = 1,
386
387   /**
388    *
389    */
390   UMT_MSG_FRAGMENTED_COMPLETE = 2,
391
392   /**
393    * Unfragmented message.
394    */
395   UMT_MSG_UNFRAGMENTED = 3,
396
397   /**
398    * Receipt confirmation.
399    */
400   UMT_MSG_ACK = 4
401
402 };
403
404
405 /**
406  * Information we track for each message in the queue.
407  */
408 struct UDP_MessageWrapper
409 {
410   /**
411    * Session this message belongs to
412    */
413   struct Session *session;
414
415   /**
416    * DLL of messages
417    * previous element
418    */
419   struct UDP_MessageWrapper *prev;
420
421   /**
422    * DLL of messages
423    * previous element
424    */
425   struct UDP_MessageWrapper *next;
426
427   /**
428    * Message with size msg_size including UDP specific overhead
429    */
430   char *msg_buf;
431
432   /**
433    * Function to call upon completion of the transmission.
434    */
435   GNUNET_TRANSPORT_TransmitContinuation cont;
436
437   /**
438    * Closure for @e cont.
439    */
440   void *cont_cls;
441
442   /**
443    * Fragmentation context
444    * frag_ctx == NULL if transport <= MTU
445    * frag_ctx != NULL if transport > MTU
446    */
447   struct UDP_FragmentationContext *frag_ctx;
448
449   /**
450    * Message timeout
451    */
452   struct GNUNET_TIME_Absolute timeout;
453
454   /**
455    * Size of UDP message to send including UDP specific overhead
456    */
457   size_t msg_size;
458
459   /**
460    * Payload size of original message
461    */
462   size_t payload_size;
463
464   /**
465    * Message type
466    */
467   enum UDP_MessageType msg_type;
468
469 };
470
471
472 /**
473  * UDP ACK Message-Packet header.
474  */
475 struct UDP_ACK_Message
476 {
477   /**
478    * Message header.
479    */
480   struct GNUNET_MessageHeader header;
481
482   /**
483    * Desired delay for flow control
484    */
485   uint32_t delay;
486
487   /**
488    * What is the identity of the sender
489    */
490   struct GNUNET_PeerIdentity sender;
491
492 };
493
494
495 /**
496  * If a session monitor is attached, notify it about the new
497  * session state.
498  *
499  * @param plugin our plugin
500  * @param session session that changed state
501  * @param state new state of the session
502  */
503 static void
504 notify_session_monitor (struct Plugin *plugin,
505                         struct Session *session,
506                         enum GNUNET_TRANSPORT_SessionState state)
507 {
508   struct GNUNET_TRANSPORT_SessionInfo info;
509
510   if (NULL == plugin->sic)
511     return;
512   if (GNUNET_YES == session->in_destroy)
513     return; /* already destroyed, just RC>0 left-over actions */
514   memset (&info, 0, sizeof (info));
515   info.state = state;
516   info.is_inbound = GNUNET_SYSERR; /* hard to say */
517   info.num_msg_pending = session->msgs_in_queue;
518   info.num_bytes_pending = session->bytes_in_queue;
519   /* info.receive_delay remains zero as this is not supported by UDP
520      (cannot selectively not receive from 'some' peer while continuing
521      to receive from others) */
522   info.session_timeout = session->timeout;
523   info.address = session->address;
524   plugin->sic (plugin->sic_cls,
525                session,
526                &info);
527 }
528
529
530 /**
531  * We have been notified that our readset has something to read.  We don't
532  * know which socket needs to be read, so we have to check each one
533  * Then reschedule this function to be called again once more is available.
534  *
535  * @param cls the plugin handle
536  * @param tc the scheduling context (for rescheduling this function again)
537  */
538 static void
539 udp_plugin_select (void *cls,
540                    const struct GNUNET_SCHEDULER_TaskContext *tc);
541
542
543 /**
544  * We have been notified that our readset has something to read.  We don't
545  * know which socket needs to be read, so we have to check each one
546  * Then reschedule this function to be called again once more is available.
547  *
548  * @param cls the plugin handle
549  * @param tc the scheduling context (for rescheduling this function again)
550  */
551 static void
552 udp_plugin_select_v6 (void *cls,
553                       const struct GNUNET_SCHEDULER_TaskContext *tc);
554
555
556 /**
557  * (re)schedule select tasks for this plugin.
558  *
559  * @param plugin plugin to reschedule
560  */
561 static void
562 schedule_select (struct Plugin *plugin)
563 {
564   struct GNUNET_TIME_Relative min_delay;
565   struct UDP_MessageWrapper *udpw;
566
567   if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
568   {
569     /* Find a message ready to send:
570      * Flow delay from other peer is expired or not set (0) */
571     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
572     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
573       min_delay = GNUNET_TIME_relative_min (min_delay,
574           GNUNET_TIME_absolute_get_remaining (
575               udpw->session->flow_delay_from_other_peer));
576
577     if (plugin->select_task != NULL )
578       GNUNET_SCHEDULER_cancel (plugin->select_task);
579
580     /* Schedule with:
581      * - write active set if message is ready
582      * - timeout minimum delay */
583     plugin->select_task = GNUNET_SCHEDULER_add_select (
584         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
585         (0 == min_delay.rel_value_us) ?
586             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4,
587         (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL,
588         &udp_plugin_select, plugin);
589   }
590   if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
591   {
592     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
593     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
594       min_delay = GNUNET_TIME_relative_min (min_delay,
595           GNUNET_TIME_absolute_get_remaining (
596               udpw->session->flow_delay_from_other_peer));
597
598     if (NULL != plugin->select_task_v6)
599       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
600     plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (
601         GNUNET_SCHEDULER_PRIORITY_DEFAULT,
602         (0 == min_delay.rel_value_us) ?
603             GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6,
604         (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL,
605         &udp_plugin_select_v6, plugin);
606   }
607 }
608
609
610 /**
611  * Function called for a quick conversion of the binary address to
612  * a numeric address.  Note that the caller must not free the
613  * address and that the next call to this function is allowed
614  * to override the address again.
615  *
616  * @param cls closure
617  * @param addr binary address (a `union UdpAddress`)
618  * @param addrlen length of the @a addr
619  * @return string representing the same address
620  */
621 const char *
622 udp_address_to_string (void *cls,
623                        const void *addr,
624                        size_t addrlen)
625 {
626   static char rbuf[INET6_ADDRSTRLEN + 10];
627   char buf[INET6_ADDRSTRLEN];
628   const void *sb;
629   struct in_addr a4;
630   struct in6_addr a6;
631   const struct IPv4UdpAddress *t4;
632   const struct IPv6UdpAddress *t6;
633   int af;
634   uint16_t port;
635   uint32_t options;
636
637   if ((NULL != addr) && (addrlen == sizeof(struct IPv6UdpAddress)))
638   {
639     t6 = addr;
640     af = AF_INET6;
641     options = ntohl (t6->options);
642     port = ntohs (t6->u6_port);
643     memcpy (&a6, &t6->ipv6_addr, sizeof(a6));
644     sb = &a6;
645   }
646   else if ((NULL != addr) && (addrlen == sizeof(struct IPv4UdpAddress)))
647   {
648     t4 = addr;
649     af = AF_INET;
650     options = ntohl (t4->options);
651     port = ntohs (t4->u4_port);
652     memcpy (&a4, &t4->ipv4_addr, sizeof(a4));
653     sb = &a4;
654   }
655   else
656   {
657     return NULL;
658   }
659   inet_ntop (af, sb, buf, INET6_ADDRSTRLEN);
660
661   GNUNET_snprintf (rbuf, sizeof(rbuf),
662                    (af == AF_INET6)
663                    ? "%s.%u.[%s]:%u"
664                    : "%s.%u.%s:%u",
665                    PLUGIN_NAME,
666                    options,
667                    buf,
668                    port);
669   return rbuf;
670 }
671
672
673 /**
674  * Function called to convert a string address to
675  * a binary address.
676  *
677  * @param cls closure (`struct Plugin *`)
678  * @param addr string address
679  * @param addrlen length of the address
680  * @param buf location to store the buffer
681  * @param added location to store the number of bytes in the buffer.
682  *        If the function returns #GNUNET_SYSERR, its contents are undefined.
683  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
684  */
685 static int
686 udp_string_to_address (void *cls,
687                        const char *addr,
688                        uint16_t addrlen,
689                        void **buf,
690                        size_t *added)
691 {
692   struct sockaddr_storage socket_address;
693   char *address;
694   char *plugin;
695   char *optionstr;
696   uint32_t options;
697
698   /* Format tcp.options.address:port */
699   address = NULL;
700   plugin = NULL;
701   optionstr = NULL;
702
703   if ((NULL == addr) || (addrlen == 0))
704   {
705     GNUNET_break(0);
706     return GNUNET_SYSERR;
707   }
708   if ('\0' != addr[addrlen - 1])
709   {
710     GNUNET_break(0);
711     return GNUNET_SYSERR;
712   }
713   if (strlen (addr) != addrlen - 1)
714   {
715     GNUNET_break(0);
716     return GNUNET_SYSERR;
717   }
718   plugin = GNUNET_strdup (addr);
719   optionstr = strchr (plugin, '.');
720   if (NULL == optionstr)
721   {
722     GNUNET_break(0);
723     GNUNET_free(plugin);
724     return GNUNET_SYSERR;
725   }
726   optionstr[0] = '\0';
727   optionstr++;
728   options = atol (optionstr);
729   address = strchr (optionstr, '.');
730   if (NULL == address)
731   {
732     GNUNET_break(0);
733     GNUNET_free(plugin);
734     return GNUNET_SYSERR;
735   }
736   address[0] = '\0';
737   address++;
738
739   if (GNUNET_OK !=
740       GNUNET_STRINGS_to_address_ip (address, strlen (address),
741                                     &socket_address))
742   {
743     GNUNET_break(0);
744     GNUNET_free(plugin);
745     return GNUNET_SYSERR;
746   }
747
748   GNUNET_free(plugin);
749
750   switch (socket_address.ss_family)
751   {
752   case AF_INET:
753   {
754     struct IPv4UdpAddress *u4;
755     struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
756     u4 = GNUNET_new (struct IPv4UdpAddress);
757     u4->options = htonl (options);
758     u4->ipv4_addr = in4->sin_addr.s_addr;
759     u4->u4_port = in4->sin_port;
760     *buf = u4;
761     *added = sizeof(struct IPv4UdpAddress);
762     return GNUNET_OK;
763   }
764   case AF_INET6:
765   {
766     struct IPv6UdpAddress *u6;
767     struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
768     u6 = GNUNET_new (struct IPv6UdpAddress);
769     u6->options = htonl (options);
770     u6->ipv6_addr = in6->sin6_addr;
771     u6->u6_port = in6->sin6_port;
772     *buf = u6;
773     *added = sizeof(struct IPv6UdpAddress);
774     return GNUNET_OK;
775   }
776   default:
777     GNUNET_break(0);
778     return GNUNET_SYSERR;
779   }
780 }
781
782
783 /**
784  * Append our port and forward the result.
785  *
786  * @param cls a `struct PrettyPrinterContext *`
787  * @param hostname result from DNS resolver
788  */
789 static void
790 append_port (void *cls,
791              const char *hostname)
792 {
793   struct PrettyPrinterContext *ppc = cls;
794   struct Plugin *plugin = ppc->plugin;
795   char *ret;
796
797   if (NULL == hostname)
798   {
799     /* Final call, done */
800     ppc->asc (ppc->asc_cls,
801               NULL,
802               GNUNET_OK);
803     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
804                                  plugin->ppc_dll_tail,
805                                  ppc);
806     ppc->resolver_handle = NULL;
807     GNUNET_free (ppc);
808     return;
809   }
810   if (GNUNET_YES == ppc->ipv6)
811     GNUNET_asprintf (&ret,
812                      "%s.%u.[%s]:%d",
813                      PLUGIN_NAME,
814                      ppc->options,
815                      hostname,
816                      ppc->port);
817   else
818     GNUNET_asprintf (&ret,
819                      "%s.%u.%s:%d",
820                      PLUGIN_NAME,
821                      ppc->options,
822                      hostname,
823                      ppc->port);
824   ppc->asc (ppc->asc_cls,
825             ret,
826             GNUNET_OK);
827   GNUNET_free (ret);
828 }
829
830
831 /**
832  * Convert the transports address to a nice, human-readable
833  * format.
834  *
835  * @param cls closure with the `struct Plugin *`
836  * @param type name of the transport that generated the address
837  * @param addr one of the addresses of the host, NULL for the last address
838  *        the specific address format depends on the transport;
839  *        a `union UdpAddress`
840  * @param addrlen length of the address
841  * @param numeric should (IP) addresses be displayed in numeric form?
842  * @param timeout after how long should we give up?
843  * @param asc function to call on each string
844  * @param asc_cls closure for @a asc
845  */
846 static void
847 udp_plugin_address_pretty_printer (void *cls,
848                                    const char *type,
849                                    const void *addr,
850                                    size_t addrlen,
851                                    int numeric,
852                                    struct GNUNET_TIME_Relative timeout,
853                                    GNUNET_TRANSPORT_AddressStringCallback asc,
854                                    void *asc_cls)
855 {
856   struct Plugin *plugin = cls;
857   struct PrettyPrinterContext *ppc;
858   const void *sb;
859   size_t sbs;
860   struct sockaddr_in a4;
861   struct sockaddr_in6 a6;
862   const struct IPv4UdpAddress *u4;
863   const struct IPv6UdpAddress *u6;
864   uint16_t port;
865   uint32_t options;
866
867   if (addrlen == sizeof(struct IPv6UdpAddress))
868   {
869     u6 = addr;
870     memset (&a6, 0, sizeof(a6));
871     a6.sin6_family = AF_INET6;
872 #if HAVE_SOCKADDR_IN_SIN_LEN
873     a6.sin6_len = sizeof (a6);
874 #endif
875     a6.sin6_port = u6->u6_port;
876     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
877     port = ntohs (u6->u6_port);
878     options = ntohl (u6->options);
879     sb = &a6;
880     sbs = sizeof(a6);
881   }
882   else if (addrlen == sizeof(struct IPv4UdpAddress))
883   {
884     u4 = addr;
885     memset (&a4, 0, sizeof(a4));
886     a4.sin_family = AF_INET;
887 #if HAVE_SOCKADDR_IN_SIN_LEN
888     a4.sin_len = sizeof (a4);
889 #endif
890     a4.sin_port = u4->u4_port;
891     a4.sin_addr.s_addr = u4->ipv4_addr;
892     port = ntohs (u4->u4_port);
893     options = ntohl (u4->options);
894     sb = &a4;
895     sbs = sizeof(a4);
896   }
897   else
898   {
899     /* invalid address */
900     GNUNET_break_op (0);
901     asc (asc_cls, NULL , GNUNET_SYSERR);
902     asc (asc_cls, NULL, GNUNET_OK);
903     return;
904   }
905   ppc = GNUNET_new (struct PrettyPrinterContext);
906   ppc->plugin = plugin;
907   ppc->asc = asc;
908   ppc->asc_cls = asc_cls;
909   ppc->port = port;
910   ppc->options = options;
911   if (addrlen == sizeof(struct IPv6UdpAddress))
912     ppc->ipv6 = GNUNET_YES;
913   else
914     ppc->ipv6 = GNUNET_NO;
915   GNUNET_CONTAINER_DLL_insert (plugin->ppc_dll_head,
916                                plugin->ppc_dll_tail,
917                                ppc);
918   ppc->resolver_handle
919     = GNUNET_RESOLVER_hostname_get (sb,
920                                     sbs,
921                                     ! numeric,
922                                     timeout,
923                                     &append_port, ppc);
924 }
925
926
927 /**
928  * FIXME.
929  */
930 static void
931 call_continuation (struct UDP_MessageWrapper *udpw,
932                    int result)
933 {
934   struct Session *session = udpw->session;
935   struct Plugin *plugin = session->plugin;
936   size_t overhead;
937
938   LOG (GNUNET_ERROR_TYPE_DEBUG,
939        "Calling continuation for %u byte message to `%s' with result %s\n",
940        udpw->payload_size,
941        GNUNET_i2s (&udpw->session->target),
942        (GNUNET_OK == result) ? "OK" : "SYSERR");
943
944   if (udpw->msg_size >= udpw->payload_size)
945     overhead = udpw->msg_size - udpw->payload_size;
946   else
947     overhead = udpw->msg_size;
948
949   switch (result)
950   {
951   case GNUNET_OK:
952     switch (udpw->msg_type)
953     {
954     case UMT_MSG_UNFRAGMENTED:
955       if (NULL != udpw->cont)
956       {
957         /* Transport continuation */
958         udpw->cont (udpw->cont_cls,
959                     &udpw->session->target,
960                     result,
961                     udpw->payload_size,
962                     udpw->msg_size);
963       }
964       GNUNET_STATISTICS_update (plugin->env->stats,
965                                 "# UDP, unfragmented msgs, messages, sent, success",
966                                 1,
967                                 GNUNET_NO);
968       GNUNET_STATISTICS_update (plugin->env->stats,
969                                 "# UDP, unfragmented msgs, bytes payload, sent, success",
970                                 udpw->payload_size,
971                                 GNUNET_NO);
972       GNUNET_STATISTICS_update (plugin->env->stats,
973                                 "# UDP, unfragmented msgs, bytes overhead, sent, success",
974                                 overhead,
975                                 GNUNET_NO);
976       GNUNET_STATISTICS_update (plugin->env->stats,
977                                 "# UDP, total, bytes overhead, sent",
978                                 overhead,
979                                 GNUNET_NO);
980       GNUNET_STATISTICS_update (plugin->env->stats,
981                                 "# UDP, total, bytes payload, sent",
982                                 udpw->payload_size,
983                                 GNUNET_NO);
984       break;
985     case UMT_MSG_FRAGMENTED_COMPLETE:
986       GNUNET_assert(NULL != udpw->frag_ctx);
987       if (udpw->frag_ctx->cont != NULL )
988         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls,
989                               &udpw->session->target,
990                               GNUNET_OK,
991                               udpw->frag_ctx->payload_size,
992                               udpw->frag_ctx->on_wire_size);
993       GNUNET_STATISTICS_update (plugin->env->stats,
994                                 "# UDP, fragmented msgs, messages, sent, success",
995                                 1,
996                                 GNUNET_NO);
997       GNUNET_STATISTICS_update (plugin->env->stats,
998                                 "# UDP, fragmented msgs, bytes payload, sent, success",
999                                 udpw->payload_size,
1000                                 GNUNET_NO);
1001       GNUNET_STATISTICS_update (plugin->env->stats,
1002                                 "# UDP, fragmented msgs, bytes overhead, sent, success",
1003                                 overhead,
1004                                 GNUNET_NO);
1005       GNUNET_STATISTICS_update (plugin->env->stats,
1006                                 "# UDP, total, bytes overhead, sent",
1007                                 overhead,
1008                                 GNUNET_NO);
1009       GNUNET_STATISTICS_update (plugin->env->stats,
1010                                 "# UDP, total, bytes payload, sent",
1011                                 udpw->payload_size,
1012                                 GNUNET_NO);
1013       GNUNET_STATISTICS_update (plugin->env->stats,
1014                                 "# UDP, fragmented msgs, messages, pending",
1015                                 -1,
1016                                 GNUNET_NO);
1017       break;
1018     case UMT_MSG_FRAGMENTED:
1019       /* Fragmented message: enqueue next fragment */
1020       if (NULL != udpw->cont)
1021         udpw->cont (udpw->cont_cls,
1022                     &udpw->session->target,
1023                     result,
1024                     udpw->payload_size,
1025                     udpw->msg_size);
1026       GNUNET_STATISTICS_update (plugin->env->stats,
1027                                 "# UDP, fragmented msgs, fragments, sent, success",
1028                                 1,
1029                                 GNUNET_NO);
1030       GNUNET_STATISTICS_update (plugin->env->stats,
1031                                 "# UDP, fragmented msgs, fragments bytes, sent, success",
1032                                 udpw->msg_size,
1033                                 GNUNET_NO);
1034       break;
1035     case UMT_MSG_ACK:
1036       /* No continuation */
1037       GNUNET_STATISTICS_update (plugin->env->stats,
1038                                 "# UDP, ACK msgs, messages, sent, success",
1039                                 1,
1040                                 GNUNET_NO);
1041       GNUNET_STATISTICS_update (plugin->env->stats,
1042                                 "# UDP, ACK msgs, bytes overhead, sent, success",
1043                                 overhead,
1044                                 GNUNET_NO);
1045       GNUNET_STATISTICS_update (plugin->env->stats,
1046                                 "# UDP, total, bytes overhead, sent",
1047                                 overhead,
1048                                 GNUNET_NO);
1049       break;
1050     default:
1051       GNUNET_break(0);
1052       break;
1053     }
1054     break;
1055   case GNUNET_SYSERR:
1056     switch (udpw->msg_type)
1057     {
1058     case UMT_MSG_UNFRAGMENTED:
1059       /* Unfragmented message: failed to send */
1060       if (NULL != udpw->cont)
1061         udpw->cont (udpw->cont_cls,
1062                     &udpw->session->target,
1063                     result,
1064                     udpw->payload_size,
1065                     overhead);
1066       GNUNET_STATISTICS_update (plugin->env->stats,
1067                                 "# UDP, unfragmented msgs, messages, sent, failure",
1068                                 1,
1069                                 GNUNET_NO);
1070       GNUNET_STATISTICS_update (plugin->env->stats,
1071                                 "# UDP, unfragmented msgs, bytes payload, sent, failure",
1072                                 udpw->payload_size,
1073                                 GNUNET_NO);
1074       GNUNET_STATISTICS_update (plugin->env->stats,
1075                                 "# UDP, unfragmented msgs, bytes overhead, sent, failure",
1076                                 overhead,
1077                                 GNUNET_NO);
1078       break;
1079     case UMT_MSG_FRAGMENTED_COMPLETE:
1080       GNUNET_assert (NULL != udpw->frag_ctx);
1081       if (udpw->frag_ctx->cont != NULL)
1082         udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls,
1083                               &udpw->session->target,
1084                               GNUNET_SYSERR,
1085                               udpw->frag_ctx->payload_size,
1086                               udpw->frag_ctx->on_wire_size);
1087       GNUNET_STATISTICS_update (plugin->env->stats,
1088                                 "# UDP, fragmented msgs, messages, sent, failure",
1089                                 1,
1090                                 GNUNET_NO);
1091       GNUNET_STATISTICS_update (plugin->env->stats,
1092                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1093                                 udpw->payload_size,
1094                                 GNUNET_NO);
1095       GNUNET_STATISTICS_update (plugin->env->stats,
1096                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1097                                 overhead,
1098                                 GNUNET_NO);
1099       GNUNET_STATISTICS_update (plugin->env->stats,
1100                                 "# UDP, fragmented msgs, bytes payload, sent, failure",
1101                                 overhead,
1102                                 GNUNET_NO);
1103       GNUNET_STATISTICS_update (plugin->env->stats,
1104                                 "# UDP, fragmented msgs, messages, pending",
1105                                 -1,
1106                                 GNUNET_NO);
1107       break;
1108     case UMT_MSG_FRAGMENTED:
1109       GNUNET_assert (NULL != udpw->frag_ctx);
1110       /* Fragmented message: failed to send */
1111       GNUNET_STATISTICS_update (plugin->env->stats,
1112                                 "# UDP, fragmented msgs, fragments, sent, failure",
1113                                 1,
1114                                 GNUNET_NO);
1115       GNUNET_STATISTICS_update (plugin->env->stats,
1116                                 "# UDP, fragmented msgs, fragments bytes, sent, failure",
1117                                 udpw->msg_size,
1118                                 GNUNET_NO);
1119       break;
1120     case UMT_MSG_ACK:
1121       /* ACK message: failed to send */
1122       GNUNET_STATISTICS_update (plugin->env->stats,
1123                                 "# UDP, ACK msgs, messages, sent, failure",
1124                                 1,
1125                                 GNUNET_NO);
1126       break;
1127     default:
1128       GNUNET_break(0);
1129       break;
1130     }
1131     break;
1132   default:
1133     GNUNET_break(0);
1134     break;
1135   }
1136 }
1137
1138
1139 /**
1140  * Check if the given port is plausible (must be either our listen
1141  * port or our advertised port).  If it is neither, we return
1142  * #GNUNET_SYSERR.
1143  *
1144  * @param plugin global variables
1145  * @param in_port port number to check
1146  * @return #GNUNET_OK if port is either open_port or adv_port
1147  */
1148 static int
1149 check_port (struct Plugin *plugin,
1150             uint16_t in_port)
1151 {
1152   if ((in_port == plugin->port) || (in_port == plugin->aport))
1153     return GNUNET_OK;
1154   return GNUNET_SYSERR;
1155 }
1156
1157
1158 /**
1159  * Function that will be called to check if a binary address for this
1160  * plugin is well-formed and corresponds to an address for THIS peer
1161  * (as per our configuration).  Naturally, if absolutely necessary,
1162  * plugins can be a bit conservative in their answer, but in general
1163  * plugins should make sure that the address does not redirect
1164  * traffic to a 3rd party that might try to man-in-the-middle our
1165  * traffic.
1166  *
1167  * @param cls closure, should be our handle to the Plugin
1168  * @param addr pointer to a `union UdpAddress`
1169  * @param addrlen length of @a addr
1170  * @return #GNUNET_OK if this is a plausible address for this peer
1171  *         and transport, #GNUNET_SYSERR if not
1172  */
1173 static int
1174 udp_plugin_check_address (void *cls,
1175                           const void *addr,
1176                           size_t addrlen)
1177 {
1178   struct Plugin *plugin = cls;
1179   struct IPv4UdpAddress *v4;
1180   struct IPv6UdpAddress *v6;
1181
1182   if ( (addrlen != sizeof(struct IPv4UdpAddress)) &&
1183        (addrlen != sizeof(struct IPv6UdpAddress)) )
1184   {
1185     GNUNET_break_op(0);
1186     return GNUNET_SYSERR;
1187   }
1188   if (addrlen == sizeof(struct IPv4UdpAddress))
1189   {
1190     v4 = (struct IPv4UdpAddress *) addr;
1191     if (GNUNET_OK != check_port (plugin, ntohs (v4->u4_port)))
1192       return GNUNET_SYSERR;
1193     if (GNUNET_OK !=
1194         GNUNET_NAT_test_address (plugin->nat,
1195                                  &v4->ipv4_addr,
1196                                  sizeof (struct in_addr)))
1197       return GNUNET_SYSERR;
1198   }
1199   else
1200   {
1201     v6 = (struct IPv6UdpAddress *) addr;
1202     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1203     {
1204       GNUNET_break_op(0);
1205       return GNUNET_SYSERR;
1206     }
1207     if (GNUNET_OK != check_port (plugin, ntohs (v6->u6_port)))
1208       return GNUNET_SYSERR;
1209     if (GNUNET_OK !=
1210         GNUNET_NAT_test_address (plugin->nat,
1211                                  &v6->ipv6_addr,
1212                                  sizeof(struct in6_addr)))
1213       return GNUNET_SYSERR;
1214   }
1215   return GNUNET_OK;
1216 }
1217
1218
1219 /**
1220  * Function to free last resources associated with a session.
1221  *
1222  * @param s session to free
1223  */
1224 static void
1225 free_session (struct Session *s)
1226 {
1227   if (NULL != s->frag_ctx)
1228   {
1229     GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, NULL, NULL );
1230     GNUNET_free(s->frag_ctx);
1231     s->frag_ctx = NULL;
1232   }
1233   GNUNET_free(s);
1234 }
1235
1236
1237 /**
1238  * Remove a message from the transmission queue.
1239  *
1240  * @param plugin the UDP plugin
1241  * @param udpw message wrapper to queue
1242  */
1243 static void
1244 dequeue (struct Plugin *plugin,
1245          struct UDP_MessageWrapper *udpw)
1246 {
1247   struct Session *session = udpw->session;
1248
1249   if (plugin->bytes_in_buffer < udpw->msg_size)
1250   {
1251     GNUNET_break (0);
1252   }
1253   else
1254   {
1255     GNUNET_STATISTICS_update (plugin->env->stats,
1256                               "# UDP, total, bytes in buffers",
1257                               -(long long) udpw->msg_size,
1258                               GNUNET_NO);
1259     plugin->bytes_in_buffer -= udpw->msg_size;
1260   }
1261   GNUNET_STATISTICS_update (plugin->env->stats,
1262                             "# UDP, total, msgs in buffers",
1263                             -1, GNUNET_NO);
1264   if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress))
1265     GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
1266                                  plugin->ipv4_queue_tail,
1267                                  udpw);
1268   else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress))
1269     GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
1270                                  plugin->ipv6_queue_tail,
1271                                  udpw);
1272   else
1273   {
1274     GNUNET_break (0);
1275     return;
1276   }
1277   GNUNET_assert (session->msgs_in_queue > 0);
1278   session->msgs_in_queue--;
1279   GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
1280   session->bytes_in_queue -= udpw->msg_size;
1281 }
1282
1283
1284 /**
1285  * We have completed our (attempt) to transmit a message
1286  * that had to be fragmented -- either because we got an
1287  * ACK saying that all fragments were received, or because
1288  * of timeout / disconnect.  Clean up our state.
1289  *
1290  * @param fc fragmentation context to clean up
1291  * @param result #GNUNET_OK if we succeeded (got ACK),
1292  *               #GNUNET_SYSERR if the transmission failed
1293  */
1294 static void
1295 fragmented_message_done (struct UDP_FragmentationContext *fc,
1296                          int result)
1297 {
1298   struct Plugin *plugin = fc->plugin;
1299   struct Session *s = fc->session;
1300   struct UDP_MessageWrapper *udpw;
1301   struct UDP_MessageWrapper *tmp;
1302   struct UDP_MessageWrapper dummy;
1303
1304   LOG (GNUNET_ERROR_TYPE_DEBUG,
1305        "%p : Fragmented message removed with result %s\n",
1306        fc,
1307        (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS");
1308
1309   /* Call continuation for fragmented message */
1310   memset (&dummy, 0, sizeof(dummy));
1311   dummy.msg_type = UMT_MSG_FRAGMENTED_COMPLETE;
1312   dummy.msg_size = s->frag_ctx->on_wire_size;
1313   dummy.payload_size = s->frag_ctx->payload_size;
1314   dummy.frag_ctx = s->frag_ctx;
1315   dummy.cont = NULL;
1316   dummy.cont_cls = NULL;
1317   dummy.session = s;
1318   call_continuation (&dummy, result);
1319   /* Remove leftover fragments from queue */
1320   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
1321   {
1322     udpw = plugin->ipv6_queue_head;
1323     while (NULL != udpw)
1324     {
1325       tmp = udpw->next;
1326       if ( (udpw->frag_ctx != NULL) &&
1327            (udpw->frag_ctx == s->frag_ctx) )
1328       {
1329         dequeue (plugin, udpw);
1330         call_continuation (udpw, GNUNET_SYSERR);
1331         GNUNET_free (udpw);
1332       }
1333       udpw = tmp;
1334     }
1335   }
1336   if (s->address->address_length == sizeof(struct IPv4UdpAddress))
1337   {
1338     udpw = plugin->ipv4_queue_head;
1339     while (udpw != NULL )
1340     {
1341       tmp = udpw->next;
1342       if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx))
1343       {
1344         dequeue (plugin, udpw);
1345         call_continuation (udpw, GNUNET_SYSERR);
1346         GNUNET_free(udpw);
1347       }
1348       udpw = tmp;
1349     }
1350   }
1351   notify_session_monitor (s->plugin,
1352                           s,
1353                           GNUNET_TRANSPORT_SS_UPDATE);
1354   /* Destroy fragmentation context */
1355   GNUNET_FRAGMENT_context_destroy (fc->frag,
1356                                    &s->last_expected_msg_delay,
1357                                    &s->last_expected_ack_delay);
1358   s->frag_ctx = NULL;
1359   GNUNET_free (fc);
1360 }
1361
1362
1363 /**
1364  * Scan the heap for a receive context with the given address.
1365  *
1366  * @param cls the `struct FindReceiveContext`
1367  * @param node internal node of the heap
1368  * @param element value stored at the node (a `struct ReceiveContext`)
1369  * @param cost cost associated with the node
1370  * @return #GNUNET_YES if we should continue to iterate,
1371  *         #GNUNET_NO if not.
1372  */
1373 static int
1374 find_receive_context (void *cls,
1375                       struct GNUNET_CONTAINER_HeapNode *node,
1376                       void *element,
1377                       GNUNET_CONTAINER_HeapCostType cost)
1378 {
1379   struct FindReceiveContext *frc = cls;
1380   struct DefragContext *e = element;
1381
1382   if ( (frc->udp_addr_len == e->udp_addr_len) &&
1383        (0 == memcmp (frc->udp_addr,
1384                      e->udp_addr,
1385                      frc->udp_addr_len)) )
1386   {
1387     frc->rc = e;
1388     return GNUNET_NO;
1389   }
1390   return GNUNET_YES;
1391 }
1392
1393
1394 /**
1395  * Functions with this signature are called whenever we need
1396  * to close a session due to a disconnect or failure to
1397  * establish a connection.
1398  *
1399  * @param cls closure with the `struct Plugin`
1400  * @param s session to close down
1401  * @return #GNUNET_OK on success
1402  */
1403 static int
1404 udp_disconnect_session (void *cls,
1405                         struct Session *s)
1406 {
1407   struct Plugin *plugin = cls;
1408   struct UDP_MessageWrapper *udpw;
1409   struct UDP_MessageWrapper *next;
1410   struct FindReceiveContext frc;
1411
1412   GNUNET_assert (GNUNET_YES != s->in_destroy);
1413   LOG (GNUNET_ERROR_TYPE_DEBUG,
1414        "Session %p to peer `%s' address ended\n",
1415        s,
1416        GNUNET_i2s (&s->target),
1417        udp_address_to_string (plugin,
1418                               s->address->address,
1419                               s->address->address_length));
1420   /* stop timeout task */
1421   if (NULL != s->timeout_task)
1422   {
1423     GNUNET_SCHEDULER_cancel (s->timeout_task);
1424     s->timeout_task = NULL;
1425   }
1426   if (NULL != s->frag_ctx)
1427   {
1428     /* Remove fragmented message due to disconnect */
1429     fragmented_message_done (s->frag_ctx,
1430                              GNUNET_SYSERR);
1431   }
1432
1433   frc.rc = NULL;
1434   frc.udp_addr = s->address->address;
1435   frc.udp_addr_len = s->address->address_length;
1436   /* Lookup existing receive context for this address */
1437   if (NULL != plugin->defrag_ctxs)
1438   {
1439     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
1440                                    &find_receive_context,
1441                                    &frc);
1442     if (NULL != frc.rc)
1443     {
1444       struct DefragContext *d_ctx = frc.rc;
1445
1446       GNUNET_CONTAINER_heap_remove_node (d_ctx->hnode);
1447       GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
1448       GNUNET_free (d_ctx);
1449     }
1450   }
1451   next = plugin->ipv4_queue_head;
1452   while (NULL != (udpw = next))
1453   {
1454     next = udpw->next;
1455     if (udpw->session == s)
1456     {
1457       dequeue (plugin, udpw);
1458       call_continuation (udpw, GNUNET_SYSERR);
1459       GNUNET_free(udpw);
1460     }
1461   }
1462   next = plugin->ipv6_queue_head;
1463   while (NULL != (udpw = next))
1464   {
1465     next = udpw->next;
1466     if (udpw->session == s)
1467     {
1468       dequeue (plugin, udpw);
1469       call_continuation (udpw, GNUNET_SYSERR);
1470       GNUNET_free(udpw);
1471     }
1472   }
1473   notify_session_monitor (s->plugin,
1474                           s,
1475                           GNUNET_TRANSPORT_SS_DONE);
1476   plugin->env->session_end (plugin->env->cls,
1477                             s->address,
1478                             s);
1479
1480   if (NULL != s->frag_ctx)
1481   {
1482     if (NULL != s->frag_ctx->cont)
1483     {
1484       s->frag_ctx->cont (s->frag_ctx->cont_cls,
1485                          &s->target,
1486                          GNUNET_SYSERR,
1487                          s->frag_ctx->payload_size,
1488                          s->frag_ctx->on_wire_size);
1489       LOG (GNUNET_ERROR_TYPE_DEBUG,
1490            "Calling continuation for fragemented message to `%s' with result SYSERR\n",
1491            GNUNET_i2s (&s->target));
1492     }
1493   }
1494
1495   GNUNET_assert (GNUNET_YES ==
1496                  GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
1497                                                        &s->target,
1498                                                        s));
1499   GNUNET_STATISTICS_set (plugin->env->stats,
1500                          "# UDP sessions active",
1501                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
1502                          GNUNET_NO);
1503   if (s->rc > 0)
1504   {
1505     s->in_destroy = GNUNET_YES;
1506   }
1507   else
1508   {
1509     GNUNET_HELLO_address_free (s->address);
1510     free_session (s);
1511   }
1512   return GNUNET_OK;
1513 }
1514
1515
1516 /**
1517  * Function that is called to get the keepalive factor.
1518  * #GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT is divided by this number to
1519  * calculate the interval between keepalive packets.
1520  *
1521  * @param cls closure with the `struct Plugin`
1522  * @return keepalive factor
1523  */
1524 static unsigned int
1525 udp_query_keepalive_factor (void *cls)
1526 {
1527   return 15;
1528 }
1529
1530
1531 /**
1532  * Destroy a session, plugin is being unloaded.
1533  *
1534  * @param cls the `struct Plugin`
1535  * @param key hash of public key of target peer
1536  * @param value a `struct PeerSession *` to clean up
1537  * @return #GNUNET_OK (continue to iterate)
1538  */
1539 static int
1540 disconnect_and_free_it (void *cls,
1541                         const struct GNUNET_PeerIdentity *key,
1542                         void *value)
1543 {
1544   struct Plugin *plugin = cls;
1545
1546   udp_disconnect_session (plugin, value);
1547   return GNUNET_OK;
1548 }
1549
1550
1551 /**
1552  * Disconnect from a remote node.  Clean up session if we have one for
1553  * this peer.
1554  *
1555  * @param cls closure for this call (should be handle to Plugin)
1556  * @param target the peeridentity of the peer to disconnect
1557  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
1558  */
1559 static void
1560 udp_disconnect (void *cls,
1561                 const struct GNUNET_PeerIdentity *target)
1562 {
1563   struct Plugin *plugin = cls;
1564
1565   LOG (GNUNET_ERROR_TYPE_DEBUG,
1566        "Disconnecting from peer `%s'\n",
1567        GNUNET_i2s (target));
1568   /* Clean up sessions */
1569   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1570                                               target,
1571                                               &disconnect_and_free_it,
1572                                               plugin);
1573 }
1574
1575
1576 /**
1577  * Session was idle, so disconnect it
1578  *
1579  * @param cls the `struct Session` to time out
1580  * @param tc scheduler context
1581  */
1582 static void
1583 session_timeout (void *cls,
1584                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1585 {
1586   struct Session *s = cls;
1587   struct Plugin *plugin = s->plugin;
1588   struct GNUNET_TIME_Relative left;
1589
1590   s->timeout_task = NULL;
1591   left = GNUNET_TIME_absolute_get_remaining (s->timeout);
1592   if (left.rel_value_us > 0)
1593   {
1594     /* not actually our turn yet, but let's at least update
1595        the monitor, it may think we're about to die ... */
1596     notify_session_monitor (s->plugin,
1597                             s,
1598                             GNUNET_TRANSPORT_SS_UPDATE);
1599     s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
1600                                                     &session_timeout,
1601                                                     s);
1602     return;
1603   }
1604   LOG (GNUNET_ERROR_TYPE_DEBUG,
1605        "Session %p was idle for %s, disconnecting\n",
1606        s,
1607        GNUNET_STRINGS_relative_time_to_string (UDP_SESSION_TIME_OUT,
1608                                                GNUNET_YES));
1609   /* call session destroy function */
1610   udp_disconnect_session (plugin, s);
1611 }
1612
1613
1614 /**
1615  * Increment session timeout due to activity
1616  *
1617  * @param s session to reschedule timeout activity for
1618  */
1619 static void
1620 reschedule_session_timeout (struct Session *s)
1621 {
1622   if (GNUNET_YES == s->in_destroy)
1623     return;
1624   GNUNET_assert(NULL != s->timeout_task);
1625   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1626 }
1627
1628
1629 /**
1630  * Function obtain the network type for a session
1631  *
1632  * @param cls closure (`struct Plugin *`)
1633  * @param session the session
1634  * @return the network type
1635  */
1636 static enum GNUNET_ATS_Network_Type
1637 udp_get_network (void *cls,
1638                  struct Session *session)
1639 {
1640   return session->scope;
1641 }
1642
1643
1644 /**
1645  * Closure for #session_cmp_it().
1646  */
1647 struct SessionCompareContext
1648 {
1649   /**
1650    * Set to session matching the address.
1651    */
1652   struct Session *res;
1653
1654   /**
1655    * Address we are looking for.
1656    */
1657   const struct GNUNET_HELLO_Address *address;
1658 };
1659
1660
1661 /**
1662  * Find a session with a matching address.
1663  *
1664  * @param cls the `struct SessionCompareContext *`
1665  * @param key peer identity (unused)
1666  * @param value the `struct Session *`
1667  * @return #GNUNET_NO if we found the session, #GNUNET_OK if not
1668  */
1669 static int
1670 session_cmp_it (void *cls,
1671                 const struct GNUNET_PeerIdentity *key,
1672                 void *value)
1673 {
1674   struct SessionCompareContext *cctx = cls;
1675   struct Session *s = value;
1676
1677   if (0 == GNUNET_HELLO_address_cmp (s->address,
1678                                      cctx->address))
1679   {
1680     cctx->res = s;
1681     return GNUNET_NO;
1682   }
1683   return GNUNET_YES;
1684 }
1685
1686
1687 /**
1688  * Locate an existing session the transport service is using to
1689  * send data to another peer.  Performs some basic sanity checks
1690  * on the address and then tries to locate a matching session.
1691  *
1692  * @param cls the plugin
1693  * @param address the address we should locate the session by
1694  * @return the session if it exists, or NULL if it is not found
1695  */
1696 static struct Session *
1697 udp_plugin_lookup_session (void *cls,
1698                            const struct GNUNET_HELLO_Address *address)
1699 {
1700   struct Plugin *plugin = cls;
1701   const struct IPv6UdpAddress *udp_a6;
1702   const struct IPv4UdpAddress *udp_a4;
1703   struct SessionCompareContext cctx;
1704
1705   if ( (NULL == address->address) ||
1706        ((address->address_length != sizeof (struct IPv4UdpAddress)) &&
1707         (address->address_length != sizeof (struct IPv6UdpAddress))))
1708   {
1709     LOG (GNUNET_ERROR_TYPE_WARNING,
1710          "Trying to locate session for address of unexpected length %u (should be %u or %u)\n",
1711          address->address_length,
1712          sizeof (struct IPv4UdpAddress),
1713          sizeof (struct IPv6UdpAddress));
1714     return NULL;
1715   }
1716
1717   if (address->address_length == sizeof(struct IPv4UdpAddress))
1718   {
1719     if (NULL == plugin->sockv4)
1720       return NULL;
1721     udp_a4 = (const struct IPv4UdpAddress *) address->address;
1722     if (0 == udp_a4->u4_port)
1723       return NULL;
1724   }
1725
1726   if (address->address_length == sizeof(struct IPv6UdpAddress))
1727   {
1728     if (NULL == plugin->sockv6)
1729       return NULL;
1730     udp_a6 = (const struct IPv6UdpAddress *) address->address;
1731     if (0 == udp_a6->u6_port)
1732       return NULL;
1733   }
1734
1735   /* check if session already exists */
1736   cctx.address = address;
1737   cctx.res = NULL;
1738   LOG (GNUNET_ERROR_TYPE_DEBUG,
1739        "Looking for existing session for peer `%s' `%s' \n",
1740        GNUNET_i2s (&address->peer),
1741        udp_address_to_string (plugin,
1742                               address->address,
1743                               address->address_length));
1744   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->sessions,
1745                                               &address->peer,
1746                                               &session_cmp_it,
1747                                               &cctx);
1748   if (NULL != cctx.res)
1749   {
1750     LOG (GNUNET_ERROR_TYPE_DEBUG,
1751          "Found existing session %p\n",
1752          cctx.res);
1753     return cctx.res;
1754   }
1755   return NULL;
1756 }
1757
1758
1759 /**
1760  * Allocate a new session for the given endpoint address.
1761  * Note that this function does not inform the service
1762  * of the new session, this is the responsibility of the
1763  * caller (if needed).
1764  *
1765  * @param cls the `struct Plugin`
1766  * @param address address of the other peer to use
1767  * @param network_type network type the address belongs to
1768  * @return NULL on error, otherwise session handle
1769  */
1770 static struct Session *
1771 udp_plugin_create_session (void *cls,
1772                            const struct GNUNET_HELLO_Address *address,
1773                            enum GNUNET_ATS_Network_Type network_type)
1774 {
1775   struct Plugin *plugin = cls;
1776   struct Session *s;
1777
1778   s = GNUNET_new (struct Session);
1779   s->plugin = plugin;
1780   s->address = GNUNET_HELLO_address_copy (address);
1781   s->target = address->peer;
1782   s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1783                                                               250);
1784   s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1785   s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS;
1786   s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO;
1787   s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT);
1788   s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT,
1789                                                   &session_timeout, s);
1790   s->scope = network_type;
1791
1792   LOG (GNUNET_ERROR_TYPE_DEBUG,
1793        "Creating new session %p for peer `%s' address `%s'\n",
1794        s,
1795        GNUNET_i2s (&address->peer),
1796        udp_address_to_string (plugin,
1797                               address->address,
1798                               address->address_length));
1799   GNUNET_assert(GNUNET_OK ==
1800                 GNUNET_CONTAINER_multipeermap_put (plugin->sessions,
1801                                                    &s->target,
1802                                                    s,
1803                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1804   GNUNET_STATISTICS_set (plugin->env->stats,
1805                          "# UDP sessions active",
1806                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
1807                          GNUNET_NO);
1808   return s;
1809 }
1810
1811
1812 /**
1813  * Function that will be called whenever the transport service wants to
1814  * notify the plugin that a session is still active and in use and
1815  * therefore the session timeout for this session has to be updated
1816  *
1817  * @param cls closure with the `struct Plugin`
1818  * @param peer which peer was the session for
1819  * @param session which session is being updated
1820  */
1821 static void
1822 udp_plugin_update_session_timeout (void *cls,
1823                                    const struct GNUNET_PeerIdentity *peer,
1824                                    struct Session *session)
1825 {
1826   struct Plugin *plugin = cls;
1827
1828   if (GNUNET_YES !=
1829       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
1830                                                     peer,
1831                                                     session))
1832   {
1833     GNUNET_break(0);
1834     return;
1835   }
1836   /* Reschedule session timeout */
1837   reschedule_session_timeout (session);
1838 }
1839
1840
1841 /**
1842  * Creates a new outbound session the transport service will use to
1843  * send data to the peer.
1844  *
1845  * @param cls the `struct Plugin *`
1846  * @param address the address
1847  * @return the session or NULL of max connections exceeded
1848  */
1849 static struct Session *
1850 udp_plugin_get_session (void *cls,
1851                         const struct GNUNET_HELLO_Address *address)
1852 {
1853   struct Plugin *plugin = cls;
1854   struct Session *s;
1855   enum GNUNET_ATS_Network_Type network_type;
1856   struct IPv4UdpAddress *udp_v4;
1857   struct IPv6UdpAddress *udp_v6;
1858
1859   if (NULL == address)
1860   {
1861     GNUNET_break (0);
1862     return NULL;
1863   }
1864   if ( (address->address_length != sizeof(struct IPv4UdpAddress)) &&
1865        (address->address_length != sizeof(struct IPv6UdpAddress)) )
1866   {
1867     GNUNET_break_op (0);
1868     return NULL;
1869   }
1870   if (NULL != (s = udp_plugin_lookup_session (cls,
1871                                               address)))
1872     return s;
1873
1874   /* need to create new session */
1875   if (sizeof (struct IPv4UdpAddress) == address->address_length)
1876   {
1877     struct sockaddr_in v4;
1878
1879     udp_v4 = (struct IPv4UdpAddress *) address->address;
1880     memset (&v4, '\0', sizeof (v4));
1881     v4.sin_family = AF_INET;
1882 #if HAVE_SOCKADDR_IN_SIN_LEN
1883     v4.sin_len = sizeof (struct sockaddr_in);
1884 #endif
1885     v4.sin_port = udp_v4->u4_port;
1886     v4.sin_addr.s_addr = udp_v4->ipv4_addr;
1887     network_type = plugin->env->get_address_type (plugin->env->cls,
1888                                                   (const struct sockaddr *) &v4,
1889                                                   sizeof (v4));
1890   }
1891   if (sizeof (struct IPv6UdpAddress) == address->address_length)
1892   {
1893     struct sockaddr_in6 v6;
1894
1895     udp_v6 = (struct IPv6UdpAddress *) address->address;
1896     memset (&v6, '\0', sizeof (v6));
1897     v6.sin6_family = AF_INET6;
1898 #if HAVE_SOCKADDR_IN_SIN_LEN
1899     v6.sin6_len = sizeof (struct sockaddr_in6);
1900 #endif
1901     v6.sin6_port = udp_v6->u6_port;
1902     v6.sin6_addr = udp_v6->ipv6_addr;
1903     network_type = plugin->env->get_address_type (plugin->env->cls,
1904                                                   (const struct sockaddr *) &v6,
1905                                                   sizeof (v6));
1906   }
1907   return udp_plugin_create_session (cls,
1908                                     address,
1909                                     network_type);
1910 }
1911
1912
1913 /**
1914  * Enqueue a message for transmission.
1915  *
1916  * @param plugin the UDP plugin
1917  * @param udpw message wrapper to queue
1918  */
1919 static void
1920 enqueue (struct Plugin *plugin,
1921          struct UDP_MessageWrapper *udpw)
1922 {
1923   struct Session *session = udpw->session;
1924
1925   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
1926   {
1927     GNUNET_break (0);
1928   }
1929   else
1930   {
1931     GNUNET_STATISTICS_update (plugin->env->stats,
1932         "# UDP, total, bytes in buffers", udpw->msg_size, GNUNET_NO);
1933     plugin->bytes_in_buffer += udpw->msg_size;
1934   }
1935   GNUNET_STATISTICS_update (plugin->env->stats,
1936                             "# UDP, total, msgs in buffers",
1937                             1, GNUNET_NO);
1938   if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
1939     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
1940                                 plugin->ipv4_queue_tail,
1941                                 udpw);
1942   else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress))
1943     GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
1944                                  plugin->ipv6_queue_tail,
1945                                  udpw);
1946   else
1947   {
1948     GNUNET_break (0);
1949     return;
1950   }
1951   session->msgs_in_queue++;
1952   session->bytes_in_queue += udpw->msg_size;
1953 }
1954
1955
1956 /**
1957  * Fragment message was transmitted via UDP, let fragmentation know
1958  * to send the next fragment now.
1959  *
1960  * @param cls the `struct UDPMessageWrapper *` of the fragment
1961  * @param target destination peer (ignored)
1962  * @param result #GNUNET_OK on success (ignored)
1963  * @param payload bytes payload sent
1964  * @param physical bytes physical sent
1965  */
1966 static void
1967 send_next_fragment (void *cls,
1968                     const struct GNUNET_PeerIdentity *target,
1969                     int result,
1970                     size_t payload,
1971                     size_t physical)
1972 {
1973   struct UDP_MessageWrapper *udpw = cls;
1974
1975   GNUNET_FRAGMENT_context_transmission_done (udpw->frag_ctx->frag);
1976 }
1977
1978
1979 /**
1980  * Function that is called with messages created by the fragmentation
1981  * module.  In the case of the 'proc' callback of the
1982  * #GNUNET_FRAGMENT_context_create() function, this function must
1983  * eventually call #GNUNET_FRAGMENT_context_transmission_done().
1984  *
1985  * @param cls closure, the 'struct FragmentationContext'
1986  * @param msg the message that was created
1987  */
1988 static void
1989 enqueue_fragment (void *cls,
1990                   const struct GNUNET_MessageHeader *msg)
1991 {
1992   struct UDP_FragmentationContext *frag_ctx = cls;
1993   struct Plugin *plugin = frag_ctx->plugin;
1994   struct UDP_MessageWrapper * udpw;
1995   size_t msg_len = ntohs (msg->size);
1996
1997   LOG (GNUNET_ERROR_TYPE_DEBUG,
1998        "Enqueuing fragment with %u bytes\n",
1999        msg_len);
2000   frag_ctx->fragments_used++;
2001   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
2002   udpw->session = frag_ctx->session;
2003   udpw->msg_buf = (char *) &udpw[1];
2004   udpw->msg_size = msg_len;
2005   udpw->payload_size = msg_len; /*FIXME: minus fragment overhead */
2006   udpw->cont = &send_next_fragment;
2007   udpw->cont_cls = udpw;
2008   udpw->timeout = frag_ctx->timeout;
2009   udpw->frag_ctx = frag_ctx;
2010   udpw->msg_type = UMT_MSG_FRAGMENTED;
2011   memcpy (udpw->msg_buf, msg, msg_len);
2012   enqueue (plugin, udpw);
2013   schedule_select (plugin);
2014 }
2015
2016
2017 /**
2018  * Function that can be used by the transport service to transmit
2019  * a message using the plugin.   Note that in the case of a
2020  * peer disconnecting, the continuation MUST be called
2021  * prior to the disconnect notification itself.  This function
2022  * will be called with this peer's HELLO message to initiate
2023  * a fresh connection to another peer.
2024  *
2025  * @param cls closure
2026  * @param s which session must be used
2027  * @param msgbuf the message to transmit
2028  * @param msgbuf_size number of bytes in 'msgbuf'
2029  * @param priority how important is the message (most plugins will
2030  *                 ignore message priority and just FIFO)
2031  * @param to how long to wait at most for the transmission (does not
2032  *                require plugins to discard the message after the timeout,
2033  *                just advisory for the desired delay; most plugins will ignore
2034  *                this as well)
2035  * @param cont continuation to call once the message has
2036  *        been transmitted (or if the transport is ready
2037  *        for the next transmission call; or if the
2038  *        peer disconnected...); can be NULL
2039  * @param cont_cls closure for cont
2040  * @return number of bytes used (on the physical network, with overheads);
2041  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
2042  *         and does NOT mean that the message was not transmitted (DV)
2043  */
2044 static ssize_t
2045 udp_plugin_send (void *cls,
2046                  struct Session *s,
2047                  const char *msgbuf,
2048                  size_t msgbuf_size,
2049                  unsigned int priority,
2050                  struct GNUNET_TIME_Relative to,
2051                  GNUNET_TRANSPORT_TransmitContinuation cont,
2052                  void *cont_cls)
2053 {
2054   struct Plugin *plugin = cls;
2055   size_t udpmlen = msgbuf_size + sizeof(struct UDPMessage);
2056   struct UDP_FragmentationContext * frag_ctx;
2057   struct UDP_MessageWrapper * udpw;
2058   struct UDPMessage *udp;
2059   char mbuf[udpmlen];
2060
2061   if ( (s->address->address_length == sizeof(struct IPv6UdpAddress)) &&
2062        (plugin->sockv6 == NULL) )
2063     return GNUNET_SYSERR;
2064   if ( (s->address->address_length == sizeof(struct IPv4UdpAddress)) &&
2065        (plugin->sockv4 == NULL) )
2066     return GNUNET_SYSERR;
2067   if (udpmlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
2068   {
2069     GNUNET_break(0);
2070     return GNUNET_SYSERR;
2071   }
2072   if (GNUNET_YES !=
2073       GNUNET_CONTAINER_multipeermap_contains_value (plugin->sessions,
2074                                                     &s->target,
2075                                                     s))
2076   {
2077     GNUNET_break(0);
2078     return GNUNET_SYSERR;
2079   }
2080   LOG (GNUNET_ERROR_TYPE_DEBUG,
2081        "UDP transmits %u-byte message to `%s' using address `%s'\n",
2082        udpmlen,
2083        GNUNET_i2s (&s->target),
2084        udp_address_to_string (plugin,
2085                               s->address->address,
2086                               s->address->address_length));
2087
2088   /* Message */
2089   udp = (struct UDPMessage *) mbuf;
2090   udp->header.size = htons (udpmlen);
2091   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
2092   udp->reserved = htonl (0);
2093   udp->sender = *plugin->env->my_identity;
2094
2095   /* We do not update the session time out here!
2096    * Otherwise this session will not timeout since we send keep alive before
2097    * session can timeout
2098    *
2099    * For UDP we update session timeout only on receive, this will cover keep
2100    * alives, since remote peer will reply with keep alive response!
2101    */
2102   if (udpmlen <= UDP_MTU)
2103   {
2104     /* unfragmented message */
2105     udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
2106     udpw->session = s;
2107     udpw->msg_buf = (char *) &udpw[1];
2108     udpw->msg_size = udpmlen; /* message size with UDP overhead */
2109     udpw->payload_size = msgbuf_size; /* message size without UDP overhead */
2110     udpw->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), to);
2111     udpw->cont = cont;
2112     udpw->cont_cls = cont_cls;
2113     udpw->frag_ctx = NULL;
2114     udpw->msg_type = UMT_MSG_UNFRAGMENTED;
2115     memcpy (udpw->msg_buf, udp, sizeof(struct UDPMessage));
2116     memcpy (&udpw->msg_buf[sizeof(struct UDPMessage)], msgbuf, msgbuf_size);
2117     enqueue (plugin, udpw);
2118
2119     GNUNET_STATISTICS_update (plugin->env->stats,
2120         "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
2121     GNUNET_STATISTICS_update (plugin->env->stats,
2122                               "# UDP, unfragmented msgs, bytes payload, attempt",
2123                               udpw->payload_size,
2124                               GNUNET_NO);
2125   }
2126   else
2127   {
2128     /* fragmented message */
2129     if (s->frag_ctx != NULL)
2130       return GNUNET_SYSERR;
2131     memcpy (&udp[1], msgbuf, msgbuf_size);
2132     frag_ctx = GNUNET_new (struct UDP_FragmentationContext);
2133     frag_ctx->plugin = plugin;
2134     frag_ctx->session = s;
2135     frag_ctx->cont = cont;
2136     frag_ctx->cont_cls = cont_cls;
2137     frag_ctx->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
2138         to);
2139     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
2140     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
2141     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
2142                                                      UDP_MTU,
2143                                                      &plugin->tracker,
2144                                                      s->last_expected_msg_delay,
2145                                                      s->last_expected_ack_delay,
2146                                                      &udp->header,
2147                                                      &enqueue_fragment,
2148                                                      frag_ctx);
2149     s->frag_ctx = frag_ctx;
2150     GNUNET_STATISTICS_update (plugin->env->stats,
2151                               "# UDP, fragmented msgs, messages, pending",
2152                               1,
2153                               GNUNET_NO);
2154     GNUNET_STATISTICS_update (plugin->env->stats,
2155                               "# UDP, fragmented msgs, messages, attempt",
2156                               1,
2157                               GNUNET_NO);
2158     GNUNET_STATISTICS_update (plugin->env->stats,
2159                               "# UDP, fragmented msgs, bytes payload, attempt",
2160                               frag_ctx->payload_size,
2161                               GNUNET_NO);
2162   }
2163   notify_session_monitor (s->plugin,
2164                           s,
2165                           GNUNET_TRANSPORT_SS_UPDATE);
2166   schedule_select (plugin);
2167   return udpmlen;
2168 }
2169
2170
2171 /**
2172  * Our external IP address/port mapping has changed.
2173  *
2174  * @param cls closure, the `struct LocalAddrList`
2175  * @param add_remove #GNUNET_YES to mean the new public IP address, #GNUNET_NO to mean
2176  *     the previous (now invalid) one
2177  * @param addr either the previous or the new public IP address
2178  * @param addrlen actual lenght of the address
2179  */
2180 static void
2181 udp_nat_port_map_callback (void *cls,
2182                            int add_remove,
2183                            const struct sockaddr *addr,
2184                            socklen_t addrlen)
2185 {
2186   struct Plugin *plugin = cls;
2187   struct GNUNET_HELLO_Address *address;
2188   struct IPv4UdpAddress u4;
2189   struct IPv6UdpAddress u6;
2190   void *arg;
2191   size_t args;
2192
2193   LOG (GNUNET_ERROR_TYPE_INFO,
2194        "NAT notification to %s address `%s'\n",
2195        (GNUNET_YES == add_remove) ? "add" : "remove",
2196        GNUNET_a2s (addr, addrlen));
2197
2198   /* convert 'address' to our internal format */
2199   switch (addr->sa_family)
2200   {
2201   case AF_INET:
2202     GNUNET_assert(addrlen == sizeof(struct sockaddr_in));
2203     memset (&u4, 0, sizeof(u4));
2204     u4.options = htonl (plugin->myoptions);
2205     u4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
2206     u4.u4_port = ((struct sockaddr_in *) addr)->sin_port;
2207     if (0 == ((struct sockaddr_in *) addr)->sin_port)
2208       return;
2209     arg = &u4;
2210     args = sizeof(struct IPv4UdpAddress);
2211     break;
2212   case AF_INET6:
2213     GNUNET_assert(addrlen == sizeof(struct sockaddr_in6));
2214     memset (&u6, 0, sizeof(u6));
2215     u6.options = htonl (plugin->myoptions);
2216     if (0 == ((struct sockaddr_in6 *) addr)->sin6_port)
2217       return;
2218     memcpy (&u6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
2219         sizeof(struct in6_addr));
2220     u6.u6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
2221     arg = &u6;
2222     args = sizeof(struct IPv6UdpAddress);
2223     break;
2224   default:
2225     GNUNET_break(0);
2226     return;
2227   }
2228   /* modify our published address list */
2229   address = GNUNET_HELLO_address_allocate (plugin->env->my_identity,
2230                                            PLUGIN_NAME,
2231                                            arg, args,
2232                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2233   plugin->env->notify_address (plugin->env->cls, add_remove, address);
2234   GNUNET_HELLO_address_free (address);
2235 }
2236
2237
2238 /**
2239  * Message tokenizer has broken up an incomming message. Pass it on
2240  * to the service.
2241  *
2242  * @param cls the `struct Plugin *`
2243  * @param client the `struct SourceInformation *`
2244  * @param hdr the actual message
2245  * @return #GNUNET_OK (always)
2246  */
2247 static int
2248 process_inbound_tokenized_messages (void *cls,
2249                                     void *client,
2250                                     const struct GNUNET_MessageHeader *hdr)
2251 {
2252   struct Plugin *plugin = cls;
2253   struct SourceInformation *si = client;
2254   struct GNUNET_TIME_Relative delay;
2255
2256   GNUNET_assert (NULL != si->session);
2257   if (GNUNET_YES == si->session->in_destroy)
2258     return GNUNET_OK;
2259   /* setup ATS */
2260   reschedule_session_timeout (si->session);
2261   delay = plugin->env->receive (plugin->env->cls,
2262                                 si->session->address,
2263                                 si->session,
2264                                 hdr);
2265   si->session->flow_delay_for_other_peer = delay;
2266   return GNUNET_OK;
2267 }
2268
2269
2270 /**
2271  * We've received a UDP Message.  Process it (pass contents to main service).
2272  *
2273  * @param plugin plugin context
2274  * @param msg the message
2275  * @param udp_addr sender address
2276  * @param udp_addr_len number of bytes in @a udp_addr
2277  * @param network_type network type the address belongs to
2278  */
2279 static void
2280 process_udp_message (struct Plugin *plugin,
2281                      const struct UDPMessage *msg,
2282                      const union UdpAddress *udp_addr,
2283                      size_t udp_addr_len,
2284                      enum GNUNET_ATS_Network_Type network_type)
2285 {
2286   struct SourceInformation si;
2287   struct Session *s;
2288   struct GNUNET_HELLO_Address *address;
2289
2290   GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type);
2291   if (0 != ntohl (msg->reserved))
2292   {
2293     GNUNET_break_op(0);
2294     return;
2295   }
2296   if (ntohs (msg->header.size)
2297       < sizeof(struct GNUNET_MessageHeader) + sizeof(struct UDPMessage))
2298   {
2299     GNUNET_break_op(0);
2300     return;
2301   }
2302
2303   address = GNUNET_HELLO_address_allocate (&msg->sender,
2304                                            PLUGIN_NAME,
2305                                            udp_addr,
2306                                            udp_addr_len,
2307                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2308   if (NULL ==
2309       (s = udp_plugin_lookup_session (plugin, address)))
2310   {
2311     s = udp_plugin_create_session (plugin,
2312                                    address,
2313                                    network_type);
2314     plugin->env->session_start (plugin->env->cls,
2315                                 address,
2316                                 s,
2317                                 s->scope);
2318     notify_session_monitor (s->plugin,
2319                             s,
2320                             GNUNET_TRANSPORT_SS_INIT);
2321     notify_session_monitor (s->plugin,
2322                             s,
2323                             GNUNET_TRANSPORT_SS_UP);
2324   }
2325   GNUNET_free (address);
2326
2327   /* iterate over all embedded messages */
2328   si.session = s;
2329   si.sender = msg->sender;
2330   s->rc++;
2331   GNUNET_SERVER_mst_receive (plugin->mst,
2332                              &si,
2333                              (const char *) &msg[1],
2334                              ntohs (msg->header.size) - sizeof(struct UDPMessage),
2335                              GNUNET_YES,
2336                              GNUNET_NO);
2337   s->rc--;
2338   if ((0 == s->rc) && (GNUNET_YES == s->in_destroy))
2339     free_session (s);
2340 }
2341
2342
2343 /**
2344  * Process a defragmented message.
2345  *
2346  * @param cls the `struct DefragContext *`
2347  * @param msg the message
2348  */
2349 static void
2350 fragment_msg_proc (void *cls,
2351                    const struct GNUNET_MessageHeader *msg)
2352 {
2353   struct DefragContext *rc = cls;
2354   const struct UDPMessage *um;
2355
2356   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
2357   {
2358     GNUNET_break(0);
2359     return;
2360   }
2361   if (ntohs (msg->size) < sizeof(struct UDPMessage))
2362   {
2363     GNUNET_break(0);
2364     return;
2365   }
2366   um = (const struct UDPMessage *) msg;
2367   rc->sender = um->sender;
2368   rc->have_sender = GNUNET_YES;
2369   process_udp_message (rc->plugin,
2370                        um,
2371                        rc->udp_addr,
2372                        rc->udp_addr_len,
2373                        rc->network_type);
2374 }
2375
2376
2377 /**
2378  * Transmit an acknowledgement.
2379  *
2380  * @param cls the `struct DefragContext *`
2381  * @param id message ID (unused)
2382  * @param msg ack to transmit
2383  */
2384 static void
2385 ack_proc (void *cls,
2386           uint32_t id,
2387           const struct GNUNET_MessageHeader *msg)
2388 {
2389   struct DefragContext *rc = cls;
2390   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
2391   struct UDP_ACK_Message *udp_ack;
2392   uint32_t delay = 0;
2393   struct UDP_MessageWrapper *udpw;
2394   struct Session *s;
2395   struct GNUNET_HELLO_Address *address;
2396
2397   if (GNUNET_NO == rc->have_sender)
2398   {
2399     /* tried to defragment but never succeeded, hence will not ACK */
2400     GNUNET_break_op (0);
2401     return;
2402   }
2403   address = GNUNET_HELLO_address_allocate (&rc->sender,
2404                                            PLUGIN_NAME,
2405                                            rc->udp_addr,
2406                                            rc->udp_addr_len,
2407                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2408   s = udp_plugin_lookup_session (rc->plugin,
2409                                  address);
2410   GNUNET_HELLO_address_free (address);
2411   if (NULL == s)
2412   {
2413     LOG (GNUNET_ERROR_TYPE_ERROR,
2414          "Trying to transmit ACK to peer `%s' but no session found!\n",
2415          udp_address_to_string (rc->plugin,
2416                                 rc->udp_addr,
2417                                 rc->udp_addr_len));
2418     GNUNET_CONTAINER_heap_remove_node (rc->hnode);
2419     GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
2420     GNUNET_free (rc);
2421     return;
2422   }
2423   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
2424     delay = s->flow_delay_for_other_peer.rel_value_us;
2425
2426   LOG (GNUNET_ERROR_TYPE_DEBUG,
2427        "Sending ACK to `%s' including delay of %s\n",
2428        udp_address_to_string (rc->plugin,
2429                               rc->udp_addr,
2430                               rc->udp_addr_len),
2431        GNUNET_STRINGS_relative_time_to_string (s->flow_delay_for_other_peer,
2432                                                GNUNET_YES));
2433   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msize);
2434   udpw->msg_size = msize;
2435   udpw->payload_size = 0;
2436   udpw->session = s;
2437   udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
2438   udpw->msg_buf = (char *) &udpw[1];
2439   udpw->msg_type = UMT_MSG_ACK;
2440   udp_ack = (struct UDP_ACK_Message *) udpw->msg_buf;
2441   udp_ack->header.size = htons ((uint16_t) msize);
2442   udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
2443   udp_ack->delay = htonl (delay);
2444   udp_ack->sender = *rc->plugin->env->my_identity;
2445   memcpy (&udp_ack[1], msg, ntohs (msg->size));
2446   enqueue (rc->plugin, udpw);
2447   notify_session_monitor (s->plugin,
2448                           s,
2449                           GNUNET_TRANSPORT_SS_UPDATE);
2450   schedule_select (rc->plugin);
2451 }
2452
2453
2454 /**
2455  * Handle an ACK message.
2456  *
2457  * @param plugin the UDP plugin
2458  * @param msg the (presumed) UDP ACK message
2459  * @param udp_addr sender address
2460  * @param udp_addr_len number of bytes in @a udp_addr
2461  */
2462 static void
2463 read_process_ack (struct Plugin *plugin,
2464                   const struct GNUNET_MessageHeader *msg,
2465                   const union UdpAddress *udp_addr,
2466                   socklen_t udp_addr_len)
2467 {
2468   const struct GNUNET_MessageHeader *ack;
2469   const struct UDP_ACK_Message *udp_ack;
2470   struct GNUNET_HELLO_Address *address;
2471   struct Session *s;
2472   struct GNUNET_TIME_Relative flow_delay;
2473
2474   if (ntohs (msg->size)
2475       < sizeof(struct UDP_ACK_Message) + sizeof(struct GNUNET_MessageHeader))
2476   {
2477     GNUNET_break_op(0);
2478     return;
2479   }
2480   udp_ack = (const struct UDP_ACK_Message *) msg;
2481   address = GNUNET_HELLO_address_allocate (&udp_ack->sender,
2482                                            PLUGIN_NAME,
2483                                            udp_addr,
2484                                            udp_addr_len,
2485                                            GNUNET_HELLO_ADDRESS_INFO_NONE);
2486   s = udp_plugin_lookup_session (plugin,
2487                                  address);
2488   if (NULL == s)
2489   {
2490     LOG (GNUNET_ERROR_TYPE_WARNING,
2491          "UDP session of address %s for ACK not found\n",
2492          udp_address_to_string (plugin,
2493                                 address->address,
2494                                 address->address_length));
2495     GNUNET_HELLO_address_free (address);
2496     return;
2497   }
2498   if (NULL == s->frag_ctx)
2499   {
2500     LOG (GNUNET_ERROR_TYPE_WARNING,
2501          "Fragmentation context of address %s for ACK not found\n",
2502          udp_address_to_string (plugin,
2503                                 address->address,
2504                                 address->address_length));
2505     GNUNET_HELLO_address_free (address);
2506     return;
2507   }
2508   GNUNET_HELLO_address_free (address);
2509
2510   flow_delay.rel_value_us = (uint64_t) ntohl (udp_ack->delay);
2511   LOG (GNUNET_ERROR_TYPE_DEBUG,
2512        "We received a sending delay of %s\n",
2513        GNUNET_STRINGS_relative_time_to_string (flow_delay,
2514                                                GNUNET_YES));
2515   s->flow_delay_from_other_peer = GNUNET_TIME_relative_to_absolute (flow_delay);
2516
2517   ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
2518   if (ntohs (ack->size) != ntohs (msg->size) - sizeof(struct UDP_ACK_Message))
2519   {
2520     GNUNET_break_op(0);
2521     return;
2522   }
2523
2524   if (GNUNET_OK !=
2525       GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag,
2526                                    ack))
2527   {
2528     LOG(GNUNET_ERROR_TYPE_DEBUG,
2529         "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
2530         (unsigned int ) ntohs (msg->size),
2531         GNUNET_i2s (&udp_ack->sender),
2532         udp_address_to_string (plugin,
2533                                udp_addr,
2534                                udp_addr_len));
2535     /* Expect more ACKs to arrive */
2536     return;
2537   }
2538
2539   LOG (GNUNET_ERROR_TYPE_DEBUG,
2540        "Message full ACK'ed\n",
2541        (unsigned int ) ntohs (msg->size),
2542        GNUNET_i2s (&udp_ack->sender),
2543        udp_address_to_string (plugin,
2544                               udp_addr,
2545                               udp_addr_len));
2546
2547   /* Remove fragmented message after successful sending */
2548   fragmented_message_done (s->frag_ctx,
2549                            GNUNET_OK);
2550 }
2551
2552
2553 /**
2554  * We received a fragment, process it.
2555  *
2556  * @param plugin our plugin
2557  * @param msg a message of type #GNUNET_MESSAGE_TYPE_FRAGMENT
2558  * @param udp_addr sender address
2559  * @param udp_addr_len number of bytes in @a udp_addr
2560  * @param network_type network type the address belongs to
2561  */
2562 static void
2563 read_process_fragment (struct Plugin *plugin,
2564                        const struct GNUNET_MessageHeader *msg,
2565                        const union UdpAddress *udp_addr,
2566                        size_t udp_addr_len,
2567                        enum GNUNET_ATS_Network_Type network_type)
2568 {
2569   struct DefragContext *d_ctx;
2570   struct GNUNET_TIME_Absolute now;
2571   struct FindReceiveContext frc;
2572
2573   frc.rc = NULL;
2574   frc.udp_addr = udp_addr;
2575   frc.udp_addr_len = udp_addr_len;
2576
2577   /* Lookup existing receive context for this address */
2578   GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
2579                                  &find_receive_context,
2580                                  &frc);
2581   now = GNUNET_TIME_absolute_get ();
2582   d_ctx = frc.rc;
2583
2584   if (NULL == d_ctx)
2585   {
2586     /* Create a new defragmentation context */
2587     d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + udp_addr_len);
2588     memcpy (&d_ctx[1],
2589             udp_addr,
2590             udp_addr_len);
2591     d_ctx->udp_addr = (const union UdpAddress *) &d_ctx[1];
2592     d_ctx->udp_addr_len = udp_addr_len;
2593     d_ctx->network_type = network_type;
2594     d_ctx->plugin = plugin;
2595     d_ctx->defrag = GNUNET_DEFRAGMENT_context_create (plugin->env->stats,
2596                                                       UDP_MTU,
2597                                                       UDP_MAX_MESSAGES_IN_DEFRAG,
2598                                                       d_ctx,
2599                                                       &fragment_msg_proc,
2600                                                       &ack_proc);
2601     d_ctx->hnode = GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs,
2602                                                  d_ctx,
2603         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2604     LOG (GNUNET_ERROR_TYPE_DEBUG,
2605          "Created new defragmentation context for %u-byte fragment from `%s'\n",
2606          (unsigned int ) ntohs (msg->size),
2607          udp_address_to_string (plugin,
2608                                 udp_addr,
2609                                 udp_addr_len));
2610   }
2611   else
2612   {
2613     LOG (GNUNET_ERROR_TYPE_DEBUG,
2614          "Found existing defragmentation context for %u-byte fragment from `%s'\n",
2615          (unsigned int ) ntohs (msg->size),
2616          udp_address_to_string (plugin,
2617                                 udp_addr,
2618                                 udp_addr_len));
2619   }
2620
2621   if (GNUNET_OK ==
2622       GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
2623   {
2624     /* keep this 'rc' from expiring */
2625     GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs,
2626                                        d_ctx->hnode,
2627         (GNUNET_CONTAINER_HeapCostType) now.abs_value_us);
2628   }
2629   if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
2630       UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
2631   {
2632     /* remove 'rc' that was inactive the longest */
2633     d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
2634     GNUNET_assert (NULL != d_ctx);
2635     GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
2636     GNUNET_free (d_ctx);
2637   }
2638 }
2639
2640
2641 /**
2642  * Read and process a message from the given socket.
2643  *
2644  * @param plugin the overall plugin
2645  * @param rsock socket to read from
2646  */
2647 static void
2648 udp_select_read (struct Plugin *plugin,
2649                  struct GNUNET_NETWORK_Handle *rsock)
2650 {
2651   socklen_t fromlen;
2652   struct sockaddr_storage addr;
2653   char buf[65536] GNUNET_ALIGN;
2654   ssize_t size;
2655   const struct GNUNET_MessageHeader *msg;
2656   struct IPv4UdpAddress v4;
2657   struct IPv6UdpAddress v6;
2658   const struct sockaddr *sa;
2659   const struct sockaddr_in *sa4;
2660   const struct sockaddr_in6 *sa6;
2661   const union UdpAddress *int_addr;
2662   size_t int_addr_len;
2663   enum GNUNET_ATS_Network_Type network_type;
2664
2665   fromlen = sizeof(addr);
2666   memset (&addr, 0, sizeof(addr));
2667   size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof(buf),
2668                                          (struct sockaddr *) &addr, &fromlen);
2669   sa = (const struct sockaddr *) &addr;
2670 #if MINGW
2671   /* On SOCK_DGRAM UDP sockets recvfrom might fail with a
2672    * WSAECONNRESET error to indicate that previous sendto() (yes, sendto!)
2673    * on this socket has failed.
2674    * Quote from MSDN:
2675    *   WSAECONNRESET - The virtual circuit was reset by the remote side
2676    *   executing a hard or abortive close. The application should close
2677    *   the socket; it is no longer usable. On a UDP-datagram socket this
2678    *   error indicates a previous send operation resulted in an ICMP Port
2679    *   Unreachable message.
2680    */
2681   if ( (-1 == size) && (ECONNRESET == errno) )
2682   return;
2683 #endif
2684   if (-1 == size)
2685   {
2686     LOG (GNUNET_ERROR_TYPE_DEBUG,
2687          "UDP failed to receive data: %s\n",
2688          STRERROR (errno));
2689     /* Connection failure or something. Not a protocol violation. */
2690     return;
2691   }
2692   if (size < sizeof(struct GNUNET_MessageHeader))
2693   {
2694     LOG (GNUNET_ERROR_TYPE_WARNING,
2695          "UDP got %u bytes from %s, which is not enough for a GNUnet message header\n",
2696          (unsigned int ) size,
2697          GNUNET_a2s (sa, fromlen));
2698     /* _MAY_ be a connection failure (got partial message) */
2699     /* But it _MAY_ also be that the other side uses non-GNUnet protocol. */
2700     GNUNET_break_op(0);
2701     return;
2702   }
2703   msg = (const struct GNUNET_MessageHeader *) buf;
2704   LOG (GNUNET_ERROR_TYPE_DEBUG,
2705        "UDP received %u-byte message from `%s' type %u\n",
2706        (unsigned int) size,
2707        GNUNET_a2s (sa,
2708                    fromlen),
2709        ntohs (msg->type));
2710   if (size != ntohs (msg->size))
2711   {
2712     LOG (GNUNET_ERROR_TYPE_WARNING,
2713          "UDP malformed message header from %s\n",
2714          (unsigned int) size,
2715          GNUNET_a2s (sa,
2716                      fromlen));
2717     GNUNET_break_op (0);
2718     return;
2719   }
2720   GNUNET_STATISTICS_update (plugin->env->stats,
2721                             "# UDP, total, bytes, received",
2722                             size,
2723                             GNUNET_NO);
2724   network_type = plugin->env->get_address_type (plugin->env->cls,
2725                                                 sa,
2726                                                 fromlen);
2727   switch (sa->sa_family)
2728   {
2729   case AF_INET:
2730     sa4 = (const struct sockaddr_in *) &addr;
2731     v4.options = 0;
2732     v4.ipv4_addr = sa4->sin_addr.s_addr;
2733     v4.u4_port = sa4->sin_port;
2734     int_addr = (union UdpAddress *) &v4;
2735     int_addr_len = sizeof (v4);
2736     break;
2737   case AF_INET6:
2738     sa6 = (const struct sockaddr_in6 *) &addr;
2739     v6.options = 0;
2740     v6.ipv6_addr = sa6->sin6_addr;
2741     v6.u6_port = sa6->sin6_port;
2742     int_addr = (union UdpAddress *) &v6;
2743     int_addr_len = sizeof (v6);
2744     break;
2745   default:
2746     GNUNET_break (0);
2747     return;
2748   }
2749
2750   switch (ntohs (msg->type))
2751   {
2752   case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON:
2753     if (GNUNET_YES == plugin->enable_broadcasting_receiving)
2754       udp_broadcast_receive (plugin,
2755                              buf,
2756                              size,
2757                              int_addr,
2758                              int_addr_len,
2759                              network_type);
2760     return;
2761   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE:
2762     if (ntohs (msg->size) < sizeof(struct UDPMessage))
2763     {
2764       GNUNET_break_op(0);
2765       return;
2766     }
2767     process_udp_message (plugin,
2768                          (const struct UDPMessage *) msg,
2769                          int_addr,
2770                          int_addr_len,
2771                          network_type);
2772     return;
2773   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
2774     read_process_ack (plugin,
2775                       msg,
2776                       int_addr,
2777                       int_addr_len);
2778     return;
2779   case GNUNET_MESSAGE_TYPE_FRAGMENT:
2780     read_process_fragment (plugin,
2781                            msg,
2782                            int_addr,
2783                            int_addr_len,
2784                            network_type);
2785     return;
2786   default:
2787     GNUNET_break_op(0);
2788     return;
2789   }
2790 }
2791
2792
2793 /**
2794  * Removes messages from the transmission queue that have
2795  * timed out, and then selects a message that should be
2796  * transmitted next.
2797  *
2798  * @param plugin the UDP plugin
2799  * @param sock which socket should we process the queue for (v4 or v6)
2800  * @return message selected for transmission, or NULL for none
2801  */
2802 static struct UDP_MessageWrapper *
2803 remove_timeout_messages_and_select (struct Plugin *plugin,
2804                                     struct GNUNET_NETWORK_Handle *sock)
2805 {
2806   struct UDP_MessageWrapper *udpw = NULL;
2807   struct GNUNET_TIME_Relative remaining;
2808   struct Session *session;
2809   int removed;
2810
2811   removed = GNUNET_NO;
2812   udpw = (sock == plugin->sockv4)
2813     ? plugin->ipv4_queue_head
2814     : plugin->ipv6_queue_head;
2815   while (NULL != udpw)
2816   {
2817     session = udpw->session;
2818     /* Find messages with timeout */
2819     remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout);
2820     if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2821     {
2822       /* Message timed out */
2823       switch (udpw->msg_type)
2824       {
2825       case UMT_MSG_UNFRAGMENTED:
2826         GNUNET_STATISTICS_update (plugin->env->stats,
2827                                   "# UDP, total, bytes, sent, timeout",
2828                                   udpw->msg_size,
2829                                   GNUNET_NO);
2830         GNUNET_STATISTICS_update (plugin->env->stats,
2831                                   "# UDP, total, messages, sent, timeout",
2832                                   1,
2833                                   GNUNET_NO);
2834         GNUNET_STATISTICS_update (plugin->env->stats,
2835                                   "# UDP, unfragmented msgs, messages, sent, timeout",
2836                                   1,
2837                                   GNUNET_NO);
2838         GNUNET_STATISTICS_update (plugin->env->stats,
2839                                   "# UDP, unfragmented msgs, bytes, sent, timeout",
2840                                   udpw->payload_size,
2841                                   GNUNET_NO);
2842         /* Not fragmented message */
2843         LOG (GNUNET_ERROR_TYPE_DEBUG,
2844              "Message for peer `%s' with size %u timed out\n",
2845              GNUNET_i2s (&udpw->session->target),
2846              udpw->payload_size);
2847         call_continuation (udpw, GNUNET_SYSERR);
2848         /* Remove message */
2849         removed = GNUNET_YES;
2850         dequeue (plugin, udpw);
2851         GNUNET_free(udpw);
2852         break;
2853       case UMT_MSG_FRAGMENTED:
2854         /* Fragmented message */
2855         GNUNET_STATISTICS_update (plugin->env->stats,
2856                                   "# UDP, total, bytes, sent, timeout",
2857                                   udpw->frag_ctx->on_wire_size,
2858                                   GNUNET_NO);
2859         GNUNET_STATISTICS_update (plugin->env->stats,
2860                                   "# UDP, total, messages, sent, timeout",
2861                                   1,
2862                                   GNUNET_NO);
2863         call_continuation (udpw,
2864                            GNUNET_SYSERR);
2865         LOG (GNUNET_ERROR_TYPE_DEBUG,
2866              "Fragment for message for peer `%s' with size %u timed out\n",
2867              GNUNET_i2s (&udpw->session->target),
2868             udpw->frag_ctx->payload_size);
2869
2870         GNUNET_STATISTICS_update (plugin->env->stats,
2871                                   "# UDP, fragmented msgs, messages, sent, timeout",
2872                                   1,
2873                                   GNUNET_NO);
2874         GNUNET_STATISTICS_update (plugin->env->stats,
2875                                   "# UDP, fragmented msgs, bytes, sent, timeout",
2876                                   udpw->frag_ctx->payload_size,
2877                                   GNUNET_NO);
2878         /* Remove fragmented message due to timeout */
2879         fragmented_message_done (udpw->frag_ctx,
2880                                  GNUNET_SYSERR);
2881         break;
2882       case UMT_MSG_ACK:
2883         GNUNET_STATISTICS_update (plugin->env->stats,
2884                                   "# UDP, total, bytes, sent, timeout",
2885                                   udpw->msg_size,
2886                                   GNUNET_NO);
2887         GNUNET_STATISTICS_update (plugin->env->stats,
2888                                   "# UDP, total, messages, sent, timeout",
2889                                   1,
2890                                   GNUNET_NO);
2891         LOG (GNUNET_ERROR_TYPE_DEBUG,
2892              "ACK Message for peer `%s' with size %u timed out\n",
2893              GNUNET_i2s (&udpw->session->target),
2894              udpw->payload_size);
2895         call_continuation (udpw,
2896                            GNUNET_SYSERR);
2897         removed = GNUNET_YES;
2898         dequeue (plugin,
2899                  udpw);
2900         GNUNET_free (udpw);
2901         break;
2902       default:
2903         break;
2904       }
2905       if (sock == plugin->sockv4)
2906         udpw = plugin->ipv4_queue_head;
2907       else if (sock == plugin->sockv6)
2908         udpw = plugin->ipv6_queue_head;
2909       else
2910       {
2911         GNUNET_break(0); /* should never happen */
2912         udpw = NULL;
2913       }
2914       GNUNET_STATISTICS_update (plugin->env->stats,
2915                                 "# messages discarded due to timeout",
2916                                 1,
2917                                 GNUNET_NO);
2918     }
2919     else
2920     {
2921       /* Message did not time out, check flow delay */
2922       remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
2923       if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
2924       {
2925         /* this message is not delayed */
2926         LOG (GNUNET_ERROR_TYPE_DEBUG,
2927              "Message for peer `%s' (%u bytes) is not delayed \n",
2928              GNUNET_i2s (&udpw->session->target),
2929              udpw->payload_size);
2930         break; /* Found message to send, break */
2931       }
2932       else
2933       {
2934         /* Message is delayed, try next */
2935         LOG (GNUNET_ERROR_TYPE_DEBUG,
2936              "Message for peer `%s' (%u bytes) is delayed for %s\n",
2937              GNUNET_i2s (&udpw->session->target),
2938              udpw->payload_size,
2939              GNUNET_STRINGS_relative_time_to_string (remaining,
2940                                                      GNUNET_YES));
2941         udpw = udpw->next;
2942       }
2943     }
2944   }
2945   if (GNUNET_YES == removed)
2946     notify_session_monitor (session->plugin,
2947                             session,
2948                             GNUNET_TRANSPORT_SS_UPDATE);
2949   return udpw;
2950 }
2951
2952
2953 /**
2954  * FIXME.
2955  */
2956 static void
2957 analyze_send_error (struct Plugin *plugin,
2958                     const struct sockaddr *sa,
2959                     socklen_t slen, int error)
2960 {
2961   enum GNUNET_ATS_Network_Type type;
2962
2963   type = plugin->env->get_address_type (plugin->env->cls, sa, slen);
2964   if (((GNUNET_ATS_NET_LAN == type)
2965        || (GNUNET_ATS_NET_WAN == type))
2966       && ((ENETUNREACH == errno)|| (ENETDOWN == errno)))
2967   {
2968     if (slen == sizeof (struct sockaddr_in))
2969     {
2970       /* IPv4: "Network unreachable" or "Network down"
2971        *
2972        * This indicates we do not have connectivity
2973        */
2974       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2975            _("UDP could not transmit message to `%s': "
2976              "Network seems down, please check your network configuration\n"),
2977            GNUNET_a2s (sa, slen));
2978     }
2979     if (slen == sizeof (struct sockaddr_in6))
2980     {
2981       /* IPv6: "Network unreachable" or "Network down"
2982        *
2983        * This indicates that this system is IPv6 enabled, but does not
2984        * have a valid global IPv6 address assigned or we do not have
2985        * connectivity
2986        */
2987       LOG (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
2988            _("UDP could not transmit IPv6 message! "
2989              "Please check your network configuration and disable IPv6 if your "
2990              "connection does not have a global IPv6 address\n"));
2991     }
2992   }
2993   else
2994   {
2995     LOG (GNUNET_ERROR_TYPE_WARNING,
2996          "UDP could not transmit message to `%s': `%s'\n",
2997          GNUNET_a2s (sa, slen), STRERROR (error));
2998   }
2999 }
3000
3001
3002 /**
3003  * It is time to try to transmit a UDP message.  Select one
3004  * and send.
3005  *
3006  * @param plugin the plugin
3007  * @param sock which socket (v4/v6) to send on
3008  * @return number of bytes transmitted, #GNUNET_SYSERR on failure
3009  */
3010 static size_t
3011 udp_select_send (struct Plugin *plugin,
3012                  struct GNUNET_NETWORK_Handle *sock)
3013 {
3014   ssize_t sent;
3015   socklen_t slen;
3016   struct sockaddr *a;
3017   const struct IPv4UdpAddress *u4;
3018   struct sockaddr_in a4;
3019   const struct IPv6UdpAddress *u6;
3020   struct sockaddr_in6 a6;
3021   struct UDP_MessageWrapper *udpw;
3022
3023   /* Find message to send */
3024   udpw = remove_timeout_messages_and_select (plugin,
3025                                              sock);
3026   if (NULL == udpw)
3027     return 0; /* No message to send */
3028
3029   if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
3030   {
3031     u4 = udpw->session->address->address;
3032     memset (&a4, 0, sizeof(a4));
3033     a4.sin_family = AF_INET;
3034 #if HAVE_SOCKADDR_IN_SIN_LEN
3035     a4.sin_len = sizeof (a4);
3036 #endif
3037     a4.sin_port = u4->u4_port;
3038     memcpy (&a4.sin_addr,
3039             &u4->ipv4_addr,
3040             sizeof(struct in_addr));
3041     a = (struct sockaddr *) &a4;
3042     slen = sizeof (a4);
3043   }
3044   else if (sizeof (struct IPv6UdpAddress) == udpw->session->address->address_length)
3045   {
3046     u6 = udpw->session->address->address;
3047     memset (&a6, 0, sizeof(a6));
3048     a6.sin6_family = AF_INET6;
3049 #if HAVE_SOCKADDR_IN_SIN_LEN
3050     a6.sin6_len = sizeof (a6);
3051 #endif
3052     a6.sin6_port = u6->u6_port;
3053     memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
3054     a = (struct sockaddr *) &a6;
3055     slen = sizeof (a6);
3056   }
3057   else
3058   {
3059     call_continuation (udpw,
3060                        GNUNET_OK);
3061     dequeue (plugin,
3062              udpw);
3063     notify_session_monitor (plugin,
3064                             udpw->session,
3065                             GNUNET_TRANSPORT_SS_UPDATE);
3066     GNUNET_free (udpw);
3067     return GNUNET_SYSERR;
3068   }
3069   sent = GNUNET_NETWORK_socket_sendto (sock,
3070                                        udpw->msg_buf,
3071                                        udpw->msg_size,
3072                                        a,
3073                                        slen);
3074   if (GNUNET_SYSERR == sent)
3075   {
3076     /* Failure */
3077     analyze_send_error (plugin,
3078                         a,
3079                         slen,
3080                         errno);
3081     call_continuation (udpw,
3082                        GNUNET_SYSERR);
3083     GNUNET_STATISTICS_update (plugin->env->stats,
3084                               "# UDP, total, bytes, sent, failure",
3085                               sent,
3086                               GNUNET_NO);
3087     GNUNET_STATISTICS_update (plugin->env->stats,
3088                               "# UDP, total, messages, sent, failure",
3089                               1,
3090                               GNUNET_NO);
3091   }
3092   else
3093   {
3094     /* Success */
3095     LOG (GNUNET_ERROR_TYPE_DEBUG,
3096          "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
3097          (unsigned int) (udpw->msg_size),
3098          GNUNET_i2s (&udpw->session->target),
3099          GNUNET_a2s (a, slen),
3100          (int ) sent,
3101          (sent < 0) ? STRERROR (errno) : "ok");
3102     GNUNET_STATISTICS_update (plugin->env->stats,
3103                               "# UDP, total, bytes, sent, success",
3104                               sent,
3105                               GNUNET_NO);
3106     GNUNET_STATISTICS_update (plugin->env->stats,
3107                               "# UDP, total, messages, sent, success",
3108                               1,
3109                               GNUNET_NO);
3110     if (NULL != udpw->frag_ctx)
3111       udpw->frag_ctx->on_wire_size += udpw->msg_size;
3112     call_continuation (udpw, GNUNET_OK);
3113   }
3114   dequeue (plugin, udpw);
3115   notify_session_monitor (plugin,
3116                           udpw->session,
3117                           GNUNET_TRANSPORT_SS_UPDATE);
3118   GNUNET_free (udpw);
3119   return sent;
3120 }
3121
3122
3123 /**
3124  * We have been notified that our readset has something to read.  We don't
3125  * know which socket needs to be read, so we have to check each one
3126  * Then reschedule this function to be called again once more is available.
3127  *
3128  * @param cls the plugin handle
3129  * @param tc the scheduling context (for rescheduling this function again)
3130  */
3131 static void
3132 udp_plugin_select (void *cls,
3133                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3134 {
3135   struct Plugin *plugin = cls;
3136
3137   plugin->select_task = NULL;
3138   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3139     return;
3140   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
3141       && (NULL != plugin->sockv4)
3142       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
3143     udp_select_read (plugin, plugin->sockv4);
3144   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
3145       && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head)
3146       && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
3147     udp_select_send (plugin, plugin->sockv4);
3148   schedule_select (plugin);
3149 }
3150
3151
3152 /**
3153  * We have been notified that our readset has something to read.  We don't
3154  * know which socket needs to be read, so we have to check each one
3155  * Then reschedule this function to be called again once more is available.
3156  *
3157  * @param cls the plugin handle
3158  * @param tc the scheduling context (for rescheduling this function again)
3159  */
3160 static void
3161 udp_plugin_select_v6 (void *cls,
3162                       const struct GNUNET_SCHEDULER_TaskContext *tc)
3163 {
3164   struct Plugin *plugin = cls;
3165
3166   plugin->select_task_v6 = NULL;
3167   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
3168     return;
3169   if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
3170       && (NULL != plugin->sockv6)
3171       && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
3172     udp_select_read (plugin, plugin->sockv6);
3173   if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
3174       && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&&
3175       (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) )udp_select_send (plugin, plugin->sockv6);
3176   schedule_select (plugin);
3177 }
3178
3179
3180 /**
3181  * Setup the UDP sockets (for IPv4 and IPv6) for the plugin.
3182  *
3183  * @param plugin the plugin to initialize
3184  * @param bind_v6 IPv6 address to bind to (can be NULL, for 'any')
3185  * @param bind_v4 IPv4 address to bind to (can be NULL, for 'any')
3186  * @return number of sockets that were successfully bound
3187  */
3188 static int
3189 setup_sockets (struct Plugin *plugin,
3190                const struct sockaddr_in6 *bind_v6,
3191                const struct sockaddr_in *bind_v4)
3192 {
3193   int tries;
3194   int sockets_created = 0;
3195   struct sockaddr_in6 server_addrv6;
3196   struct sockaddr_in server_addrv4;
3197   struct sockaddr *server_addr;
3198   struct sockaddr *addrs[2];
3199   socklen_t addrlens[2];
3200   socklen_t addrlen;
3201   int eno;
3202
3203   /* Create IPv6 socket */
3204   eno = EINVAL;
3205   if (GNUNET_YES == plugin->enable_ipv6)
3206   {
3207     plugin->sockv6 = GNUNET_NETWORK_socket_create (PF_INET6, SOCK_DGRAM, 0);
3208     if (NULL == plugin->sockv6)
3209     {
3210       LOG(GNUNET_ERROR_TYPE_WARNING,
3211           "Disabling IPv6 since it is not supported on this system!\n");
3212       plugin->enable_ipv6 = GNUNET_NO;
3213     }
3214     else
3215     {
3216       memset (&server_addrv6, '\0', sizeof(struct sockaddr_in6));
3217 #if HAVE_SOCKADDR_IN_SIN_LEN
3218       server_addrv6.sin6_len = sizeof (struct sockaddr_in6);
3219 #endif
3220       server_addrv6.sin6_family = AF_INET6;
3221       if (NULL != bind_v6)
3222         server_addrv6.sin6_addr = bind_v6->sin6_addr;
3223       else
3224         server_addrv6.sin6_addr = in6addr_any;
3225
3226       if (0 == plugin->port) /* autodetect */
3227         server_addrv6.sin6_port
3228           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3229                    + 32000);
3230       else
3231         server_addrv6.sin6_port = htons (plugin->port);
3232       addrlen = sizeof(struct sockaddr_in6);
3233       server_addr = (struct sockaddr *) &server_addrv6;
3234
3235       tries = 0;
3236       while (tries < 10)
3237       {
3238         LOG(GNUNET_ERROR_TYPE_DEBUG,
3239             "Binding to IPv6 `%s'\n",
3240             GNUNET_a2s (server_addr, addrlen));
3241         /* binding */
3242         if (GNUNET_OK ==
3243             GNUNET_NETWORK_socket_bind (plugin->sockv6,
3244                                         server_addr,
3245                                         addrlen))
3246           break;
3247         eno = errno;
3248         if (0 != plugin->port)
3249         {
3250           tries = 10; /* fail immediately */
3251           break; /* bind failed on specific port */
3252         }
3253         /* autodetect */
3254         server_addrv6.sin6_port
3255           = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3256                    + 32000);
3257         tries++;
3258       }
3259       if (tries >= 10)
3260       {
3261         GNUNET_NETWORK_socket_close (plugin->sockv6);
3262         plugin->enable_ipv6 = GNUNET_NO;
3263         plugin->sockv6 = NULL;
3264       }
3265       else
3266       {
3267         plugin->port = ntohs (server_addrv6.sin6_port);
3268       }
3269       if (NULL != plugin->sockv6)
3270       {
3271         LOG (GNUNET_ERROR_TYPE_DEBUG,
3272              "IPv6 socket created on port %s\n",
3273              GNUNET_a2s (server_addr, addrlen));
3274         addrs[sockets_created] = (struct sockaddr *) &server_addrv6;
3275         addrlens[sockets_created] = sizeof(struct sockaddr_in6);
3276         sockets_created++;
3277       }
3278       else
3279       {
3280         LOG (GNUNET_ERROR_TYPE_ERROR,
3281              "Failed to bind UDP socket to %s: %s\n",
3282              GNUNET_a2s (server_addr, addrlen),
3283              STRERROR (eno));
3284       }
3285     }
3286   }
3287
3288   /* Create IPv4 socket */
3289   eno = EINVAL;
3290   plugin->sockv4 = GNUNET_NETWORK_socket_create (PF_INET, SOCK_DGRAM, 0);
3291   if (NULL == plugin->sockv4)
3292   {
3293     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
3294                          "socket");
3295     LOG (GNUNET_ERROR_TYPE_WARNING,
3296          "Disabling IPv4 since it is not supported on this system!\n");
3297     plugin->enable_ipv4 = GNUNET_NO;
3298   }
3299   else
3300   {
3301     memset (&server_addrv4, '\0', sizeof(struct sockaddr_in));
3302 #if HAVE_SOCKADDR_IN_SIN_LEN
3303     server_addrv4.sin_len = sizeof (struct sockaddr_in);
3304 #endif
3305     server_addrv4.sin_family = AF_INET;
3306     if (NULL != bind_v4)
3307       server_addrv4.sin_addr = bind_v4->sin_addr;
3308     else
3309       server_addrv4.sin_addr.s_addr = INADDR_ANY;
3310
3311     if (0 == plugin->port)
3312       /* autodetect */
3313       server_addrv4.sin_port
3314         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG,
3315                                            33537)
3316                  + 32000);
3317     else
3318       server_addrv4.sin_port = htons (plugin->port);
3319
3320     addrlen = sizeof(struct sockaddr_in);
3321     server_addr = (struct sockaddr *) &server_addrv4;
3322
3323     tries = 0;
3324     while (tries < 10)
3325     {
3326       LOG (GNUNET_ERROR_TYPE_DEBUG,
3327            "Binding to IPv4 `%s'\n",
3328            GNUNET_a2s (server_addr, addrlen));
3329
3330       /* binding */
3331       if (GNUNET_OK ==
3332           GNUNET_NETWORK_socket_bind (plugin->sockv4,
3333                                       server_addr,
3334                                       addrlen))
3335         break;
3336       eno = errno;
3337       if (0 != plugin->port)
3338       {
3339         tries = 10; /* fail */
3340         break; /* bind failed on specific port */
3341       }
3342
3343       /* autodetect */
3344       server_addrv4.sin_port
3345         = htons (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, 33537)
3346                  + 32000);
3347       tries++;
3348     }
3349     if (tries >= 10)
3350     {
3351       GNUNET_NETWORK_socket_close (plugin->sockv4);
3352       plugin->enable_ipv4 = GNUNET_NO;
3353       plugin->sockv4 = NULL;
3354     }
3355     else
3356     {
3357       plugin->port = ntohs (server_addrv4.sin_port);
3358     }
3359
3360     if (NULL != plugin->sockv4)
3361     {
3362       LOG (GNUNET_ERROR_TYPE_DEBUG,
3363            "IPv4 socket created on port %s\n",
3364            GNUNET_a2s (server_addr, addrlen));
3365       addrs[sockets_created] = (struct sockaddr *) &server_addrv4;
3366       addrlens[sockets_created] = sizeof(struct sockaddr_in);
3367       sockets_created++;
3368     }
3369     else
3370     {
3371       LOG (GNUNET_ERROR_TYPE_ERROR,
3372            _("Failed to bind UDP socket to %s: %s\n"),
3373            GNUNET_a2s (server_addr, addrlen),
3374            STRERROR (eno));
3375     }
3376   }
3377
3378   if (0 == sockets_created)
3379   {
3380     LOG (GNUNET_ERROR_TYPE_WARNING,
3381          _("Failed to open UDP sockets\n"));
3382     return 0; /* No sockets created, return */
3383   }
3384
3385   /* Create file descriptors */
3386   if (plugin->enable_ipv4 == GNUNET_YES)
3387   {
3388     plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
3389     plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
3390     GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
3391     GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
3392     if (NULL != plugin->sockv4)
3393     {
3394       GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
3395       GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
3396     }
3397   }
3398
3399   if (plugin->enable_ipv6 == GNUNET_YES)
3400   {
3401     plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
3402     plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
3403     GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
3404     GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
3405     if (NULL != plugin->sockv6)
3406     {
3407       GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
3408       GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
3409     }
3410   }
3411
3412   schedule_select (plugin);
3413   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
3414                                      GNUNET_NO,
3415                                      plugin->port,
3416                                      sockets_created,
3417                                      (const struct sockaddr **) addrs,
3418                                      addrlens,
3419                                      &udp_nat_port_map_callback,
3420                                      NULL,
3421                                      plugin);
3422
3423   return sockets_created;
3424 }
3425
3426
3427 /**
3428  * Return information about the given session to the
3429  * monitor callback.
3430  *
3431  * @param cls the `struct Plugin` with the monitor callback (`sic`)
3432  * @param peer peer we send information about
3433  * @param value our `struct Session` to send information about
3434  * @return #GNUNET_OK (continue to iterate)
3435  */
3436 static int
3437 send_session_info_iter (void *cls,
3438                         const struct GNUNET_PeerIdentity *peer,
3439                         void *value)
3440 {
3441   struct Plugin *plugin = cls;
3442   struct Session *session = value;
3443
3444   notify_session_monitor (plugin,
3445                           session,
3446                           GNUNET_TRANSPORT_SS_INIT);
3447   notify_session_monitor (plugin,
3448                           session,
3449                           GNUNET_TRANSPORT_SS_UP);
3450   return GNUNET_OK;
3451 }
3452
3453
3454 /**
3455  * Begin monitoring sessions of a plugin.  There can only
3456  * be one active monitor per plugin (i.e. if there are
3457  * multiple monitors, the transport service needs to
3458  * multiplex the generated events over all of them).
3459  *
3460  * @param cls closure of the plugin
3461  * @param sic callback to invoke, NULL to disable monitor;
3462  *            plugin will being by iterating over all active
3463  *            sessions immediately and then enter monitor mode
3464  * @param sic_cls closure for @a sic
3465  */
3466 static void
3467 udp_plugin_setup_monitor (void *cls,
3468                           GNUNET_TRANSPORT_SessionInfoCallback sic,
3469                           void *sic_cls)
3470 {
3471   struct Plugin *plugin = cls;
3472
3473   plugin->sic = sic;
3474   plugin->sic_cls = sic_cls;
3475   if (NULL != sic)
3476   {
3477     GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3478                                            &send_session_info_iter,
3479                                            plugin);
3480     /* signal end of first iteration */
3481     sic (sic_cls, NULL, NULL);
3482   }
3483 }
3484
3485
3486 /**
3487  * The exported method. Makes the core api available via a global and
3488  * returns the udp transport API.
3489  *
3490  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3491  * @return our `struct GNUNET_TRANSPORT_PluginFunctions`
3492  */
3493 void *
3494 libgnunet_plugin_transport_udp_init (void *cls)
3495 {
3496   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
3497   struct GNUNET_TRANSPORT_PluginFunctions *api;
3498   struct Plugin *p;
3499   unsigned long long port;
3500   unsigned long long aport;
3501   unsigned long long udp_max_bps;
3502   unsigned long long enable_v6;
3503   unsigned long long enable_broadcasting;
3504   unsigned long long enable_broadcasting_recv;
3505   char *bind4_address;
3506   char *bind6_address;
3507   char *fancy_interval;
3508   struct GNUNET_TIME_Relative interval;
3509   struct sockaddr_in server_addrv4;
3510   struct sockaddr_in6 server_addrv6;
3511   int res;
3512   int have_bind4;
3513   int have_bind6;
3514
3515   if (NULL == env->receive)
3516   {
3517     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
3518      initialze the plugin or the API */
3519     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3520     api->cls = NULL;
3521     api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3522     api->address_to_string = &udp_address_to_string;
3523     api->string_to_address = &udp_string_to_address;
3524     return api;
3525   }
3526
3527   /* Get port number: port == 0 : autodetect a port,
3528    * > 0 : use this port, not given : 2086 default */
3529   if (GNUNET_OK !=
3530       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3531                                              "transport-udp",
3532                                              "PORT", &port))
3533     port = 2086;
3534   if (GNUNET_OK !=
3535       GNUNET_CONFIGURATION_get_value_number (env->cfg,
3536                                              "transport-udp",
3537                                              "ADVERTISED_PORT", &aport))
3538     aport = port;
3539   if (port > 65535)
3540   {
3541     LOG (GNUNET_ERROR_TYPE_WARNING,
3542          _("Given `%s' option is out of range: %llu > %u\n"),
3543          "PORT", port,
3544          65535);
3545     return NULL;
3546   }
3547
3548   /* Protocols */
3549   if (GNUNET_YES ==
3550       GNUNET_CONFIGURATION_get_value_yesno (env->cfg, "nat", "DISABLEV6"))
3551     enable_v6 = GNUNET_NO;
3552   else
3553     enable_v6 = GNUNET_YES;
3554
3555   /* Addresses */
3556   have_bind4 = GNUNET_NO;
3557   memset (&server_addrv4, 0, sizeof(server_addrv4));
3558   if (GNUNET_YES ==
3559       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3560                                              "BINDTO", &bind4_address))
3561   {
3562     LOG (GNUNET_ERROR_TYPE_DEBUG,
3563          "Binding udp plugin to specific address: `%s'\n",
3564          bind4_address);
3565     if (1 != inet_pton (AF_INET,
3566                         bind4_address,
3567                         &server_addrv4.sin_addr))
3568     {
3569       GNUNET_free (bind4_address);
3570       return NULL;
3571     }
3572     have_bind4 = GNUNET_YES;
3573   }
3574   GNUNET_free_non_null(bind4_address);
3575   have_bind6 = GNUNET_NO;
3576   memset (&server_addrv6, 0, sizeof(server_addrv6));
3577   if (GNUNET_YES ==
3578       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3579                                              "BINDTO6", &bind6_address))
3580   {
3581     LOG (GNUNET_ERROR_TYPE_DEBUG,
3582          "Binding udp plugin to specific address: `%s'\n",
3583          bind6_address);
3584     if (1 != inet_pton (AF_INET6,
3585                         bind6_address,
3586                         &server_addrv6.sin6_addr))
3587     {
3588       LOG (GNUNET_ERROR_TYPE_ERROR,
3589            _("Invalid IPv6 address: `%s'\n"),
3590            bind6_address);
3591       GNUNET_free (bind6_address);
3592       return NULL;
3593     }
3594     have_bind6 = GNUNET_YES;
3595   }
3596   GNUNET_free_non_null (bind6_address);
3597
3598   /* Enable neighbour discovery */
3599   enable_broadcasting = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3600       "transport-udp", "BROADCAST");
3601   if (enable_broadcasting == GNUNET_SYSERR)
3602     enable_broadcasting = GNUNET_NO;
3603
3604   enable_broadcasting_recv = GNUNET_CONFIGURATION_get_value_yesno (env->cfg,
3605       "transport-udp", "BROADCAST_RECEIVE");
3606   if (enable_broadcasting_recv == GNUNET_SYSERR)
3607     enable_broadcasting_recv = GNUNET_YES;
3608
3609   if (GNUNET_SYSERR ==
3610       GNUNET_CONFIGURATION_get_value_string (env->cfg, "transport-udp",
3611                                              "BROADCAST_INTERVAL",
3612                                              &fancy_interval))
3613   {
3614     interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
3615   }
3616   else
3617   {
3618     if (GNUNET_SYSERR ==
3619         GNUNET_STRINGS_fancy_time_to_relative (fancy_interval, &interval))
3620     {
3621       interval = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
3622     }
3623     GNUNET_free(fancy_interval);
3624   }
3625
3626   /* Maximum datarate */
3627   if (GNUNET_OK !=
3628       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-udp",
3629                                              "MAX_BPS", &udp_max_bps))
3630   {
3631     udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */
3632   }
3633
3634   p = GNUNET_new (struct Plugin);
3635   p->port = port;
3636   p->aport = aport;
3637   p->broadcast_interval = interval;
3638   p->enable_ipv6 = enable_v6;
3639   p->enable_ipv4 = GNUNET_YES; /* default */
3640   p->enable_broadcasting = enable_broadcasting;
3641   p->enable_broadcasting_receiving = enable_broadcasting_recv;
3642   p->env = env;
3643   p->sessions = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
3644   p->defrag_ctxs = GNUNET_CONTAINER_heap_create (
3645       GNUNET_CONTAINER_HEAP_ORDER_MIN);
3646   p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages,
3647                                      p);
3648   GNUNET_BANDWIDTH_tracker_init (&p->tracker,
3649                                  NULL,
3650                                  NULL,
3651                                  GNUNET_BANDWIDTH_value_init ((uint32_t) udp_max_bps),
3652                                  30);
3653   LOG(GNUNET_ERROR_TYPE_DEBUG,
3654       "Setting up sockets\n");
3655   res = setup_sockets (p,
3656                        (GNUNET_YES == have_bind6) ? &server_addrv6 : NULL,
3657                        (GNUNET_YES == have_bind4) ? &server_addrv4 : NULL);
3658   if ((res == 0) || ((p->sockv4 == NULL )&& (p->sockv6 == NULL)))
3659   {
3660     LOG (GNUNET_ERROR_TYPE_ERROR,
3661         _("Failed to create network sockets, plugin failed\n"));
3662     GNUNET_CONTAINER_multipeermap_destroy (p->sessions);
3663     GNUNET_CONTAINER_heap_destroy (p->defrag_ctxs);
3664     GNUNET_SERVER_mst_destroy (p->mst);
3665     GNUNET_free (p);
3666     return NULL;
3667   }
3668
3669   /* Setup broadcasting and receiving beacons */
3670   setup_broadcast (p, &server_addrv6, &server_addrv4);
3671
3672   api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
3673   api->cls = p;
3674   api->send = NULL;
3675   api->disconnect_session = &udp_disconnect_session;
3676   api->query_keepalive_factor = &udp_query_keepalive_factor;
3677   api->disconnect_peer = &udp_disconnect;
3678   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
3679   api->address_to_string = &udp_address_to_string;
3680   api->string_to_address = &udp_string_to_address;
3681   api->check_address = &udp_plugin_check_address;
3682   api->get_session = &udp_plugin_get_session;
3683   api->send = &udp_plugin_send;
3684   api->get_network = &udp_get_network;
3685   api->update_session_timeout = &udp_plugin_update_session_timeout;
3686   api->setup_monitor = &udp_plugin_setup_monitor;
3687   return api;
3688 }
3689
3690
3691 /**
3692  * Function called on each entry in the defragmentation heap to
3693  * clean it up.
3694  *
3695  * @param cls NULL
3696  * @param node node in the heap (to be removed)
3697  * @param element a `struct DefragContext` to be cleaned up
3698  * @param cost unused
3699  * @return #GNUNET_YES
3700  */
3701 static int
3702 heap_cleanup_iterator (void *cls,
3703                        struct GNUNET_CONTAINER_HeapNode *node,
3704                        void *element,
3705                        GNUNET_CONTAINER_HeapCostType cost)
3706 {
3707   struct DefragContext *d_ctx = element;
3708
3709   GNUNET_CONTAINER_heap_remove_node (node);
3710   GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
3711   GNUNET_free (d_ctx);
3712   return GNUNET_YES;
3713 }
3714
3715
3716 /**
3717  * The exported method. Makes the core api available via a global and
3718  * returns the udp transport API.
3719  *
3720  * @param cls our `struct GNUNET_TRANSPORT_PluginEnvironment`
3721  * @return NULL
3722  */
3723 void *
3724 libgnunet_plugin_transport_udp_done (void *cls)
3725 {
3726   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
3727   struct Plugin *plugin = api->cls;
3728   struct PrettyPrinterContext *cur;
3729   struct PrettyPrinterContext *next;
3730   struct UDP_MessageWrapper *udpw;
3731
3732   if (NULL == plugin)
3733   {
3734     GNUNET_free(api);
3735     return NULL;
3736   }
3737   stop_broadcast (plugin);
3738   if (plugin->select_task != NULL)
3739   {
3740     GNUNET_SCHEDULER_cancel (plugin->select_task);
3741     plugin->select_task = NULL;
3742   }
3743   if (plugin->select_task_v6 != NULL)
3744   {
3745     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
3746     plugin->select_task_v6 = NULL;
3747   }
3748
3749   /* Closing sockets */
3750   if (GNUNET_YES == plugin->enable_ipv4)
3751   {
3752     if (NULL != plugin->sockv4)
3753     {
3754       GNUNET_break (GNUNET_OK ==
3755                     GNUNET_NETWORK_socket_close (plugin->sockv4));
3756       plugin->sockv4 = NULL;
3757     }
3758     GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
3759     GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
3760   }
3761   if (GNUNET_YES == plugin->enable_ipv6)
3762   {
3763     if (NULL != plugin->sockv6)
3764     {
3765       GNUNET_break (GNUNET_OK ==
3766                     GNUNET_NETWORK_socket_close (plugin->sockv6));
3767       plugin->sockv6 = NULL;
3768
3769       GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
3770       GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
3771     }
3772   }
3773   if (NULL != plugin->nat)
3774   {
3775     GNUNET_NAT_unregister (plugin->nat);
3776     plugin->nat = NULL;
3777   }
3778   if (NULL != plugin->defrag_ctxs)
3779   {
3780     GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
3781                                    &heap_cleanup_iterator, NULL);
3782     GNUNET_CONTAINER_heap_destroy (plugin->defrag_ctxs);
3783     plugin->defrag_ctxs = NULL;
3784   }
3785   if (NULL != plugin->mst)
3786   {
3787     GNUNET_SERVER_mst_destroy (plugin->mst);
3788     plugin->mst = NULL;
3789   }
3790
3791   /* Clean up leftover messages */
3792   udpw = plugin->ipv4_queue_head;
3793   while (NULL != udpw)
3794   {
3795     struct UDP_MessageWrapper *tmp = udpw->next;
3796     dequeue (plugin, udpw);
3797     call_continuation (udpw, GNUNET_SYSERR);
3798     GNUNET_free(udpw);
3799     udpw = tmp;
3800   }
3801   udpw = plugin->ipv6_queue_head;
3802   while (NULL != udpw)
3803   {
3804     struct UDP_MessageWrapper *tmp = udpw->next;
3805     dequeue (plugin, udpw);
3806     call_continuation (udpw, GNUNET_SYSERR);
3807     GNUNET_free(udpw);
3808     udpw = tmp;
3809   }
3810
3811   /* Clean up sessions */
3812   LOG (GNUNET_ERROR_TYPE_DEBUG,
3813        "Cleaning up sessions\n");
3814   GNUNET_CONTAINER_multipeermap_iterate (plugin->sessions,
3815                                          &disconnect_and_free_it, plugin);
3816   GNUNET_CONTAINER_multipeermap_destroy (plugin->sessions);
3817
3818   next = plugin->ppc_dll_head;
3819   for (cur = next; NULL != cur; cur = next)
3820   {
3821     GNUNET_break(0);
3822     next = cur->next;
3823     GNUNET_CONTAINER_DLL_remove (plugin->ppc_dll_head,
3824                                  plugin->ppc_dll_tail,
3825                                  cur);
3826     GNUNET_RESOLVER_request_cancel (cur->resolver_handle);
3827     GNUNET_free (cur);
3828   }
3829   GNUNET_free (plugin);
3830   GNUNET_free (api);
3831   return NULL;
3832 }
3833
3834 /* end of plugin_transport_udp.c */